Yuta HIGUCHI

moved Hazelcast based Mastership+Cluster store to onos-code-dist bundle

Change-Id: I304f916f3a400eaf050a5351825634349790e1bf
Showing 29 changed files with 27 additions and 309 deletions
......@@ -58,6 +58,10 @@
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.hazelcast</groupId>
<artifactId>hazelcast</artifactId>
</dependency>
</dependencies>
</project>
......
......@@ -16,6 +16,7 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
//Not used right now
/**
* Allows for reading and writing cluster definition as a JSON file.
*/
......
......@@ -2,6 +2,7 @@ package org.onlab.onos.store.cluster.impl;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
//Not used right now
public final class ClusterManagementMessageSubjects {
// avoid instantiation
private ClusterManagementMessageSubjects() {}
......
......@@ -2,6 +2,7 @@ package org.onlab.onos.store.cluster.impl;
import org.onlab.onos.cluster.ControllerNode;
//Not used right now
/**
* Contains information that will be published when a cluster membership event occurs.
*/
......
package org.onlab.onos.store.cluster.impl;
//Not used right now
public enum ClusterMembershipEventType {
NEW_MEMBER,
LEAVING_MEMBER,
......
......@@ -4,6 +4,7 @@ import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.packet.IpPrefix;
// Not used right now
/**
* Simple back interface through which connection manager can interact with
* the cluster store.
......
......@@ -19,9 +19,9 @@ import org.onlab.onos.cluster.ClusterStoreDelegate;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.common.AbsentInvalidatingLoadingCache;
import org.onlab.onos.store.common.AbstractHazelcastStore;
import org.onlab.onos.store.common.OptionalCacheLoader;
import org.onlab.onos.store.hz.AbsentInvalidatingLoadingCache;
import org.onlab.onos.store.hz.AbstractHazelcastStore;
import org.onlab.onos.store.hz.OptionalCacheLoader;
import org.onlab.packet.IpPrefix;
import java.util.Map;
......
package org.onlab.onos.store.cluster.impl;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.google.common.collect.ImmutableSet;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onlab.onos.cluster.ClusterEvent;
import org.onlab.onos.cluster.ClusterStore;
import org.onlab.onos.cluster.ClusterStoreDelegate;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.AbstractStore;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationAdminService;
import org.onlab.packet.IpPrefix;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import static org.onlab.onos.cluster.ControllerNode.State;
import static org.onlab.packet.IpPrefix.valueOf;
/**
* Distributed implementation of the cluster nodes store.
*/
//@Component(immediate = true)
//@Service
public class DistributedClusterStore
extends AbstractStore<ClusterEvent, ClusterStoreDelegate>
implements ClusterStore {
private final Logger log = LoggerFactory.getLogger(getClass());
private DefaultControllerNode localNode;
private final Map<NodeId, DefaultControllerNode> nodes = new ConcurrentHashMap<>();
private final Map<NodeId, State> states = new ConcurrentHashMap<>();
private final Cache<NodeId, ControllerNode> livenessCache = CacheBuilder.newBuilder()
.maximumSize(1000)
.expireAfterWrite(/*ClusterCommunicationManager.HEART_BEAT_INTERVAL_MILLIS * */3, TimeUnit.MILLISECONDS)
.removalListener(new LivenessCacheRemovalListener()).build();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private ClusterCommunicationAdminService clusterCommunicationAdminService;
private final ClusterNodesDelegate nodesDelegate = new InternalNodesDelegate();
@Activate
public void activate() throws IOException {
loadClusterDefinition();
establishSelfIdentity();
// Start-up the comm service and prime it with the loaded nodes.
clusterCommunicationAdminService.initialize(localNode, nodesDelegate);
for (DefaultControllerNode node : nodes.values()) {
clusterCommunicationAdminService.addNode(node);
}
log.info("Started");
}
@Deactivate
public void deactivate() {
log.info("Stopped");
}
/**
* Loads the cluster definition file.
*/
private void loadClusterDefinition() {
ClusterDefinitionStore cds = new ClusterDefinitionStore("../config/cluster.json");
try {
Set<DefaultControllerNode> storedNodes = cds.read();
for (DefaultControllerNode node : storedNodes) {
nodes.put(node.id(), node);
}
} catch (IOException e) {
log.error("Unable to read cluster definitions", e);
}
}
/**
* Determines who the local controller node is.
*/
private void establishSelfIdentity() {
// Establishes the controller's own identity.
IpPrefix ip = valueOf(System.getProperty("onos.ip", "127.0.1.1"));
localNode = nodes.get(new NodeId(ip.toString()));
// As a fall-back, let's make sure we at least know who we are.
if (localNode == null) {
localNode = new DefaultControllerNode(new NodeId(ip.toString()), ip);
nodes.put(localNode.id(), localNode);
}
states.put(localNode.id(), State.ACTIVE);
}
@Override
public ControllerNode getLocalNode() {
return localNode;
}
@Override
public Set<ControllerNode> getNodes() {
ImmutableSet.Builder<ControllerNode> builder = ImmutableSet.builder();
return builder.addAll(nodes.values()).build();
}
@Override
public ControllerNode getNode(NodeId nodeId) {
return nodes.get(nodeId);
}
@Override
public State getState(NodeId nodeId) {
State state = states.get(nodeId);
return state == null ? State.INACTIVE : state;
}
@Override
public ControllerNode addNode(NodeId nodeId, IpPrefix ip, int tcpPort) {
DefaultControllerNode node = new DefaultControllerNode(nodeId, ip, tcpPort);
nodes.put(nodeId, node);
clusterCommunicationAdminService.addNode(node);
return node;
}
@Override
public void removeNode(NodeId nodeId) {
if (nodeId.equals(localNode.id())) {
nodes.clear();
nodes.put(localNode.id(), localNode);
} else {
// Remove the other node.
DefaultControllerNode node = nodes.remove(nodeId);
if (node != null) {
clusterCommunicationAdminService.removeNode(node);
}
}
}
// Entity to handle back calls from the connection manager.
private class InternalNodesDelegate implements ClusterNodesDelegate {
@Override
public DefaultControllerNode nodeDetected(NodeId nodeId, IpPrefix ip, int tcpPort) {
DefaultControllerNode node = nodes.get(nodeId);
if (node == null) {
node = (DefaultControllerNode) addNode(nodeId, ip, tcpPort);
}
states.put(nodeId, State.ACTIVE);
livenessCache.put(nodeId, node);
return node;
}
@Override
public void nodeVanished(NodeId nodeId) {
states.put(nodeId, State.INACTIVE);
}
@Override
public void nodeRemoved(NodeId nodeId) {
removeNode(nodeId);
}
}
private class LivenessCacheRemovalListener implements RemovalListener<NodeId, ControllerNode> {
@Override
public void onRemoval(RemovalNotification<NodeId, ControllerNode> entry) {
NodeId nodeId = entry.getKey();
log.warn("Failed to receive heartbeats from controller: " + nodeId);
nodesDelegate.nodeVanished(nodeId);
}
}
}
/**
* Distributed cluster store and messaging subsystem implementation.
* Implementation of a distributed cluster node store using Hazelcast.
*/
package org.onlab.onos.store.cluster.impl;
......
package org.onlab.onos.store.common;
package org.onlab.onos.store.hz;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
......
package org.onlab.onos.store.common;
package org.onlab.onos.store.hz;
import com.google.common.base.Optional;
import com.google.common.cache.LoadingCache;
......
package org.onlab.onos.store.common;
package org.onlab.onos.store.hz;
import static com.google.common.base.Preconditions.checkNotNull;
......
package org.onlab.onos.store.common;
package org.onlab.onos.store.hz;
import static com.google.common.base.Preconditions.checkNotNull;
......
package org.onlab.onos.store.common;
package org.onlab.onos.store.hz;
import com.hazelcast.config.Config;
import com.hazelcast.config.FileSystemXmlConfig;
......
package org.onlab.onos.store.common;
package org.onlab.onos.store.hz;
import com.hazelcast.core.HazelcastInstance;
......
......@@ -2,4 +2,4 @@
* Common abstractions and facilities for implementing distributed store
* using Hazelcast.
*/
package org.onlab.onos.store.common;
package org.onlab.onos.store.hz;
......
......@@ -20,8 +20,8 @@ import org.onlab.onos.mastership.MastershipStoreDelegate;
import org.onlab.onos.mastership.MastershipTerm;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.MastershipRole;
import org.onlab.onos.store.common.AbstractHazelcastStore;
import org.onlab.onos.store.common.SMap;
import org.onlab.onos.store.hz.AbstractHazelcastStore;
import org.onlab.onos.store.hz.SMap;
import org.onlab.onos.store.serializers.KryoNamespaces;
import org.onlab.onos.store.serializers.KryoSerializer;
import org.onlab.util.KryoNamespace;
......@@ -31,6 +31,7 @@ import com.hazelcast.core.EntryEvent;
import com.hazelcast.core.EntryListener;
import com.hazelcast.core.IAtomicLong;
import com.hazelcast.core.MapEvent;
import static org.onlab.onos.net.MastershipRole.*;
/**
......
package org.onlab.onos.store.common;
package org.onlab.onos.store.hz;
import java.io.FileNotFoundException;
import java.util.UUID;
......
......@@ -28,9 +28,9 @@ import org.onlab.onos.mastership.MastershipTerm;
import org.onlab.onos.mastership.MastershipEvent.Type;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.MastershipRole;
import org.onlab.onos.store.common.StoreManager;
import org.onlab.onos.store.common.StoreService;
import org.onlab.onos.store.common.TestStoreManager;
import org.onlab.onos.store.hz.StoreManager;
import org.onlab.onos.store.hz.StoreService;
import org.onlab.onos.store.hz.TestStoreManager;
import org.onlab.onos.store.serializers.KryoSerializer;
import org.onlab.packet.IpPrefix;
......
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.onlab.onos</groupId>
<artifactId>onos-core-hz</artifactId>
<version>1.0.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<artifactId>onos-core-hz-cluster</artifactId>
<packaging>bundle</packaging>
<description>ONOS Hazelcast based distributed store subsystems</description>
<dependencies>
<dependency>
<groupId>org.onlab.onos</groupId>
<artifactId>onos-core-serializers</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.onlab.onos</groupId>
<artifactId>onos-core-hz-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.onlab.onos</groupId>
<artifactId>onos-core-hz-common</artifactId>
<classifier>tests</classifier>
<scope>test</scope>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
/**
* Implementation of a distributed cluster node store using Hazelcast.
*/
package org.onlab.onos.store.cluster.impl;
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.onlab.onos</groupId>
<artifactId>onos-core-hz</artifactId>
<version>1.0.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<artifactId>onos-core-hz-common</artifactId>
<packaging>bundle</packaging>
<description>ONOS Hazelcast based distributed store subsystems</description>
<dependencies>
<dependency>
<groupId>org.onlab.onos</groupId>
<artifactId>onos-core-serializers</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
</dependencies>
</project>
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.onlab.onos</groupId>
<artifactId>onos-core-store</artifactId>
<version>1.0.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<artifactId>onos-core-hz</artifactId>
<packaging>pom</packaging>
<description>ONOS Core Hazelcast Store subsystem</description>
<modules>
<module>common</module>
<module>cluster</module>
</modules>
<dependencies>
<dependency>
<groupId>com.hazelcast</groupId>
<artifactId>hazelcast</artifactId>
</dependency>
</dependencies>
</project>
......@@ -19,7 +19,6 @@
<modules>
<module>trivial</module>
<module>dist</module>
<module>hz</module>
<module>serializers</module>
</modules>
......
......@@ -87,9 +87,6 @@
<bundle>mvn:org.onlab.onos/onos-core-dist/1.0.0-SNAPSHOT</bundle>
<bundle>mvn:org.onlab.onos/onos-core-serializers/1.0.0-SNAPSHOT</bundle>
<bundle>mvn:org.onlab.onos/onlab-netty/1.0.0-SNAPSHOT</bundle>
<bundle>mvn:org.onlab.onos/onos-core-hz-common/1.0.0-SNAPSHOT</bundle>
<bundle>mvn:org.onlab.onos/onos-core-hz-cluster/1.0.0-SNAPSHOT</bundle>
</feature>
<feature name="onos-core-trivial" version="1.0.0"
......