Thomas Vachuska
Committed by Gerrit Code Review

Removing hazelcat dependency throughout.

Change-Id: I738050fda142418d2956f613035892dac82ef098
Showing 30 changed files with 34 additions and 2193 deletions
......@@ -71,7 +71,7 @@
<dependency>
<groupId>org.mapdb</groupId>
<artifactId>mapdb</artifactId>
<version>1.0.7</version>
<version>1.0.7</version>
</dependency>
<dependency>
......@@ -93,31 +93,21 @@
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>org.easymock</groupId>
<artifactId>easymock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.onosproject</groupId>
<artifactId>onos-api</artifactId>
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.hazelcast</groupId>
<artifactId>hazelcast</artifactId>
<groupId>org.easymock</groupId>
<artifactId>easymock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.hazelcast</groupId>
<artifactId>hazelcast</artifactId>
<classifier>tests</classifier>
<scope>test</scope>
<groupId>org.onosproject</groupId>
<artifactId>onos-api</artifactId>
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
<!-- for shaded copycat -->
<dependency>
<groupId>org.onosproject</groupId>
<artifactId>onlab-thirdparty</artifactId>
<groupId>org.onosproject</groupId>
<artifactId>onlab-thirdparty</artifactId>
</dependency>
</dependencies>
......
package org.onosproject.store.cluster.impl;
import static com.hazelcast.util.AddressUtil.matchInterface;
import static java.net.NetworkInterface.getNetworkInterfaces;
import static java.util.Collections.list;
import static org.onosproject.cluster.DefaultControllerNode.DEFAULT_PORT;
......@@ -31,7 +30,6 @@ import org.slf4j.Logger;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import com.hazelcast.util.AddressUtil;
/**
* Implementation of ClusterDefinitionService.
......@@ -115,7 +113,7 @@ public class ClusterDefinitionManager implements ClusterDefinitionService {
Enumeration<InetAddress> inetAddresses = iface.getInetAddresses();
while (inetAddresses.hasMoreElements()) {
IpAddress ip = IpAddress.valueOf(inetAddresses.nextElement());
if (AddressUtil.matchInterface(ip.toString(), clusterDefinition.getIpPrefix())) {
if (matchInterface(ip.toString(), clusterDefinition.getIpPrefix())) {
return ip;
}
}
......@@ -169,4 +167,11 @@ public class ClusterDefinitionManager implements ClusterDefinitionService {
return IpAddress.valueOf(InetAddress.getLoopbackAddress()).toString();
}
// Indicates whether the specified interface address matches the given prefix.
// FIXME: Add a facility to IpPrefix to make this more robust
private static boolean matchInterface(String ip, String ipPrefix) {
String s = ipPrefix.replaceAll("\\.\\*", "");
return ip.startsWith(s);
}
}
......
/*
* Copyright 2014-2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.cluster.impl;
import com.google.common.base.Optional;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.hazelcast.core.IMap;
import com.hazelcast.core.Member;
import com.hazelcast.core.MemberAttributeEvent;
import com.hazelcast.core.MembershipEvent;
import com.hazelcast.core.MembershipListener;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Service;
import org.joda.time.DateTime;
import org.onlab.packet.IpAddress;
import org.onosproject.cluster.ClusterEvent;
import org.onosproject.cluster.ClusterStore;
import org.onosproject.cluster.ClusterStoreDelegate;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.DefaultControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.store.hz.AbsentInvalidatingLoadingCache;
import org.onosproject.store.hz.AbstractHazelcastStore;
import org.onosproject.store.hz.OptionalCacheLoader;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import static com.google.common.cache.CacheBuilder.newBuilder;
import static org.onosproject.cluster.ClusterEvent.Type.INSTANCE_ACTIVATED;
import static org.onosproject.cluster.ClusterEvent.Type.INSTANCE_DEACTIVATED;
import static org.onosproject.cluster.ControllerNode.State;
/**
* Distributed, Hazelcast-based implementation of the cluster nodes store.
*/
@Component(immediate = true, enabled = false)
@Service
public class HazelcastClusterStore
extends AbstractHazelcastStore<ClusterEvent, ClusterStoreDelegate>
implements ClusterStore {
private IMap<byte[], byte[]> rawNodes;
private LoadingCache<NodeId, Optional<DefaultControllerNode>> nodes;
private String listenerId;
private final MembershipListener listener = new InternalMembershipListener();
private final Map<NodeId, State> states = new ConcurrentHashMap<>();
private final Map<NodeId, DateTime> lastUpdatedTimes = Maps.newConcurrentMap();
private String nodesListenerId;
@Override
@Activate
public void activate() {
super.activate();
listenerId = theInstance.getCluster().addMembershipListener(listener);
rawNodes = theInstance.getMap("nodes");
OptionalCacheLoader<NodeId, DefaultControllerNode> nodeLoader
= new OptionalCacheLoader<>(serializer, rawNodes);
nodes = new AbsentInvalidatingLoadingCache<>(newBuilder().build(nodeLoader));
nodesListenerId = rawNodes.addEntryListener(new RemoteCacheEventHandler<>(nodes), true);
loadClusterNodes();
log.info("Started");
}
// Loads the initial set of cluster nodes
private void loadClusterNodes() {
for (Member member : theInstance.getCluster().getMembers()) {
addNode(node(member));
}
}
@Deactivate
public void deactivate() {
rawNodes.removeEntryListener(nodesListenerId);
theInstance.getCluster().removeMembershipListener(listenerId);
log.info("Stopped");
}
@Override
public ControllerNode getLocalNode() {
return node(theInstance.getCluster().getLocalMember());
}
@Override
public Set<ControllerNode> getNodes() {
ImmutableSet.Builder<ControllerNode> builder = ImmutableSet.builder();
for (Optional<DefaultControllerNode> optional : nodes.asMap().values()) {
builder.add(optional.get());
}
return builder.build();
}
@Override
public ControllerNode getNode(NodeId nodeId) {
return nodes.getUnchecked(nodeId).orNull();
}
@Override
public State getState(NodeId nodeId) {
State state = states.get(nodeId);
return state == null ? State.INACTIVE : state;
}
@Override
public DateTime getLastUpdated(NodeId nodeId) {
return lastUpdatedTimes.get(nodeId);
}
@Override
public ControllerNode addNode(NodeId nodeId, IpAddress ip, int tcpPort) {
return addNode(new DefaultControllerNode(nodeId, ip, tcpPort));
}
@Override
public void removeNode(NodeId nodeId) {
synchronized (this) {
rawNodes.remove(serialize(nodeId));
nodes.invalidate(nodeId);
}
}
// Adds a new node based on the specified member
private synchronized ControllerNode addNode(DefaultControllerNode node) {
rawNodes.put(serialize(node.id()), serialize(node));
nodes.put(node.id(), Optional.of(node));
updateState(node.id(), State.ACTIVE);
return node;
}
// Creates a controller node descriptor from the Hazelcast member.
private DefaultControllerNode node(Member member) {
IpAddress ip = memberAddress(member);
return new DefaultControllerNode(new NodeId(ip.toString()), ip);
}
private IpAddress memberAddress(Member member) {
return IpAddress.valueOf(member.getSocketAddress().getAddress());
}
private void updateState(NodeId nodeId, State newState) {
updateState(nodeId, newState);
lastUpdatedTimes.put(nodeId, DateTime.now());
}
// Interceptor for membership events.
private class InternalMembershipListener implements MembershipListener {
@Override
public void memberAdded(MembershipEvent membershipEvent) {
log.info("Member {} added", membershipEvent.getMember());
ControllerNode node = addNode(node(membershipEvent.getMember()));
notifyDelegate(new ClusterEvent(INSTANCE_ACTIVATED, node));
}
@Override
public void memberRemoved(MembershipEvent membershipEvent) {
log.info("Member {} removed", membershipEvent.getMember());
NodeId nodeId = new NodeId(memberAddress(membershipEvent.getMember()).toString());
updateState(nodeId, State.INACTIVE);
notifyDelegate(new ClusterEvent(INSTANCE_DEACTIVATED, getNode(nodeId)));
}
@Override
public void memberAttributeChanged(MemberAttributeEvent memberAttributeEvent) {
log.info("Member {} attribute {} changed to {}",
memberAttributeEvent.getMember(),
memberAttributeEvent.getKey(),
memberAttributeEvent.getValue());
}
}
}
/*
* Copyright 2014-2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.core.impl;
import static org.apache.commons.lang3.concurrent.ConcurrentUtils.putIfAbsent;
import com.google.common.collect.ImmutableSet;
import com.hazelcast.core.EntryEvent;
import com.hazelcast.core.EntryListener;
import com.hazelcast.core.IAtomicLong;
import com.hazelcast.core.MapEvent;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Service;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.ApplicationIdStore;
import org.onosproject.core.DefaultApplicationId;
import org.onosproject.store.hz.AbstractHazelcastStore;
import org.onosproject.store.hz.SMap;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
import org.onlab.util.KryoNamespace;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
* Simple implementation of the application ID registry using in-memory
* structures.
*/
@Component(immediate = false, enabled = false)
@Service
public class DistributedApplicationIdStore
extends AbstractHazelcastStore<AppIdEvent, AppIdStoreDelegate>
implements ApplicationIdStore {
protected IAtomicLong lastAppId;
protected SMap<String, DefaultApplicationId> appIdsByName;
protected Map<Short, DefaultApplicationId> appIds = new ConcurrentHashMap<>();
private String listenerId;
@Override
@Activate
public void activate() {
super.activate();
this.serializer = new KryoSerializer() {
@Override
protected void setupKryoPool() {
serializerPool = KryoNamespace.newBuilder()
.register(KryoNamespaces.API)
.nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
.build();
}
};
lastAppId = theInstance.getAtomicLong("applicationId");
appIdsByName = new SMap<>(theInstance.<byte[], byte[]>getMap("appIdsByName"), this.serializer);
listenerId = appIdsByName.addEntryListener((new RemoteAppIdEventHandler()), true);
primeAppIds();
log.info("Started");
}
@Deactivate
public void deactivate() {
appIdsByName.removeEntryListener(listenerId);
log.info("Stopped");
}
@Override
public Set<ApplicationId> getAppIds() {
return ImmutableSet.<ApplicationId>copyOf(appIds.values());
}
@Override
public ApplicationId getAppId(Short id) {
ApplicationId appId = appIds.get(id);
if (appId == null) {
primeAppIds();
return appIds.get(id);
}
return appId;
}
@Override
public ApplicationId getAppId(String name) {
return appIdsByName.get(name);
}
private void primeAppIds() {
for (DefaultApplicationId appId : appIdsByName.values()) {
appIds.putIfAbsent(appId.id(), appId);
}
}
@Override
public ApplicationId registerApplication(String name) {
DefaultApplicationId appId = appIdsByName.get(name);
if (appId == null) {
int id = (int) lastAppId.getAndIncrement();
appId = putIfAbsent(appIdsByName, name,
new DefaultApplicationId(id, name));
}
return appId;
}
private class RemoteAppIdEventHandler implements EntryListener<String, DefaultApplicationId> {
@Override
public void entryAdded(EntryEvent<String, DefaultApplicationId> event) {
DefaultApplicationId appId = event.getValue();
appIds.put(appId.id(), appId);
}
@Override
public void entryRemoved(EntryEvent<String, DefaultApplicationId> event) {
}
@Override
public void entryUpdated(EntryEvent<String, DefaultApplicationId> event) {
entryAdded(event);
}
@Override
public void entryEvicted(EntryEvent<String, DefaultApplicationId> event) {
}
@Override
public void mapEvicted(MapEvent event) {
}
@Override
public void mapCleared(MapEvent event) {
}
}
}
/*
* Copyright 2014-2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.core.impl;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IAtomicLong;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onosproject.core.IdBlock;
import org.onosproject.core.IdBlockStore;
import org.onosproject.store.hz.StoreService;
import java.util.Map;
/**
* Distributed implementation of id block store using Hazelcast.
*/
@Component(immediate = false, enabled = false)
@Service
public class DistributedIdBlockStore implements IdBlockStore {
private static final long DEFAULT_BLOCK_SIZE = 0x100000L;
protected Map<String, IAtomicLong> topicBlocks;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected StoreService storeService;
protected HazelcastInstance theInstance;
@Activate
public void activate() {
theInstance = storeService.getHazelcastInstance();
}
@Override
public IdBlock getIdBlock(String topic) {
Long blockBase = theInstance.getAtomicLong(topic).getAndAdd(DEFAULT_BLOCK_SIZE);
return new IdBlock(blockBase, DEFAULT_BLOCK_SIZE);
}
}
/*
* Copyright 2014 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.hz;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import com.google.common.base.Optional;
import com.google.common.cache.ForwardingLoadingCache.SimpleForwardingLoadingCache;
import com.google.common.cache.LoadingCache;
/**
* Wrapper around LoadingCache to handle negative hit scenario.
* <p>
* When the LoadingCache returned Absent,
* this implementation will invalidate the entry immediately to avoid
* caching negative hits.
*
* @param <K> Cache key type
* @param <V> Cache value type. (Optional{@literal <V>})
*/
public class AbsentInvalidatingLoadingCache<K, V> extends
SimpleForwardingLoadingCache<K, Optional<V>> {
/**
* Constructor.
*
* @param delegate actual {@link LoadingCache} to delegate loading.
*/
public AbsentInvalidatingLoadingCache(LoadingCache<K, Optional<V>> delegate) {
super(delegate);
}
@Override
public Optional<V> get(K key) throws ExecutionException {
Optional<V> v = super.get(key);
if (!v.isPresent()) {
invalidate(key);
}
return v;
}
@Override
public Optional<V> getUnchecked(K key) {
Optional<V> v = super.getUnchecked(key);
if (!v.isPresent()) {
invalidate(key);
}
return v;
}
@Override
public Optional<V> apply(K key) {
return getUnchecked(key);
}
@Override
public Optional<V> getIfPresent(Object key) {
Optional<V> v = super.getIfPresent(key);
if (!v.isPresent()) {
invalidate(key);
}
return v;
}
@Override
public Optional<V> get(K key, Callable<? extends Optional<V>> valueLoader)
throws ExecutionException {
Optional<V> v = super.get(key, valueLoader);
if (!v.isPresent()) {
invalidate(key);
}
return v;
}
}
/*
* Copyright 2014 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.hz;
import com.google.common.base.Optional;
import com.google.common.cache.LoadingCache;
import com.hazelcast.core.EntryAdapter;
import com.hazelcast.core.EntryEvent;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.MapEvent;
import com.hazelcast.core.Member;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onosproject.event.Event;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.StoreDelegate;
import org.onosproject.store.serializers.KryoSerializer;
import org.onosproject.store.serializers.StoreSerializer;
import org.slf4j.Logger;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.slf4j.LoggerFactory.getLogger;
/**
* Abstraction of a distributed store based on Hazelcast.
*/
@Component
public abstract class AbstractHazelcastStore<E extends Event, D extends StoreDelegate<E>>
extends AbstractStore<E, D> {
protected final Logger log = getLogger(getClass());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected StoreService storeService;
protected StoreSerializer serializer;
protected HazelcastInstance theInstance;
@Activate
public void activate() {
serializer = new KryoSerializer();
theInstance = storeService.getHazelcastInstance();
}
/**
* Serializes the specified object using the backing store service.
*
* @param obj object to be serialized
* @return serialized object
*/
protected byte[] serialize(Object obj) {
return serializer.encode(obj);
}
/**
* Deserializes the specified object using the backing store service.
*
* @param bytes bytes to be deserialized
* @param <T> type of object
* @return deserialized object
*/
protected <T> T deserialize(byte[] bytes) {
return serializer.decode(bytes);
}
/**
* An IMap entry listener, which reflects each remote event to the cache.
*
* @param <K> IMap key type after deserialization
* @param <V> IMap value type after deserialization
*/
public class RemoteCacheEventHandler<K, V> extends EntryAdapter<byte[], byte[]> {
private final Member localMember;
private LoadingCache<K, Optional<V>> cache;
/**
* Constructor.
*
* @param cache cache to update
*/
public RemoteCacheEventHandler(LoadingCache<K, Optional<V>> cache) {
this.localMember = theInstance.getCluster().getLocalMember();
this.cache = checkNotNull(cache);
}
@Override
public void mapCleared(MapEvent event) {
if (localMember.equals(event.getMember())) {
// ignore locally triggered event
return;
}
cache.invalidateAll();
}
@Override
public void entryAdded(EntryEvent<byte[], byte[]> event) {
if (localMember.equals(event.getMember())) {
// ignore locally triggered event
return;
}
K key = deserialize(event.getKey());
V newVal = deserialize(event.getValue());
Optional<V> newValue = Optional.of(newVal);
cache.asMap().putIfAbsent(key, newValue);
onAdd(key, newVal);
}
@Override
public void entryUpdated(EntryEvent<byte[], byte[]> event) {
if (localMember.equals(event.getMember())) {
// ignore locally triggered event
return;
}
K key = deserialize(event.getKey());
V oldVal = deserialize(event.getOldValue());
Optional<V> oldValue = Optional.fromNullable(oldVal);
V newVal = deserialize(event.getValue());
Optional<V> newValue = Optional.of(newVal);
cache.asMap().replace(key, oldValue, newValue);
onUpdate(key, oldVal, newVal);
}
@Override
public void entryRemoved(EntryEvent<byte[], byte[]> event) {
if (localMember.equals(event.getMember())) {
// ignore locally triggered event
return;
}
K key = deserialize(event.getKey());
V val = deserialize(event.getOldValue());
cache.invalidate(key);
onRemove(key, val);
}
/**
* Cache entry addition hook.
*
* @param key new key
* @param newVal new value
*/
protected void onAdd(K key, V newVal) {
}
/**
* Cache entry update hook.
*
* @param key new key
* @param oldValue old value
* @param newVal new value
*/
protected void onUpdate(K key, V oldValue, V newVal) {
}
/**
* Cache entry remove hook.
*
* @param key new key
* @param val old value
*/
protected void onRemove(K key, V val) {
}
}
/**
* Distributed object remote event entry listener.
*
* @param <K> Entry key type after deserialization
* @param <V> Entry value type after deserialization
*/
public class RemoteEventHandler<K, V> extends EntryAdapter<byte[], byte[]> {
private final Member localMember;
public RemoteEventHandler() {
this.localMember = theInstance.getCluster().getLocalMember();
}
@Override
public void entryAdded(EntryEvent<byte[], byte[]> event) {
if (localMember.equals(event.getMember())) {
// ignore locally triggered event
return;
}
K key = deserialize(event.getKey());
V newVal = deserialize(event.getValue());
onAdd(key, newVal);
}
@Override
public void entryRemoved(EntryEvent<byte[], byte[]> event) {
if (localMember.equals(event.getMember())) {
// ignore locally triggered event
return;
}
K key = deserialize(event.getKey());
V val = deserialize(event.getValue());
onRemove(key, val);
}
@Override
public void entryUpdated(EntryEvent<byte[], byte[]> event) {
if (localMember.equals(event.getMember())) {
// ignore locally triggered event
return;
}
K key = deserialize(event.getKey());
V oldVal = deserialize(event.getOldValue());
V newVal = deserialize(event.getValue());
onUpdate(key, oldVal, newVal);
}
/**
* Remote entry addition hook.
*
* @param key new key
* @param newVal new value
*/
protected void onAdd(K key, V newVal) {
}
/**
* Remote entry update hook.
*
* @param key new key
* @param oldValue old value
* @param newVal new value
*/
protected void onUpdate(K key, V oldValue, V newVal) {
}
/**
* Remote entry remove hook.
*
* @param key new key
* @param val old value
*/
protected void onRemove(K key, V val) {
}
}
}
/*
* Copyright 2014 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.hz;
import static com.google.common.base.Preconditions.checkNotNull;
import org.onosproject.store.serializers.StoreSerializer;
import com.google.common.base.Optional;
import com.google.common.cache.CacheLoader;
import com.hazelcast.core.IMap;
/**
* CacheLoader to wrap Map value with Optional,
* to handle negative hit on underlying IMap.
*
* @param <K> IMap key type after deserialization
* @param <V> IMap value type after deserialization
*/
public final class OptionalCacheLoader<K, V> extends
CacheLoader<K, Optional<V>> {
private final StoreSerializer serializer;
private IMap<byte[], byte[]> rawMap;
/**
* Constructor.
*
* @param serializer to use for serialization
* @param rawMap underlying IMap
*/
public OptionalCacheLoader(StoreSerializer serializer, IMap<byte[], byte[]> rawMap) {
this.serializer = checkNotNull(serializer);
this.rawMap = checkNotNull(rawMap);
}
@Override
public Optional<V> load(K key) throws Exception {
byte[] keyBytes = serializer.encode(key);
byte[] valBytes = rawMap.get(keyBytes);
if (valBytes == null) {
return Optional.absent();
}
V dev = serializer.decode(valBytes);
return Optional.of(dev);
}
}
/*
* Copyright 2014 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.hz;
import com.google.common.base.Function;
import com.google.common.collect.FluentIterable;
import com.hazelcast.core.IQueue;
import com.hazelcast.core.ItemEvent;
import com.hazelcast.core.ItemListener;
import com.hazelcast.monitor.LocalQueueStats;
import org.onosproject.store.serializers.StoreSerializer;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import static com.google.common.base.Preconditions.checkNotNull;
/**
* Wrapper around IQueue&lt;byte[]&gt; which serializes/deserializes
* key and value using StoreSerializer.
*
* @param <T> type
*/
public class SQueue<T> implements IQueue<T> {
private final IQueue<byte[]> q;
private final StoreSerializer serializer;
/**
* Creates a SQueue instance.
*
* @param baseQueue base IQueue to use
* @param serializer serializer to use for both key and value
*/
public SQueue(IQueue<byte[]> baseQueue, StoreSerializer serializer) {
this.q = checkNotNull(baseQueue);
this.serializer = checkNotNull(serializer);
}
private byte[] serialize(Object key) {
return serializer.encode(key);
}
private T deserialize(byte[] key) {
return serializer.decode(key);
}
@Override
public boolean add(T t) {
return q.add(serialize(t));
}
@Override
public boolean offer(T t) {
return q.offer(serialize(t));
}
@Override
public void put(T t) throws InterruptedException {
q.put(serialize(t));
}
@Override
public boolean offer(T t, long l, TimeUnit timeUnit) throws InterruptedException {
return q.offer(serialize(t), l, timeUnit);
}
@Override
public T take() throws InterruptedException {
return deserialize(q.take());
}
@Override
public T poll(long l, TimeUnit timeUnit) throws InterruptedException {
return deserialize(q.poll(l, timeUnit));
}
@Override
public int remainingCapacity() {
return q.remainingCapacity();
}
@Override
public boolean remove(Object o) {
return q.remove(serialize(o));
}
@Override
public boolean contains(Object o) {
return q.contains(serialize(o));
}
@Deprecated // not implemented yet
@Override
public int drainTo(Collection<? super T> collection) {
throw new UnsupportedOperationException();
}
@Deprecated // not implemented yet
@Override
public int drainTo(Collection<? super T> collection, int i) {
throw new UnsupportedOperationException();
}
@Override
public T remove() {
return deserialize(q.remove());
}
@Override
public T poll() {
return deserialize(q.poll());
}
@Override
public T element() {
return deserialize(q.element());
}
@Override
public T peek() {
return deserialize(q.peek());
}
@Override
public int size() {
return q.size();
}
@Override
public boolean isEmpty() {
return q.isEmpty();
}
@Override
public Iterator<T> iterator() {
return FluentIterable.from(q)
.transform(new DeserializeVal())
.iterator();
}
@Deprecated // not implemented yet
@Override
public Object[] toArray() {
throw new UnsupportedOperationException();
}
@Deprecated // not implemented yet
@Override
public <T1> T1[] toArray(T1[] t1s) {
throw new UnsupportedOperationException();
}
@Deprecated // not implemented yet
@Override
public boolean containsAll(Collection<?> collection) {
throw new UnsupportedOperationException();
}
@Deprecated // not implemented yet
@Override
public boolean addAll(Collection<? extends T> collection) {
throw new UnsupportedOperationException();
}
@Deprecated // not implemented yet
@Override
public boolean removeAll(Collection<?> collection) {
throw new UnsupportedOperationException();
}
@Deprecated // not implemented yet
@Override
public boolean retainAll(Collection<?> collection) {
throw new UnsupportedOperationException();
}
@Override
public void clear() {
q.clear();
}
@Override
public LocalQueueStats getLocalQueueStats() {
return q.getLocalQueueStats();
}
@Override
public String addItemListener(ItemListener<T> itemListener, boolean withValue) {
ItemListener<byte[]> il = new ItemListener<byte[]>() {
@Override
public void itemAdded(ItemEvent<byte[]> item) {
itemListener.itemAdded(new ItemEvent<T>(getName(item),
item.getEventType(),
deserialize(item.getItem()),
item.getMember()));
}
@Override
public void itemRemoved(ItemEvent<byte[]> item) {
itemListener.itemRemoved(new ItemEvent<T>(getName(item),
item.getEventType(),
deserialize(item.getItem()),
item.getMember()));
}
private String getName(ItemEvent<byte[]> item) {
return (item.getSource() instanceof String) ?
(String) item.getSource() : item.getSource().toString();
}
};
return q.addItemListener(il, withValue);
}
@Override
public boolean removeItemListener(String registrationId) {
return q.removeItemListener(registrationId);
}
@Deprecated
@Override
public Object getId() {
return q.getId();
}
@Override
public String getPartitionKey() {
return q.getPartitionKey();
}
@Override
public String getName() {
return q.getName();
}
@Override
public String getServiceName() {
return q.getServiceName();
}
@Override
public void destroy() {
q.destroy();
}
private final class DeserializeVal implements Function<byte[], T> {
@Override
public T apply(byte[] input) {
return deserialize(input);
}
}
}
/*
* Copyright 2014 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.hz;
import static com.google.common.base.Preconditions.checkNotNull;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.onosproject.store.serializers.StoreSerializer;
import com.hazelcast.core.TransactionalMap;
import com.hazelcast.query.Predicate;
/**
* Wrapper around TransactionalMap&lt;byte[], byte[]&gt; which serializes/deserializes
* key and value using StoreSerializer.
*
* @param <K> key type
* @param <V> value type
*/
public class STxMap<K, V> implements TransactionalMap<K, V> {
private final TransactionalMap<byte[], byte[]> m;
private final StoreSerializer serializer;
/**
* Creates a STxMap instance.
*
* @param baseMap base IMap to use
* @param serializer serializer to use for both key and value
*/
public STxMap(TransactionalMap<byte[], byte[]> baseMap, StoreSerializer serializer) {
this.m = checkNotNull(baseMap);
this.serializer = checkNotNull(serializer);
}
@Override
public int size() {
return m.size();
}
@Override
public boolean isEmpty() {
return m.isEmpty();
}
@Deprecated
@Override
public Object getId() {
return m.getId();
}
@Override
public String getPartitionKey() {
return m.getPartitionKey();
}
@Override
public String getName() {
return m.getName();
}
@Override
public String getServiceName() {
return m.getServiceName();
}
@Override
public void destroy() {
m.destroy();
}
@Override
public boolean containsKey(Object key) {
return m.containsKey(serializeKey(key));
}
@Override
public V get(Object key) {
return deserializeVal(m.get(serializeKey(key)));
}
@Override
public V getForUpdate(Object key) {
return deserializeVal(m.getForUpdate(serializeKey(key)));
}
@Override
public V put(K key, V value) {
return deserializeVal(m.put(serializeKey(key), serializeVal(value)));
}
@Override
public V remove(Object key) {
return deserializeVal(m.remove(serializeKey(key)));
}
@Override
public boolean remove(Object key, Object value) {
return m.remove(serializeKey(key), serializeVal(value));
}
@Override
public void delete(Object key) {
m.delete(serializeKey(key));
}
@Override
public V put(K key, V value, long ttl, TimeUnit timeunit) {
return deserializeVal(m.put(serializeKey(key), serializeVal(value), ttl, timeunit));
}
@Override
public V putIfAbsent(K key, V value) {
return deserializeVal(m.putIfAbsent(serializeKey(key), serializeVal(value)));
}
@Override
public boolean replace(K key, V oldValue, V newValue) {
return m.replace(serializeKey(key), serializeVal(oldValue), serializeVal(newValue));
}
@Override
public V replace(K key, V value) {
return deserializeVal(m.replace(serializeKey(key), serializeVal(value)));
}
@Override
public void set(K key, V value) {
m.set(serializeKey(key), serializeVal(value));
}
@Override
public Set<K> keySet() {
return deserializeKeySet(m.keySet());
}
@Override
public Collection<V> values() {
return deserializeVals(m.values());
}
@Deprecated // marking method not implemented
@SuppressWarnings("rawtypes")
@Override
public Set<K> keySet(Predicate predicate) {
throw new UnsupportedOperationException();
}
@Deprecated // marking method not implemented
@SuppressWarnings("rawtypes")
@Override
public Collection<V> values(Predicate predicate) {
throw new UnsupportedOperationException();
}
private byte[] serializeKey(Object key) {
return serializer.encode(key);
}
private K deserializeKey(byte[] key) {
return serializer.decode(key);
}
private byte[] serializeVal(Object val) {
return serializer.encode(val);
}
private V deserializeVal(byte[] val) {
if (val == null) {
return null;
}
return serializer.decode(val.clone());
}
private Set<K> deserializeKeySet(Set<byte[]> keys) {
Set<K> dsk = new HashSet<>(keys.size());
for (byte[] key : keys) {
dsk.add(deserializeKey(key));
}
return dsk;
}
private Collection<V> deserializeVals(Collection<byte[]> vals) {
Collection<V> dsl = new ArrayList<>(vals.size());
for (byte[] val : vals) {
dsl.add(deserializeVal(val));
}
return dsl;
}
}
/*
* Copyright 2014-2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.hz;
import com.google.common.io.ByteStreams;
import com.google.common.io.Files;
import com.hazelcast.config.Config;
import com.hazelcast.config.FileSystemXmlConfig;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Service;
import org.onosproject.store.cluster.impl.ClusterDefinitionManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
/**
* Auxiliary bootstrap of distributed store.
*/
@Component(immediate = false, enabled = false)
@Service
public class StoreManager implements StoreService {
protected static final String HAZELCAST_XML_FILE = "etc/hazelcast.xml";
private final Logger log = LoggerFactory.getLogger(getClass());
protected HazelcastInstance instance;
@Activate
public void activate() {
try {
File hazelcastFile = new File(HAZELCAST_XML_FILE);
if (!hazelcastFile.exists()) {
createDefaultHazelcastFile(hazelcastFile);
}
Config config = new FileSystemXmlConfig(HAZELCAST_XML_FILE);
instance = Hazelcast.newHazelcastInstance(config);
log.info("Started");
} catch (FileNotFoundException e) {
log.error("Unable to configure Hazelcast", e);
}
}
private void createDefaultHazelcastFile(File hazelcastFile) {
String ip = ClusterDefinitionManager.getSiteLocalAddress();
String ipPrefix = ip.replaceFirst("\\.[0-9]*$", ".*");
InputStream his = getClass().getResourceAsStream("/hazelcast.xml");
try {
String hzCfg = new String(ByteStreams.toByteArray(his), "UTF-8");
hzCfg = hzCfg.replaceFirst("@NAME", ip);
hzCfg = hzCfg.replaceFirst("@PREFIX", ipPrefix);
Files.write(hzCfg.getBytes("UTF-8"), hazelcastFile);
} catch (IOException e) {
log.error("Unable to write default hazelcast file", e);
}
}
@Deactivate
public void deactivate() {
instance.shutdown();
log.info("Stopped");
}
@Override
public HazelcastInstance getHazelcastInstance() {
return instance;
}
}
/*
* Copyright 2014 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.hz;
import com.hazelcast.core.HazelcastInstance;
/**
* Bootstrap service to get a handle on a share Hazelcast instance.
*/
public interface StoreService {
/**
* Returns the shared Hazelcast instance for use as a distributed store
* backing.
*
* @return shared Hazelcast instance
*/
HazelcastInstance getHazelcastInstance();
}
/*
* Copyright 2014 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Common abstractions and facilities for implementing distributed store
* using Hazelcast.
*/
package org.onosproject.store.hz;
......@@ -27,7 +27,7 @@ import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
/**
* Serializer for RoleValues used by {@link DistributedMastershipStore}.
* Serializer for RoleValues used by {@link org.onosproject.mastership.MastershipStore}.
*/
public class RoleValueSerializer extends Serializer<RoleValue> {
......
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright 2015 Open Networking Laboratory
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<!--
The default Hazelcast configuration. This is used when:
- no hazelcast.xml if present
-->
<hazelcast xsi:schemaLocation="http://www.hazelcast.com/schema/config hazelcast-config-3.3.xsd"
xmlns="http://www.hazelcast.com/schema/config"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<group>
<name>@NAME</name>
<password>rocks</password>
</group>
<management-center enabled="false">http://localhost:8080/mancenter</management-center>
<properties>
<property name="hazelcast.max.no.heartbeat.seconds">30</property>
<property name="hazelcast.merge.first.run.delay.seconds">30</property>
<property name="hazelcast.merge.next.run.delay.seconds">30</property>
</properties>
<network>
<port auto-increment="true" port-count="100">5701</port>
<outbound-ports>
<!--
Allowed port range when connecting to other nodes.
0 or * means use system provided port.
-->
<ports>0</ports>
</outbound-ports>
<join>
<multicast enabled="true">
<multicast-group>224.2.2.3</multicast-group>
<multicast-port>54327</multicast-port>
</multicast>
<tcp-ip enabled="false">
<interface>127.0.0.1</interface>
</tcp-ip>
</join>
<interfaces enabled="true">
<interface>@PREFIX</interface>
</interfaces>
<ssl enabled="false"/>
<socket-interceptor enabled="false"/>
<symmetric-encryption enabled="false">
<!--
encryption algorithm such as
DES/ECB/PKCS5Padding,
PBEWithMD5AndDES,
AES/CBC/PKCS5Padding,
Blowfish,
DESede
-->
<algorithm>PBEWithMD5AndDES</algorithm>
<!-- salt value to use when generating the secret key -->
<salt>thesalt</salt>
<!-- pass phrase to use when generating the secret key -->
<password>thepass</password>
<!-- iteration count to use when generating the secret key -->
<iteration-count>19</iteration-count>
</symmetric-encryption>
</network>
<partition-group enabled="false"/>
<executor-service name="default">
<pool-size>16</pool-size>
<!--Queue capacity. 0 means Integer.MAX_VALUE.-->
<queue-capacity>0</queue-capacity>
</executor-service>
<queue name="default">
<!--
Maximum size of the queue. When a JVM's local queue size reaches the maximum,
all put/offer operations will get blocked until the queue size
of the JVM goes down below the maximum.
Any integer between 0 and Integer.MAX_VALUE. 0 means
Integer.MAX_VALUE. Default is 0.
-->
<max-size>0</max-size>
<!--
Number of backups. If 1 is set as the backup-count for example,
then all entries of the map will be copied to another JVM for
fail-safety. 0 means no backup.
-->
<backup-count>1</backup-count>
<!--
Number of async backups. 0 means no backup.
-->
<async-backup-count>0</async-backup-count>
<empty-queue-ttl>-1</empty-queue-ttl>
</queue>
<map name="default">
<!--
Data type that will be used for storing recordMap.
Possible values:
BINARY (default): keys and values will be stored as binary data
OBJECT : values will be stored in their object forms
OFFHEAP : values will be stored in non-heap region of JVM
-->
<in-memory-format>BINARY</in-memory-format>
<!--
Number of backups. If 1 is set as the backup-count for example,
then all entries of the map will be copied to another JVM for
fail-safety. 0 means no backup.
-->
<backup-count>1</backup-count>
<!--
Number of async backups. 0 means no backup.
-->
<async-backup-count>0</async-backup-count>
<!--
Maximum number of seconds for each entry to stay in the map. Entries that are
older than <time-to-live-seconds> and not updated for <time-to-live-seconds>
will get automatically evicted from the map.
Any integer between 0 and Integer.MAX_VALUE. 0 means infinite. Default is 0.
-->
<time-to-live-seconds>0</time-to-live-seconds>
<!--
Maximum number of seconds for each entry to stay idle in the map. Entries that are
idle(not touched) for more than <max-idle-seconds> will get
automatically evicted from the map. Entry is touched if get, put or containsKey is called.
Any integer between 0 and Integer.MAX_VALUE. 0 means infinite. Default is 0.
-->
<max-idle-seconds>0</max-idle-seconds>
<!--
Valid values are:
NONE (no eviction),
LRU (Least Recently Used),
LFU (Least Frequently Used).
NONE is the default.
-->
<eviction-policy>NONE</eviction-policy>
<!--
Maximum size of the map. When max size is reached,
map is evicted based on the policy defined.
Any integer between 0 and Integer.MAX_VALUE. 0 means
Integer.MAX_VALUE. Default is 0.
-->
<max-size policy="PER_NODE">0</max-size>
<!--
When max. size is reached, specified percentage of
the map will be evicted. Any integer between 0 and 100.
If 25 is set for example, 25% of the entries will
get evicted.
-->
<eviction-percentage>25</eviction-percentage>
<!--
Minimum time in milliseconds which should pass before checking
if a partition of this map is evictable or not.
Default value is 100 millis.
-->
<min-eviction-check-millis>100</min-eviction-check-millis>
<!--
While recovering from split-brain (network partitioning),
map entries in the small cluster will merge into the bigger cluster
based on the policy set here. When an entry merge into the
cluster, there might an existing entry with the same key already.
Values of these entries might be different for that same key.
Which value should be set for the key? Conflict is resolved by
the policy set here. Default policy is PutIfAbsentMapMergePolicy
There are built-in merge policies such as
com.hazelcast.map.merge.PassThroughMergePolicy; entry will be added if there is no existing entry for the key.
com.hazelcast.map.merge.PutIfAbsentMapMergePolicy ; entry will be added if the merging entry doesn't exist in the cluster.
com.hazelcast.map.merge.HigherHitsMapMergePolicy ; entry with the higher hits wins.
com.hazelcast.map.merge.LatestUpdateMapMergePolicy ; entry with the latest update wins.
-->
<merge-policy>com.hazelcast.map.merge.PutIfAbsentMapMergePolicy</merge-policy>
</map>
<multimap name="default">
<backup-count>1</backup-count>
<value-collection-type>SET</value-collection-type>
</multimap>
<multimap name="default">
<backup-count>1</backup-count>
<value-collection-type>SET</value-collection-type>
</multimap>
<list name="default">
<backup-count>1</backup-count>
</list>
<set name="default">
<backup-count>1</backup-count>
</set>
<jobtracker name="default">
<max-thread-size>0</max-thread-size>
<!-- Queue size 0 means number of partitions * 2 -->
<queue-size>0</queue-size>
<retry-count>0</retry-count>
<chunk-size>1000</chunk-size>
<communicate-stats>true</communicate-stats>
<topology-changed-strategy>CANCEL_RUNNING_OPERATION</topology-changed-strategy>
</jobtracker>
<semaphore name="default">
<initial-permits>0</initial-permits>
<backup-count>1</backup-count>
<async-backup-count>0</async-backup-count>
</semaphore>
<serialization>
<portable-version>0</portable-version>
</serialization>
<services enable-defaults="true"/>
</hazelcast>
/*
* Copyright 2014-2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.hz;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import java.io.FileNotFoundException;
import java.util.UUID;
import com.hazelcast.config.Config;
import com.hazelcast.config.FileSystemXmlConfig;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.test.TestHazelcastInstanceFactory;
/**
* Dummy StoreManager to use specified Hazelcast instance.
*/
public class TestStoreManager extends StoreManager {
private TestHazelcastInstanceFactory factory;
/**
* Gets the Hazelcast Config for testing.
*
* @return Hazelcast Configuration for testing
*/
public static Config getTestConfig() {
Config config;
try {
config = new FileSystemXmlConfig(HAZELCAST_XML_FILE);
} catch (FileNotFoundException e) {
// falling back to default
config = new Config();
}
// avoid accidentally joining other cluster
config.getGroupConfig().setName(UUID.randomUUID().toString());
// quickly form single node cluster
config.getNetworkConfig().getJoin()
.getTcpIpConfig()
.setEnabled(true).setConnectionTimeoutSeconds(0);
config.getNetworkConfig().getJoin()
.getMulticastConfig()
.setEnabled(false);
return config;
}
/**
* Creates an instance of dummy Hazelcast instance for testing.
*
* @return HazelcastInstance
*/
public HazelcastInstance initSingleInstance() {
return initInstances(1)[0];
}
/**
* Creates some instances of dummy Hazelcast instances for testing.
*
* @param count number of instances to create
* @return array of HazelcastInstances
*/
public HazelcastInstance[] initInstances(int count) {
checkArgument(count > 0, "Cluster size must be > 0");
factory = new TestHazelcastInstanceFactory(count);
return factory.newInstances(getTestConfig());
}
/**
* Sets the Hazelast instance to return on #getHazelcastInstance().
*
* @param instance Hazelast instance to return on #getHazelcastInstance()
*/
public void setHazelcastInstance(HazelcastInstance instance) {
this.instance = instance;
}
@Override
public void activate() {
// Hazelcast setup removed from original code.
checkState(this.instance != null, "HazelcastInstance needs to be set");
}
@Override
public void deactivate() {
// Hazelcast instance shutdown removed from original code.
factory.shutdownAll();
}
}
......@@ -15,49 +15,11 @@
*/
package org.onosproject.store.mastership.impl;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.onlab.junit.TestTools;
import org.onlab.packet.IpAddress;
import org.onosproject.cluster.ClusterServiceAdapter;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.DefaultControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.mastership.MastershipEvent;
import org.onosproject.mastership.MastershipEvent.Type;
import org.onosproject.mastership.MastershipStoreDelegate;
import org.onosproject.mastership.MastershipTerm;
import org.onosproject.net.DeviceId;
import org.onosproject.net.MastershipRole;
import org.onosproject.store.hz.StoreManager;
import org.onosproject.store.hz.StoreService;
import org.onosproject.store.hz.TestStoreManager;
import org.onosproject.store.serializers.KryoSerializer;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Futures;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.onosproject.net.MastershipRole.MASTER;
import static org.onosproject.net.MastershipRole.NONE;
import static org.onosproject.net.MastershipRole.STANDBY;
/**
* Test of the Hazelcast-based distributed MastershipStore implementation.
*/
public class DistributedMastershipStoreTest {
/*
private static final DeviceId DID1 = DeviceId.deviceId("of:01");
private static final DeviceId DID2 = DeviceId.deviceId("of:02");
private static final DeviceId DID3 = DeviceId.deviceId("of:03");
......@@ -320,5 +282,5 @@ public class DistributedMastershipStoreTest {
}
}
*/
}
......
......@@ -15,50 +15,11 @@
*/
package org.onosproject.store.resource.impl;
import java.util.HashSet;
import java.util.Set;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.onlab.util.Bandwidth;
import org.onosproject.net.AnnotationKeys;
import org.onosproject.net.Annotations;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.DefaultAnnotations;
import org.onosproject.net.DefaultLink;
import org.onosproject.net.Link;
import org.onosproject.net.intent.IntentId;
import org.onosproject.net.provider.ProviderId;
import org.onosproject.net.resource.BandwidthResource;
import org.onosproject.net.resource.BandwidthResourceAllocation;
import org.onosproject.net.resource.DefaultLinkResourceAllocations;
import org.onosproject.net.resource.DefaultLinkResourceRequest;
import org.onosproject.net.resource.LambdaResource;
import org.onosproject.net.resource.LambdaResourceAllocation;
import org.onosproject.net.resource.LinkResourceAllocations;
import org.onosproject.net.resource.LinkResourceRequest;
import org.onosproject.net.resource.LinkResourceStore;
import org.onosproject.net.resource.ResourceAllocation;
import org.onosproject.net.resource.ResourceAllocationException;
import org.onosproject.net.resource.ResourceType;
import org.onosproject.store.hz.StoreService;
import org.onosproject.store.hz.TestStoreManager;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.onosproject.net.DeviceId.deviceId;
import static org.onosproject.net.Link.Type.DIRECT;
import static org.onosproject.net.PortNumber.portNumber;
/**
* Test of the simple LinkResourceStore implementation.
*/
public class HazelcastLinkResourceStoreTest {
/*
private LinkResourceStore store;
private HazelcastLinkResourceStore storeImpl;
private Link link1;
......@@ -74,7 +35,7 @@ public class HazelcastLinkResourceStoreTest {
* @param dev2 destination device
* @param port2 destination port
* @return created {@link Link} object
*/
* /
private Link newLink(String dev1, int port1, String dev2, int port2) {
Annotations annotations = DefaultAnnotations.builder()
.set(AnnotationKeys.OPTICAL_WAVES, "80")
......@@ -112,9 +73,6 @@ public class HazelcastLinkResourceStoreTest {
storeMgr.deactivate();
}
/**
* Tests constructor and activate method.
*/
@Test
public void testConstructorAndActivate() {
final Iterable<LinkResourceAllocations> allAllocations = store.getAllocations();
......@@ -130,13 +88,6 @@ public class HazelcastLinkResourceStoreTest {
assertNotNull(res);
}
/**
* Picks up and returns one of bandwidth allocations from a given set.
*
* @param resources the set of {@link ResourceAllocation}s
* @return {@link BandwidthResourceAllocation} object if found, null
* otherwise
*/
private BandwidthResourceAllocation getBandwidthObj(Set<ResourceAllocation> resources) {
for (ResourceAllocation res : resources) {
if (res.type() == ResourceType.BANDWIDTH) {
......@@ -146,12 +97,6 @@ public class HazelcastLinkResourceStoreTest {
return null;
}
/**
* Returns all lambda allocations from a given set.
*
* @param resources the set of {@link ResourceAllocation}s
* @return a set of {@link LambdaResourceAllocation} objects
*/
private Set<LambdaResourceAllocation> getLambdaObjs(Set<ResourceAllocation> resources) {
Set<LambdaResourceAllocation> lambdaResources = new HashSet<>();
for (ResourceAllocation res : resources) {
......@@ -162,9 +107,6 @@ public class HazelcastLinkResourceStoreTest {
return lambdaResources;
}
/**
* Tests initial free bandwidth for a link.
*/
@Test
public void testInitialBandwidth() {
final Set<ResourceAllocation> freeRes = store.getFreeResources(link1);
......@@ -176,9 +118,6 @@ public class HazelcastLinkResourceStoreTest {
assertEquals(new BandwidthResource(Bandwidth.mbps(1000.0)), alloc.bandwidth());
}
/**
* Tests initial free lambda for a link.
*/
@Test
public void testInitialLambdas() {
final Set<ResourceAllocation> freeRes = store.getFreeResources(link3);
......@@ -198,9 +137,6 @@ public class HazelcastLinkResourceStoreTest {
}
/**
* Tests a successful bandwidth allocation.
*/
@Test
public void testSuccessfulBandwidthAllocation() {
final Link link = newLink("of:1", 1, "of:2", 2);
......@@ -219,9 +155,6 @@ public class HazelcastLinkResourceStoreTest {
store.allocateResources(allocations);
}
/**
* Tests a unsuccessful bandwidth allocation.
*/
@Test
public void testUnsuccessfulBandwidthAllocation() {
final Link link = newLink("of:1", 1, "of:2", 2);
......@@ -247,9 +180,6 @@ public class HazelcastLinkResourceStoreTest {
assertEquals(true, gotException);
}
/**
* Tests a successful bandwidth allocation.
*/
@Test
public void testSuccessfulLambdaAllocation() {
final Link link = newLink("of:1", 1, "of:2", 2);
......@@ -268,9 +198,6 @@ public class HazelcastLinkResourceStoreTest {
store.allocateResources(allocations);
}
/**
* Tests a unsuccessful bandwidth allocation.
*/
@Test
public void testUnsuccessfulLambdaAllocation() {
final Link link = newLink("of:1", 1, "of:2", 2);
......@@ -296,4 +223,5 @@ public class HazelcastLinkResourceStoreTest {
}
assertEquals(true, gotException);
}
*/
}
......
......@@ -37,7 +37,6 @@
<bundle>mvn:joda-time/joda-time/2.5</bundle>
<bundle>mvn:com.hazelcast/hazelcast/3.4</bundle>
<bundle>mvn:io.dropwizard.metrics/metrics-core/3.1.0</bundle>
<bundle>mvn:io.dropwizard.metrics/metrics-json/3.1.0</bundle>
<bundle>mvn:com.eclipsesource.minimal-json/minimal-json/0.9.1</bundle>
......
......@@ -58,21 +58,15 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.easymock</groupId>
<artifactId>easymock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.onosproject</groupId>
<artifactId>onos-api</artifactId>
<classifier>tests</classifier>
<scope>test</scope>
<groupId>org.easymock</groupId>
<artifactId>easymock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.hazelcast</groupId>
<artifactId>hazelcast</artifactId>
<classifier>tests</classifier>
<scope>test</scope>
<groupId>org.onosproject</groupId>
<artifactId>onos-api</artifactId>
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
</dependencies>
......
......@@ -287,18 +287,6 @@
</dependency>
<dependency>
<groupId>com.hazelcast</groupId>
<artifactId>hazelcast</artifactId>
<version>3.4</version>
</dependency>
<dependency>
<groupId>com.hazelcast</groupId>
<artifactId>hazelcast</artifactId>
<version>3.4</version>
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.eclipsesource.minimal-json</groupId>
<artifactId>minimal-json</artifactId>
<version>0.9.1</version>
......
......@@ -17,7 +17,7 @@ export STAGE=$(dirname $KARAF_ROOT)
# Validates the specified IP regular expression against existing adapters.
# Excludes local-loopback.
function validateIp {
ifconfig | awk '{ print $2}' | grep -E -o "([0-9]{1,3}[\.]){3}[0-9]{1,3}" | grep -v "127\.0\.0\.1" | grep $1
ifconfig | awk '{ print $2}' | grep -E -o "([0-9]{1,3}[\.]){3}[0-9]{1,3}" | grep $1
}
# Clean the previous Karaf directory if requested and if it exists.
......@@ -26,6 +26,7 @@ if [ "$1" = "clean" ]; then
[ -d $KARAF_ROOT ] && rm -fr $KARAF_ROOT $STAGE/apps $STAGE/config
fi
ONOS_IP=${ONOS_IP:-127.0.0.1}
IP="${1:-$ONOS_IP}"
# If IP was not given, nor configured attempt to use ONOS_NIC env. variable
......@@ -104,11 +105,6 @@ cat > $STAGE/config/tablets.json <<EOF
"partitions": { "p1": [ { "ip": "$IP", "id": "$IP", "tcpPort": 9876 }]}}
EOF
echo "Setting up hazelcast.xml for subnet $SUBNET.*..."
cp $ONOS_ROOT/tools/package/etc/hazelcast.xml $KARAF_ROOT/etc/hazelcast.xml
perl -pi.old -e "s/192.168.56/$SUBNET/" $KARAF_ROOT/etc/hazelcast.xml
perl -pi.old -e "s/ <name>onos</ <name>$IP</" $KARAF_ROOT/etc/hazelcast.xml
echo "Staging builtin apps..."
rm -fr $STAGE/apps
onos-stage-apps $STAGE/apps $KARAF_ROOT/system
......
#!/bin/bash
# -----------------------------------------------------------------------------
# Configures ONOS to multicast on the specified IP prefix/subnet.
# -----------------------------------------------------------------------------
[ $# -lt 2 ] && echo "usage: $(basename $0) name ipPrefix" && exit 1
name=$1
ipPrefix=$2
hzXml=$(dirname $0)/../apache-karaf-*/etc/hazelcast.xml
perl -pi.bak -e "s/^ <interface>[^<]*/ <interface>$ipPrefix/g" $hzXml
perl -pi -e "s/ <name>[^<]*/ <name>$name/g" $hzXml
echo "This command has been deprecated as this step is no longer required."
\ No newline at end of file
......
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright (c) 2008-2013, Hazelcast, Inc. All Rights Reserved.
~ Copyright 2014 Open Networking Laboratory
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<!--
The default Hazelcast configuration. This is used when:
- no hazelcast.xml if present
-->
<hazelcast xsi:schemaLocation="http://www.hazelcast.com/schema/config hazelcast-config-3.3.xsd"
xmlns="http://www.hazelcast.com/schema/config"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<group>
<name>onos</name>
<password>rocks</password>
</group>
<management-center enabled="false">http://localhost:8080/mancenter</management-center>
<properties>
<property name="hazelcast.max.no.heartbeat.seconds">30</property>
<property name="hazelcast.merge.first.run.delay.seconds">30</property>
<property name="hazelcast.merge.next.run.delay.seconds">30</property>
</properties>
<network>
<port auto-increment="true" port-count="100">5701</port>
<outbound-ports>
<!--
Allowed port range when connecting to other nodes.
0 or * means use system provided port.
-->
<ports>0</ports>
</outbound-ports>
<join>
<multicast enabled="true">
<multicast-group>224.2.2.3</multicast-group>
<multicast-port>54327</multicast-port>
</multicast>
<tcp-ip enabled="false">
<interface>127.0.0.1</interface>
</tcp-ip>
</join>
<interfaces enabled="true">
<interface>192.168.56.*</interface>
</interfaces>
<ssl enabled="false"/>
<socket-interceptor enabled="false"/>
<symmetric-encryption enabled="false">
<!--
encryption algorithm such as
DES/ECB/PKCS5Padding,
PBEWithMD5AndDES,
AES/CBC/PKCS5Padding,
Blowfish,
DESede
-->
<algorithm>PBEWithMD5AndDES</algorithm>
<!-- salt value to use when generating the secret key -->
<salt>thesalt</salt>
<!-- pass phrase to use when generating the secret key -->
<password>thepass</password>
<!-- iteration count to use when generating the secret key -->
<iteration-count>19</iteration-count>
</symmetric-encryption>
</network>
<partition-group enabled="false"/>
<executor-service name="default">
<pool-size>16</pool-size>
<!--Queue capacity. 0 means Integer.MAX_VALUE.-->
<queue-capacity>0</queue-capacity>
</executor-service>
<queue name="default">
<!--
Maximum size of the queue. When a JVM's local queue size reaches the maximum,
all put/offer operations will get blocked until the queue size
of the JVM goes down below the maximum.
Any integer between 0 and Integer.MAX_VALUE. 0 means
Integer.MAX_VALUE. Default is 0.
-->
<max-size>0</max-size>
<!--
Number of backups. If 1 is set as the backup-count for example,
then all entries of the map will be copied to another JVM for
fail-safety. 0 means no backup.
-->
<backup-count>1</backup-count>
<!--
Number of async backups. 0 means no backup.
-->
<async-backup-count>0</async-backup-count>
<empty-queue-ttl>-1</empty-queue-ttl>
</queue>
<map name="default">
<!--
Data type that will be used for storing recordMap.
Possible values:
BINARY (default): keys and values will be stored as binary data
OBJECT : values will be stored in their object forms
OFFHEAP : values will be stored in non-heap region of JVM
-->
<in-memory-format>BINARY</in-memory-format>
<!--
Number of backups. If 1 is set as the backup-count for example,
then all entries of the map will be copied to another JVM for
fail-safety. 0 means no backup.
-->
<backup-count>1</backup-count>
<!--
Number of async backups. 0 means no backup.
-->
<async-backup-count>0</async-backup-count>
<!--
Maximum number of seconds for each entry to stay in the map. Entries that are
older than <time-to-live-seconds> and not updated for <time-to-live-seconds>
will get automatically evicted from the map.
Any integer between 0 and Integer.MAX_VALUE. 0 means infinite. Default is 0.
-->
<time-to-live-seconds>0</time-to-live-seconds>
<!--
Maximum number of seconds for each entry to stay idle in the map. Entries that are
idle(not touched) for more than <max-idle-seconds> will get
automatically evicted from the map. Entry is touched if get, put or containsKey is called.
Any integer between 0 and Integer.MAX_VALUE. 0 means infinite. Default is 0.
-->
<max-idle-seconds>0</max-idle-seconds>
<!--
Valid values are:
NONE (no eviction),
LRU (Least Recently Used),
LFU (Least Frequently Used).
NONE is the default.
-->
<eviction-policy>NONE</eviction-policy>
<!--
Maximum size of the map. When max size is reached,
map is evicted based on the policy defined.
Any integer between 0 and Integer.MAX_VALUE. 0 means
Integer.MAX_VALUE. Default is 0.
-->
<max-size policy="PER_NODE">0</max-size>
<!--
When max. size is reached, specified percentage of
the map will be evicted. Any integer between 0 and 100.
If 25 is set for example, 25% of the entries will
get evicted.
-->
<eviction-percentage>25</eviction-percentage>
<!--
Minimum time in milliseconds which should pass before checking
if a partition of this map is evictable or not.
Default value is 100 millis.
-->
<min-eviction-check-millis>100</min-eviction-check-millis>
<!--
While recovering from split-brain (network partitioning),
map entries in the small cluster will merge into the bigger cluster
based on the policy set here. When an entry merge into the
cluster, there might an existing entry with the same key already.
Values of these entries might be different for that same key.
Which value should be set for the key? Conflict is resolved by
the policy set here. Default policy is PutIfAbsentMapMergePolicy
There are built-in merge policies such as
com.hazelcast.map.merge.PassThroughMergePolicy; entry will be added if there is no existing entry for the key.
com.hazelcast.map.merge.PutIfAbsentMapMergePolicy ; entry will be added if the merging entry doesn't exist in the cluster.
com.hazelcast.map.merge.HigherHitsMapMergePolicy ; entry with the higher hits wins.
com.hazelcast.map.merge.LatestUpdateMapMergePolicy ; entry with the latest update wins.
-->
<merge-policy>com.hazelcast.map.merge.PutIfAbsentMapMergePolicy</merge-policy>
</map>
<multimap name="default">
<backup-count>1</backup-count>
<value-collection-type>SET</value-collection-type>
</multimap>
<multimap name="default">
<backup-count>1</backup-count>
<value-collection-type>SET</value-collection-type>
</multimap>
<list name="default">
<backup-count>1</backup-count>
</list>
<set name="default">
<backup-count>1</backup-count>
</set>
<jobtracker name="default">
<max-thread-size>0</max-thread-size>
<!-- Queue size 0 means number of partitions * 2 -->
<queue-size>0</queue-size>
<retry-count>0</retry-count>
<chunk-size>1000</chunk-size>
<communicate-stats>true</communicate-stats>
<topology-changed-strategy>CANCEL_RUNNING_OPERATION</topology-changed-strategy>
</jobtracker>
<semaphore name="default">
<initial-permits>0</initial-permits>
<backup-count>1</backup-count>
<async-backup-count>0</async-backup-count>
</semaphore>
<serialization>
<portable-version>0</portable-version>
</serialization>
<services enable-defaults="true"/>
</hazelcast>
......@@ -26,11 +26,6 @@ echo "]}" >> $CDEF_FILE
scp -q $CDEF_FILE $remote:$ONOS_INSTALL_DIR/config/cluster.json
ssh $remote "
sudo perl -pi.bak -e \"s/ <interface>.*</ <interface>${ONOS_NIC:-192.168.56.*}</g\" \
$ONOS_INSTALL_DIR/$KARAF_DIST/etc/hazelcast.xml
sudo perl -pi -e \"s/ <name>onos</ <name>${ONOS_CELL:-onos}</g\" \
$ONOS_INSTALL_DIR/$KARAF_DIST/etc/hazelcast.xml
echo \"onos.ip = \$(sudo ifconfig | grep $ONOS_NIC | cut -d: -f2 | cut -d\\ -f1)\" \
>> $ONOS_INSTALL_DIR/$KARAF_DIST/etc/system.properties
......@@ -38,10 +33,6 @@ ssh $remote "
echo "log4j.logger.net.kuujo.copycat= INFO" \
>> $ONOS_INSTALL_DIR/$KARAF_DIST/etc/org.ops4j.pax.logging.cfg
# Suppress Hazelcast multicast joiner warning
echo "log4j.logger.com.hazelcast.cluster.impl.MulticastService= ERROR" \
>> $ONOS_INSTALL_DIR/$KARAF_DIST/etc/org.ops4j.pax.logging.cfg
# Patch the Apache Karaf distribution file to load ONOS boot features
perl -pi.old -e \"s|^(featuresBoot=.*,management)(,webconsole,.*)|\1,$ONOS_BOOT_FEATURES|\" \
$ONOS_INSTALL_DIR/$KARAF_DIST/etc/org.apache.karaf.features.cfg
......