alshabib

Merge branch 'master' of ssh://gerrit.onlab.us:29418/onos-next

......@@ -12,11 +12,13 @@ import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.ClusterMembershipMessage;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.GoodbyeMessage;
import org.onlab.onos.store.cluster.messaging.HelloMessage;
import org.onlab.onos.store.cluster.messaging.LeavingMemberMessage;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
import org.onlab.onos.store.cluster.messaging.MessageSubscriber;
import org.onlab.onos.store.cluster.messaging.NewMemberMessage;
import org.onlab.onos.store.cluster.messaging.SerializationService;
import org.onlab.packet.IpPrefix;
import org.slf4j.Logger;
......@@ -84,16 +86,20 @@ public class ClusterCommunicationManager
private final Timer timer = new Timer("onos-comm-initiator");
private final TimerTask connectionCustodian = new ConnectionCustodian();
private GoodbyeSubscriber goodbyeSubscriber = new GoodbyeSubscriber();
private MembershipSubscriber membershipSubscriber = new MembershipSubscriber();
@Activate
public void activate() {
addSubscriber(MessageSubject.GOODBYE, goodbyeSubscriber);
addSubscriber(MessageSubject.NEW_MEMBER, membershipSubscriber);
addSubscriber(MessageSubject.LEAVING_MEMBER, membershipSubscriber);
log.info("Activated but waiting for delegate");
}
@Deactivate
public void deactivate() {
removeSubscriber(MessageSubject.NEW_MEMBER, membershipSubscriber);
removeSubscriber(MessageSubject.LEAVING_MEMBER, membershipSubscriber);
connectionCustodian.cancel();
if (connectionListener != null) {
connectionListener.shutdown();
......@@ -154,7 +160,7 @@ public class ClusterCommunicationManager
@Override
public void removeNode(DefaultControllerNode node) {
send(new GoodbyeMessage(node.id()));
send(new LeavingMemberMessage(node.id()));
nodes.remove(node);
ClusterMessageStream stream = streams.remove(node.id());
if (stream != null) {
......@@ -177,7 +183,7 @@ public class ClusterCommunicationManager
@Override
public void clearAllNodesAndStreams() {
nodes.clear();
send(new GoodbyeMessage(localNode.id()));
send(new LeavingMemberMessage(localNode.id()));
for (ClusterMessageStream stream : streams.values()) {
stream.close();
}
......@@ -187,7 +193,7 @@ public class ClusterCommunicationManager
/**
* Dispatches the specified message to all subscribers to its subject.
*
* @param message message to dispatch
* @param message message to dispatch
* @param fromNodeId node from which the message was received
*/
void dispatch(ClusterMessage message, NodeId fromNodeId) {
......@@ -200,7 +206,7 @@ public class ClusterCommunicationManager
}
/**
* Removes the stream associated with the specified node.
* Adds the stream associated with the specified node.
*
* @param nodeId newly detected cluster node id
* @param ip node IP listen address
......@@ -212,6 +218,7 @@ public class ClusterCommunicationManager
DefaultControllerNode node = nodesDelegate.nodeDetected(nodeId, ip, tcpPort);
stream.setNode(node);
streams.put(node.id(), stream);
send(new NewMemberMessage(node.id(), node.ip(), node.tcpPort()));
return node;
}
......@@ -329,11 +336,19 @@ public class ClusterCommunicationManager
}
}
private class GoodbyeSubscriber implements MessageSubscriber {
private class MembershipSubscriber implements MessageSubscriber {
@Override
public void receive(ClusterMessage message, NodeId fromNodeId) {
log.info("Received goodbye message from {}", fromNodeId);
nodesDelegate.nodeRemoved(fromNodeId);
MessageSubject subject = message.subject();
ClusterMembershipMessage cmm = (ClusterMembershipMessage) message;
if (message.subject() == MessageSubject.NEW_MEMBER) {
log.info("Node {} arrived", cmm.nodeId());
nodesDelegate.nodeDetected(cmm.nodeId(), cmm.ipAddress(), cmm.tcpPort());
} else if (subject == MessageSubject.LEAVING_MEMBER) {
log.info("Node {} is leaving", cmm.nodeId());
nodesDelegate.nodeRemoved(cmm.nodeId());
}
}
}
}
......
......@@ -128,10 +128,11 @@ public class DistributedClusterStore
@Override
public void removeNode(NodeId nodeId) {
if (nodeId.equals(localNode.id())) {
// FIXME: this is still broken
// We are being ejected from the cluster, so remove all other nodes.
communicationAdminService.clearAllNodesAndStreams();
nodes.clear();
nodes.put(localNode.id(), localNode);
} else {
// Remove the other node.
DefaultControllerNode node = nodes.remove(nodeId);
......@@ -152,6 +153,7 @@ public class DistributedClusterStore
states.put(nodeId, State.ACTIVE);
return node;
}
@Override
public void nodeVanished(NodeId nodeId) {
states.put(nodeId, State.INACTIVE);
......
......@@ -23,9 +23,10 @@ import org.onlab.onos.net.PortNumber;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.EchoMessage;
import org.onlab.onos.store.cluster.messaging.GoodbyeMessage;
import org.onlab.onos.store.cluster.messaging.LeavingMemberMessage;
import org.onlab.onos.store.cluster.messaging.HelloMessage;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
import org.onlab.onos.store.cluster.messaging.NewMemberMessage;
import org.onlab.onos.store.cluster.messaging.SerializationService;
import org.onlab.onos.store.serializers.ConnectPointSerializer;
import org.onlab.onos.store.serializers.DefaultLinkSerializer;
......@@ -97,7 +98,8 @@ public class MessageSerializer implements SerializationService {
MessageSubject.class,
HelloMessage.class,
GoodbyeMessage.class,
NewMemberMessage.class,
LeavingMemberMessage.class,
EchoMessage.class
)
.register(IpPrefix.class, new IpPrefixSerializer())
......
package org.onlab.onos.store.cluster.messaging;
import org.onlab.onos.cluster.NodeId;
import org.onlab.packet.IpPrefix;
/**
* Base for cluster membership messages.
*/
public abstract class ClusterMembershipMessage extends ClusterMessage {
private NodeId nodeId;
private IpPrefix ipAddress;
private int tcpPort;
// For serialization
protected ClusterMembershipMessage() {
super(MessageSubject.HELLO);
nodeId = null;
ipAddress = null;
tcpPort = 0;
}
/**
* Creates a new membership message for the specified end-point data.
*
* @param subject message subject
* @param nodeId sending node identification
* @param ipAddress sending node IP address
* @param tcpPort sending node TCP port
*/
protected ClusterMembershipMessage(MessageSubject subject, NodeId nodeId,
IpPrefix ipAddress, int tcpPort) {
super(subject);
this.nodeId = nodeId;
this.ipAddress = ipAddress;
this.tcpPort = tcpPort;
}
/**
* Returns the sending node identifer.
*
* @return node identifier
*/
public NodeId nodeId() {
return nodeId;
}
/**
* Returns the sending node IP address.
*
* @return node IP address
*/
public IpPrefix ipAddress() {
return ipAddress;
}
/**
* Returns the sending node TCP listen port.
*
* @return TCP listen port
*/
public int tcpPort() {
return tcpPort;
}
}
......@@ -6,18 +6,10 @@ import org.onlab.packet.IpPrefix;
/**
* Hello message that nodes use to greet each other.
*/
public class HelloMessage extends ClusterMessage {
private NodeId nodeId;
private IpPrefix ipAddress;
private int tcpPort;
public class HelloMessage extends ClusterMembershipMessage {
// For serialization
private HelloMessage() {
super(MessageSubject.HELLO);
nodeId = null;
ipAddress = null;
tcpPort = 0;
}
/**
......@@ -28,37 +20,7 @@ public class HelloMessage extends ClusterMessage {
* @param tcpPort sending node TCP port
*/
public HelloMessage(NodeId nodeId, IpPrefix ipAddress, int tcpPort) {
super(MessageSubject.HELLO);
this.nodeId = nodeId;
this.ipAddress = ipAddress;
this.tcpPort = tcpPort;
}
/**
* Returns the sending node identifer.
*
* @return node identifier
*/
public NodeId nodeId() {
return nodeId;
}
/**
* Returns the sending node IP address.
*
* @return node IP address
*/
public IpPrefix ipAddress() {
return ipAddress;
}
/**
* Returns the sending node TCP listen port.
*
* @return TCP listen port
*/
public int tcpPort() {
return tcpPort;
super(MessageSubject.HELLO, nodeId, ipAddress, tcpPort);
}
}
......
......@@ -3,16 +3,13 @@ package org.onlab.onos.store.cluster.messaging;
import org.onlab.onos.cluster.NodeId;
/**
* Goodbye message that nodes use to leave the cluster for good.
* Announcement message that nodes use to gossip about team departures.
*/
public class GoodbyeMessage extends ClusterMessage {
private NodeId nodeId;
public class LeavingMemberMessage extends ClusterMembershipMessage {
// For serialization
private GoodbyeMessage() {
super(MessageSubject.GOODBYE);
nodeId = null;
private LeavingMemberMessage() {
super();
}
/**
......@@ -20,18 +17,8 @@ public class GoodbyeMessage extends ClusterMessage {
*
* @param nodeId sending node identification
*/
public GoodbyeMessage(NodeId nodeId) {
super(MessageSubject.HELLO);
this.nodeId = nodeId;
}
/**
* Returns the sending node identifer.
*
* @return node identifier
*/
public NodeId nodeId() {
return nodeId;
public LeavingMemberMessage(NodeId nodeId) {
super(MessageSubject.LEAVING_MEMBER, nodeId, null, 0);
}
}
......
......@@ -8,8 +8,11 @@ public enum MessageSubject {
/** Represents a first greeting message. */
HELLO,
/** Signifies node's intent to leave the cluster. */
GOODBYE,
/** Signifies announcement about new member. */
NEW_MEMBER,
/** Signifies announcement about leaving member. */
LEAVING_MEMBER,
/** Signifies a heart-beat message. */
ECHO
......
package org.onlab.onos.store.cluster.messaging;
import org.onlab.onos.cluster.NodeId;
import org.onlab.packet.IpPrefix;
/**
* Announcement message that nodes use to gossip about new arrivals.
*/
public class NewMemberMessage extends ClusterMembershipMessage {
// For serialization
private NewMemberMessage() {
}
/**
* Creates a new gossip message for the specified end-point data.
*
* @param nodeId sending node identification
* @param ipAddress sending node IP address
* @param tcpPort sending node TCP port
*/
public NewMemberMessage(NodeId nodeId, IpPrefix ipAddress, int tcpPort) {
super(MessageSubject.NEW_MEMBER, nodeId, ipAddress, tcpPort);
}
}