Yuta HIGUCHI

removed ClusterCommunicationAdminService and SerializationService

Change-Id: I91da0a5d65128e5ba5179b0eab41839eec706c71
......@@ -18,7 +18,6 @@ import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.AbstractStore;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationAdminService;
import org.onlab.onos.store.cluster.messaging.impl.ClusterCommunicationManager;
import org.onlab.packet.IpPrefix;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -48,7 +47,7 @@ public class DistributedClusterStore
private final Map<NodeId, State> states = new ConcurrentHashMap<>();
private final Cache<NodeId, ControllerNode> livenessCache = CacheBuilder.newBuilder()
.maximumSize(1000)
.expireAfterWrite(ClusterCommunicationManager.HEART_BEAT_INTERVAL_MILLIS * 3, TimeUnit.MILLISECONDS)
.expireAfterWrite(/*ClusterCommunicationManager.HEART_BEAT_INTERVAL_MILLIS * */3, TimeUnit.MILLISECONDS)
.removalListener(new LivenessCacheRemovalListener()).build();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
......
package org.onlab.onos.store.cluster.messaging;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.store.cluster.impl.ClusterNodesDelegate;
// TODO: This service interface can be removed, once we properly start
// using ClusterService
/**
* Service for administering communications manager.
*/
public interface ClusterCommunicationAdminService {
/**
* Initialize.
*/
void initialize(ControllerNode localNode, ClusterNodesDelegate nodesDelegate);
/**
* Adds the node to the list of monitored nodes.
*
* @param node node to be added
*/
void addNode(ControllerNode node);
/**
* Removes the node from the list of monitored nodes.
*
* @param node node to be removed
*/
void removeNode(ControllerNode node);
}
\ No newline at end of file
package org.onlab.onos.store.cluster.messaging;
// FIXME: not used any more? remove
/**
* Service for encoding &amp; decoding intra-cluster message payload.
*/
public interface SerializationService {
/**
* Decodes the specified byte buffer to obtain the message within.
*
* @param buffer byte buffer with message(s)
* @return parsed message
*/
<T> T decode(byte[] data);
/**
* Encodes the specified message into the given byte buffer.
*
* @param message message to be encoded
* @param buffer byte buffer to receive the message data
*/
byte[] encode(Object message);
}
......@@ -4,8 +4,6 @@ import static com.google.common.base.Preconditions.checkArgument;
import java.io.IOException;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
......@@ -16,9 +14,6 @@ import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.cluster.impl.ClusterMembershipEvent;
import org.onlab.onos.store.cluster.impl.ClusterMembershipEventType;
import org.onlab.onos.store.cluster.impl.ClusterNodesDelegate;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationAdminService;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
......@@ -39,19 +34,13 @@ import org.slf4j.LoggerFactory;
@Component(immediate = true)
@Service
public class ClusterCommunicationManager
implements ClusterCommunicationService, ClusterCommunicationAdminService {
implements ClusterCommunicationService {
private final Logger log = LoggerFactory.getLogger(getClass());
private ControllerNode localNode;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private ClusterService clusterService;
private ClusterNodesDelegate nodesDelegate;
private final Timer timer = new Timer("onos-controller-heatbeats");
public static final long HEART_BEAT_INTERVAL_MILLIS = 1000L;
// TODO: This probably should not be a OSGi service.
private MessagingService messagingService;
......@@ -72,7 +61,7 @@ public class ClusterCommunicationManager
@Activate
public void activate() {
localNode = clusterService.getLocalNode();
ControllerNode localNode = clusterService.getLocalNode();
NettyMessagingService netty = new NettyMessagingService(localNode.tcpPort());
// FIXME: workaround until it becomes a service.
try {
......@@ -94,6 +83,7 @@ public class ClusterCommunicationManager
@Override
public boolean broadcast(ClusterMessage message) throws IOException {
boolean ok = true;
final ControllerNode localNode = clusterService.getLocalNode();
for (ControllerNode node : clusterService.getNodes()) {
if (!node.equals(localNode)) {
ok = unicast(message, node.id()) && ok;
......@@ -105,6 +95,7 @@ public class ClusterCommunicationManager
@Override
public boolean multicast(ClusterMessage message, Set<NodeId> nodes) throws IOException {
boolean ok = true;
final ControllerNode localNode = clusterService.getLocalNode();
for (NodeId nodeId : nodes) {
if (!nodeId.equals(localNode.id())) {
ok = unicast(message, nodeId) && ok;
......@@ -134,65 +125,6 @@ public class ClusterCommunicationManager
messagingService.registerHandler(subject.value(), new InternalClusterMessageHandler(subscriber));
}
@Override
public void initialize(ControllerNode localNode,
ClusterNodesDelegate delegate) {
this.localNode = localNode;
this.nodesDelegate = delegate;
this.addSubscriber(new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"), new ClusterMemebershipEventHandler());
timer.schedule(new KeepAlive(), 0, HEART_BEAT_INTERVAL_MILLIS);
}
@Override
public void addNode(ControllerNode node) {
//members.put(node.id(), node);
}
@Override
public void removeNode(ControllerNode node) {
// broadcast(new ClusterMessage(
// localNode.id(),
// new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"),
// SERIALIZER.encode(new ClusterMembershipEvent(ClusterMembershipEventType.LEAVING_MEMBER, node))));
//members.remove(node.id());
}
// Sends a heart beat to all peers.
private class KeepAlive extends TimerTask {
@Override
public void run() {
try {
broadcast(new ClusterMessage(
localNode.id(),
new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"),
SERIALIZER.encode(new ClusterMembershipEvent(ClusterMembershipEventType.HEART_BEAT, localNode))));
} catch (IOException e) {
log.warn("I/O error while broadcasting heart beats.", e);
}
}
}
private class ClusterMemebershipEventHandler implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
ClusterMembershipEvent event = SERIALIZER.decode(message.payload());
ControllerNode node = event.node();
if (event.type() == ClusterMembershipEventType.HEART_BEAT) {
log.info("Node {} sent a hearbeat", node.id());
nodesDelegate.nodeDetected(node.id(), node.ip(), node.tcpPort());
} else if (event.type() == ClusterMembershipEventType.LEAVING_MEMBER) {
log.info("Node {} is leaving", node.id());
nodesDelegate.nodeRemoved(node.id());
} else if (event.type() == ClusterMembershipEventType.UNREACHABLE_MEMBER) {
log.info("Node {} is unreachable", node.id());
nodesDelegate.nodeVanished(node.id());
}
}
}
private final class InternalClusterMessageHandler implements MessageHandler {
private final ClusterMessageHandler handler;
......
package org.onlab.onos.store.cluster.messaging.impl;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
import org.onlab.onos.store.cluster.messaging.SerializationService;
import org.onlab.onos.store.serializers.KryoPoolUtil;
import org.onlab.util.KryoPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
//FIXME: not used any more? remove
/**
* Factory for parsing messages sent between cluster members.
*/
@Component(immediate = true)
@Service
public class MessageSerializer implements SerializationService {
private final Logger log = LoggerFactory.getLogger(getClass());
private static final int METADATA_LENGTH = 12; // 8 + 4
private static final int LENGTH_OFFSET = 8;
private static final long MARKER = 0xfeedcafebeaddeadL;
private KryoPool serializerPool;
@Activate
public void activate() {
setupKryoPool();
log.info("Started");
}
@Deactivate
public void deactivate() {
log.info("Stopped");
}
/**
* Sets up the common serialzers pool.
*/
protected void setupKryoPool() {
serializerPool = KryoPool.newBuilder()
.register(KryoPoolUtil.API)
// TODO: Should MessageSubject be in API bundle?
.register(MessageSubject.class)
.build()
.populate(1);
}
@Override
public <T> T decode(byte[] data) {
return serializerPool.deserialize(data);
}
@Override
public byte[] encode(Object payload) {
return serializerPool.serialize(payload);
}
}
......@@ -40,22 +40,18 @@ public class ClusterCommunicationManagerTest {
@Before
public void setUp() throws Exception {
MessageSerializer messageSerializer = new MessageSerializer();
messageSerializer.activate();
NettyMessagingService messagingService = new NettyMessagingService();
messagingService.activate();
ccm1 = new ClusterCommunicationManager();
// ccm1.serializationService = messageSerializer;
ccm1.activate();
ccm2 = new ClusterCommunicationManager();
// ccm2.serializationService = messageSerializer;
ccm2.activate();
ccm1.initialize(node1, cnd1);
ccm2.initialize(node2, cnd2);
// ccm1.initialize(node1, cnd1);
// ccm2.initialize(node2, cnd2);
}
@After
......@@ -70,7 +66,7 @@ public class ClusterCommunicationManagerTest {
cnd1.latch = new CountDownLatch(1);
cnd2.latch = new CountDownLatch(1);
ccm1.addNode(node2);
// ccm1.addNode(node2);
validateDelegateEvent(cnd1, Op.DETECTED, node2.id());
validateDelegateEvent(cnd2, Op.DETECTED, node1.id());
}
......@@ -81,7 +77,7 @@ public class ClusterCommunicationManagerTest {
cnd1.latch = new CountDownLatch(1);
cnd2.latch = new CountDownLatch(1);
ccm1.addNode(node2);
// ccm1.addNode(node2);
validateDelegateEvent(cnd1, Op.DETECTED, node2.id());
validateDelegateEvent(cnd2, Op.DETECTED, node1.id());
......