tom

Further simplified the store & connection manager relationship.

Showing 16 changed files with 277 additions and 345 deletions
package org.onlab.onos.ccc;
import com.fasterxml.jackson.core.JsonEncoding;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.packet.IpPrefix;
import java.io.File;
import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
/**
* Allows for reading and writing cluster definition as a JSON file.
*/
public class ClusterDefinitionStore {
private final File file;
/**
* Creates a reader/writer of the cluster definition file.
*
* @param filePath location of the definition file
*/
public ClusterDefinitionStore(String filePath) {
file = new File(filePath);
}
/**
* Returns set of the controller nodes, including self.
*
* @return set of controller nodes
*/
public Set<DefaultControllerNode> read() throws IOException {
Set<DefaultControllerNode> nodes = new HashSet<>();
ObjectMapper mapper = new ObjectMapper();
ObjectNode clusterNodeDef = (ObjectNode) mapper.readTree(file);
Iterator<JsonNode> it = ((ArrayNode) clusterNodeDef.get("nodes")).elements();
while (it.hasNext()) {
ObjectNode nodeDef = (ObjectNode) it.next();
nodes.add(new DefaultControllerNode(new NodeId(nodeDef.get("id").asText()),
IpPrefix.valueOf(nodeDef.get("ip").asText()),
nodeDef.get("tcpPort").asInt(9876)));
}
return nodes;
}
/**
* Writes the given set of the controller nodes.
*
* @param nodes set of controller nodes
*/
public void write(Set<DefaultControllerNode> nodes) throws IOException {
ObjectMapper mapper = new ObjectMapper();
ObjectNode clusterNodeDef = mapper.createObjectNode();
ArrayNode nodeDefs = mapper.createArrayNode();
clusterNodeDef.set("nodes", nodeDefs);
for (DefaultControllerNode node : nodes) {
ObjectNode nodeDef = mapper.createObjectNode();
nodeDef.put("id", node.id().toString())
.put("ip", node.ip().toString())
.put("tcpPort", node.tcpPort());
nodeDefs.add(nodeDef);
}
mapper.writeTree(new JsonFactory().createGenerator(file, JsonEncoding.UTF8),
clusterNodeDef);
}
}
package org.onlab.onos.store.cluster.impl;
import org.onlab.onos.cluster.DefaultControllerNode;
/**
* Service for administering communications manager.
*/
public interface ClusterCommunicationAdminService {
/**
* Adds the node to the list of monitored nodes.
*
* @param node node to be added
*/
void addNode(DefaultControllerNode node);
/**
* Removes the node from the list of monitored nodes.
*
* @param node node to be removed
*/
void removeNode(DefaultControllerNode node);
/**
* Starts-up the communications engine.
*
* @param localNode local controller node
* @param delegate nodes delegate
*/
void startUp(DefaultControllerNode localNode, ClusterNodesDelegate delegate);
}
package org.onlab.onos.store.cluster.impl;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.ClusterMessageStream;
import org.onlab.onos.store.cluster.messaging.HelloMessage;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
import org.onlab.onos.store.cluster.messaging.MessageSubscriber;
import org.onlab.onos.store.cluster.messaging.SerializationService;
import org.onlab.packet.IpPrefix;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -28,9 +41,12 @@ import static java.net.InetAddress.getByAddress;
import static org.onlab.util.Tools.namedThreads;
/**
* Manages connections to other controller cluster nodes.
* Implements the cluster communication services to use by other stores.
*/
public class ConnectionManager implements MessageSender {
@Component(immediate = true)
@Service
public class ClusterCommunicationManager
implements ClusterCommunicationService, ClusterCommunicationAdminService {
private final Logger log = LoggerFactory.getLogger(getClass());
......@@ -43,10 +59,11 @@ public class ConnectionManager implements MessageSender {
private ClusterConnectionListener connectionListener;
private List<ClusterIOWorker> workers = new ArrayList<>(WORKERS);
private final DefaultControllerNode localNode;
private final ClusterNodesDelegate nodesDelegate;
private final CommunicationsDelegate commsDelegate;
private final SerializationService serializationService;
private DefaultControllerNode localNode;
private ClusterNodesDelegate nodesDelegate;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected SerializationService serializationService;
// Nodes to be monitored to make sure they have a connection.
private final Set<DefaultControllerNode> nodes = new HashSet<>();
......@@ -54,6 +71,9 @@ public class ConnectionManager implements MessageSender {
// Means to track message streams to other nodes.
private final Map<NodeId, ClusterMessageStream> streams = new ConcurrentHashMap<>();
// TODO: use something different that won't require synchronization
private Multimap<MessageSubject, MessageSubscriber> subscribers = HashMultimap.create();
// Executor pools for listening and managing connections to other nodes.
private final ExecutorService listenExecutor =
Executors.newSingleThreadExecutor(namedThreads("onos-comm-listen"));
......@@ -65,59 +85,110 @@ public class ConnectionManager implements MessageSender {
private final Timer timer = new Timer("onos-comm-initiator");
private final TimerTask connectionCustodian = new ConnectionCustodian();
private final WorkerFinder workerFinder = new LeastUtilitiedWorkerFinder();
@Activate
public void activate() {
log.info("Activated but waiting for delegate");
}
@Deactivate
public void deactivate() {
connectionCustodian.cancel();
if (connectionListener != null) {
connectionListener.shutdown();
for (ClusterIOWorker worker : workers) {
worker.shutdown();
}
}
log.info("Stopped");
}
/**
* Creates a new connection manager.
*/
ConnectionManager(DefaultControllerNode localNode,
ClusterNodesDelegate nodesDelegate,
CommunicationsDelegate commsDelegate,
SerializationService serializationService) {
@Override
public boolean send(ClusterMessage message, NodeId toNodeId) {
ClusterMessageStream stream = streams.get(toNodeId);
if (stream != null) {
try {
stream.write(message);
return true;
} catch (IOException e) {
log.warn("Unable to send message {} to node {}",
message.subject(), toNodeId);
}
}
return false;
}
@Override
public synchronized void addSubscriber(MessageSubject subject,
MessageSubscriber subscriber) {
subscribers.put(subject, subscriber);
}
@Override
public synchronized void removeSubscriber(MessageSubject subject,
MessageSubscriber subscriber) {
subscribers.remove(subject, subscriber);
}
@Override
public Set<MessageSubscriber> getSubscribers(MessageSubject subject) {
return ImmutableSet.copyOf(subscribers.get(subject));
}
@Override
public void addNode(DefaultControllerNode node) {
nodes.add(node);
}
@Override
public void removeNode(DefaultControllerNode node) {
nodes.remove(node);
ClusterMessageStream stream = streams.remove(node.id());
if (stream != null) {
stream.close();
}
}
@Override
public void startUp(DefaultControllerNode localNode,
ClusterNodesDelegate delegate) {
this.localNode = localNode;
this.nodesDelegate = nodesDelegate;
this.commsDelegate = commsDelegate;
this.serializationService = serializationService;
this.nodesDelegate = delegate;
commsDelegate.setSender(this);
startCommunications();
startListening();
startInitiating();
startInitiatingConnections();
log.info("Started");
}
/**
* Shuts down the connection manager.
*/
void shutdown() {
connectionListener.shutdown();
for (ClusterIOWorker worker : workers) {
worker.shutdown();
}
log.info("Stopped");
}
/**
* Adds the node to the list of monitored nodes.
* Dispatches the specified message to all subscribers to its subject.
*
* @param node node to be added
* @param message message to dispatch
* @param fromNodeId node from which the message was received
*/
void addNode(DefaultControllerNode node) {
nodes.add(node);
void dispatch(ClusterMessage message, NodeId fromNodeId) {
Set<MessageSubscriber> set = getSubscribers(message.subject());
if (set != null) {
for (MessageSubscriber subscriber : set) {
subscriber.receive(message, fromNodeId);
}
}
}
/**
* Removes the node from the list of monitored nodes.
* Removes the stream associated with the specified node.
*
* @param node node to be removed
* @param nodeId newly detected cluster node id
* @param ip node IP listen address
* @param tcpPort node TCP listen port
* @return controller node bound to the stream
*/
void removeNode(DefaultControllerNode node) {
nodes.remove(node);
ClusterMessageStream stream = streams.remove(node.id());
if (stream != null) {
stream.close();
}
DefaultControllerNode addNodeStream(NodeId nodeId, IpPrefix ip, int tcpPort,
ClusterMessageStream stream) {
DefaultControllerNode node = nodesDelegate.nodeDetected(nodeId, ip, tcpPort);
stream.setNode(node);
streams.put(node.id(), stream);
return node;
}
/**
......@@ -126,23 +197,30 @@ public class ConnectionManager implements MessageSender {
* @param node node whose stream to remove
*/
void removeNodeStream(DefaultControllerNode node) {
nodesDelegate.nodeVanished(node);
nodesDelegate.nodeVanished(node.id());
streams.remove(node.id());
}
@Override
public boolean send(NodeId nodeId, ClusterMessage message) {
ClusterMessageStream stream = streams.get(nodeId);
if (stream != null) {
try {
stream.write(message);
return true;
} catch (IOException e) {
log.warn("Unable to send a message about {} to node {}",
message.subject(), nodeId);
/**
* Finds the least utilized IO worker.
*
* @return IO worker
*/
ClusterIOWorker findWorker() {
ClusterIOWorker leastUtilized = null;
int minCount = Integer.MAX_VALUE;
for (ClusterIOWorker worker : workers) {
int count = worker.streamCount();
if (count == 0) {
return worker;
}
if (count < minCount) {
leastUtilized = worker;
minCount = count;
}
}
return false;
return leastUtilized;
}
/**
......@@ -154,8 +232,7 @@ public class ConnectionManager implements MessageSender {
for (int i = 0; i < WORKERS; i++) {
try {
ClusterIOWorker worker =
new ClusterIOWorker(this, commsDelegate,
serializationService, hello);
new ClusterIOWorker(this, serializationService, hello);
workers.add(worker);
commExecutors.execute(worker);
} catch (IOException e) {
......@@ -177,8 +254,7 @@ public class ConnectionManager implements MessageSender {
private void startListening() {
try {
connectionListener =
new ClusterConnectionListener(localNode.ip(), localNode.tcpPort(),
workerFinder);
new ClusterConnectionListener(this, localNode.ip(), localNode.tcpPort());
listenExecutor.execute(connectionListener);
if (!connectionListener.awaitStart(START_TIMEOUT)) {
log.warn("Listener did not start on-time; moving on...");
......@@ -189,28 +265,27 @@ public class ConnectionManager implements MessageSender {
}
/**
* Attempts to connect to any nodes that do not have an associated connection.
*/
private void startInitiatingConnections() {
timer.schedule(connectionCustodian, CONNECTION_CUSTODIAN_DELAY,
CONNECTION_CUSTODIAN_FREQUENCY);
}
/**
* Initiates open connection request and registers the pending socket
* channel with the given IO loop.
* channel with the given IO worker.
*
* @param loop loop with which the channel should be registered
* @param worker loop with which the channel should be registered
* @throws java.io.IOException if the socket could not be open or connected
*/
private void initiateConnection(DefaultControllerNode node,
ClusterIOWorker loop) throws IOException {
ClusterIOWorker worker) throws IOException {
SocketAddress sa = new InetSocketAddress(getByAddress(node.ip().toOctets()), node.tcpPort());
SocketChannel ch = SocketChannel.open();
ch.configureBlocking(false);
ch.connect(sa);
loop.connectStream(ch);
}
/**
* Attempts to connect to any nodes that do not have an associated connection.
*/
private void startInitiating() {
timer.schedule(connectionCustodian, CONNECTION_CUSTODIAN_DELAY,
CONNECTION_CUSTODIAN_FREQUENCY);
worker.connectStream(ch);
}
// Sweeps through all controller nodes and attempts to open connection to
......@@ -219,9 +294,9 @@ public class ConnectionManager implements MessageSender {
@Override
public void run() {
for (DefaultControllerNode node : nodes) {
if (node != localNode && !streams.containsKey(node.id())) {
if (!node.id().equals(localNode.id()) && !streams.containsKey(node.id())) {
try {
initiateConnection(node, workerFinder.findWorker());
initiateConnection(node, findWorker());
} catch (IOException e) {
log.debug("Unable to connect", e);
}
......@@ -230,26 +305,4 @@ public class ConnectionManager implements MessageSender {
}
}
// Finds the least utilitied IO loop.
private class LeastUtilitiedWorkerFinder implements WorkerFinder {
@Override
public ClusterIOWorker findWorker() {
ClusterIOWorker leastUtilized = null;
int minCount = Integer.MAX_VALUE;
for (ClusterIOWorker worker : workers) {
int count = worker.streamCount();
if (count == 0) {
return worker;
}
if (count < minCount) {
leastUtilized = worker;
minCount = count;
}
}
return leastUtilized;
}
}
}
......
......@@ -23,12 +23,12 @@ public class ClusterConnectionListener extends AcceptorLoop {
private static final int SO_SEND_BUFFER_SIZE = COMM_BUFFER_SIZE;
private static final int SO_RCV_BUFFER_SIZE = COMM_BUFFER_SIZE;
private final WorkerFinder workerFinder;
private final ClusterCommunicationManager manager;
ClusterConnectionListener(IpPrefix ip, int tcpPort,
WorkerFinder workerFinder) throws IOException {
ClusterConnectionListener(ClusterCommunicationManager manager,
IpPrefix ip, int tcpPort) throws IOException {
super(SELECT_TIMEOUT, new InetSocketAddress(getByAddress(ip.toOctets()), tcpPort));
this.workerFinder = workerFinder;
this.manager = manager;
}
@Override
......@@ -41,7 +41,7 @@ public class ClusterConnectionListener extends AcceptorLoop {
so.setReceiveBufferSize(SO_RCV_BUFFER_SIZE);
so.setSendBufferSize(SO_SEND_BUFFER_SIZE);
workerFinder.findWorker().acceptStream(sc);
manager.findWorker().acceptStream(sc);
}
}
......
......@@ -3,8 +3,10 @@ package org.onlab.onos.store.cluster.impl;
import org.onlab.nio.IOLoop;
import org.onlab.nio.MessageStream;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.ClusterMessageStream;
import org.onlab.onos.store.cluster.messaging.HelloMessage;
import org.onlab.onos.store.cluster.messaging.SerializationService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -29,27 +31,23 @@ public class ClusterIOWorker extends
private static final long SELECT_TIMEOUT = 50;
private final ConnectionManager connectionManager;
private final CommunicationsDelegate commsDelegate;
private final ClusterCommunicationManager manager;
private final SerializationService serializationService;
private final ClusterMessage helloMessage;
/**
* Creates a new cluster IO worker.
*
* @param connectionManager parent connection manager
* @param commsDelegate communications delegate for dispatching
* @param manager parent comms manager
* @param serializationService serialization service for encode/decode
* @param helloMessage hello message for greeting peers
* @throws IOException if errors occur during IO loop ignition
*/
ClusterIOWorker(ConnectionManager connectionManager,
CommunicationsDelegate commsDelegate,
ClusterIOWorker(ClusterCommunicationManager manager,
SerializationService serializationService,
ClusterMessage helloMessage) throws IOException {
super(SELECT_TIMEOUT);
this.connectionManager = connectionManager;
this.commsDelegate = commsDelegate;
this.manager = manager;
this.serializationService = serializationService;
this.helloMessage = helloMessage;
}
......@@ -61,11 +59,27 @@ public class ClusterIOWorker extends
@Override
protected void processMessages(List<ClusterMessage> messages, MessageStream<ClusterMessage> stream) {
NodeId nodeId = getNodeId(messages, (ClusterMessageStream) stream);
for (ClusterMessage message : messages) {
commsDelegate.dispatch(message);
manager.dispatch(message, nodeId);
}
}
// Retrieves the node from the stream. If one is not bound, it attempts
// to bind it using the knowledge that the first message must be a hello.
private NodeId getNodeId(List<ClusterMessage> messages, ClusterMessageStream stream) {
DefaultControllerNode node = stream.node();
if (node == null && !messages.isEmpty()) {
ClusterMessage firstMessage = messages.get(0);
if (firstMessage instanceof HelloMessage) {
HelloMessage hello = (HelloMessage) firstMessage;
node = manager.addNodeStream(hello.nodeId(), hello.ipAddress(),
hello.tcpPort(), stream);
}
}
return node != null ? node.id() : null;
}
@Override
public ClusterMessageStream acceptStream(SocketChannel channel) {
ClusterMessageStream stream = super.acceptStream(channel);
......@@ -99,7 +113,7 @@ public class ClusterIOWorker extends
DefaultControllerNode node = ((ClusterMessageStream) stream).node();
if (node != null) {
log.info("Closed connection to node {}", node.id());
connectionManager.removeNodeStream(node);
manager.removeNodeStream(node);
}
super.removeStream(stream);
}
......
package org.onlab.onos.store.cluster.impl;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.packet.IpPrefix;
/**
* Simple back interface through which connection manager can interact with
......@@ -9,17 +11,20 @@ import org.onlab.onos.cluster.DefaultControllerNode;
public interface ClusterNodesDelegate {
/**
* Notifies about a new cluster node being detected.
* Notifies about cluster node coming online.
*
* @param node newly detected cluster node
* @param nodeId newly detected cluster node id
* @param ip node IP listen address
* @param tcpPort node TCP listen port
* @return the controller node
*/
void nodeDetected(DefaultControllerNode node);
DefaultControllerNode nodeDetected(NodeId nodeId, IpPrefix ip, int tcpPort);
/**
* Notifies about cluster node going offline.
*
* @param node cluster node that vanished
* @param nodeId identifier of the cluster node that vanished
*/
void nodeVanished(DefaultControllerNode node);
void nodeVanished(NodeId nodeId);
}
......
package org.onlab.onos.store.cluster.impl;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
/**
* Simple back interface for interacting with the communications service.
*/
public interface CommunicationsDelegate {
/**
* Dispatches the specified message to all registered subscribers.
*
* @param message message to be dispatched
*/
void dispatch(ClusterMessage message);
/**
* Sets the sender.
*
* @param messageSender message sender
*/
void setSender(MessageSender messageSender);
}
......@@ -14,7 +14,6 @@ import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.AbstractStore;
import org.onlab.onos.store.cluster.messaging.SerializationService;
import org.onlab.packet.IpPrefix;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -43,20 +42,20 @@ public class DistributedClusterStore
private final Map<NodeId, State> states = new ConcurrentHashMap<>();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private CommunicationsDelegate commsDelegate;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private SerializationService serializationService;
private ClusterCommunicationAdminService communicationAdminService;
private final ClusterNodesDelegate nodesDelegate = new InnerNodesDelegate();
private ConnectionManager connectionManager;
@Activate
public void activate() {
loadClusterDefinition();
establishSelfIdentity();
connectionManager = new ConnectionManager(localNode, nodesDelegate,
commsDelegate, serializationService);
// Start-up the comm service and prime it with the loaded nodes.
communicationAdminService.startUp(localNode, nodesDelegate);
for (DefaultControllerNode node : nodes.values()) {
communicationAdminService.addNode(node);
}
log.info("Started");
}
......@@ -92,8 +91,8 @@ public class DistributedClusterStore
if (localNode == null) {
localNode = new DefaultControllerNode(new NodeId(ip.toString()), ip);
nodes.put(localNode.id(), localNode);
states.put(localNode.id(), State.ACTIVE);
}
states.put(localNode.id(), State.ACTIVE);
}
@Override
......@@ -122,7 +121,7 @@ public class DistributedClusterStore
public ControllerNode addNode(NodeId nodeId, IpPrefix ip, int tcpPort) {
DefaultControllerNode node = new DefaultControllerNode(nodeId, ip, tcpPort);
nodes.put(nodeId, node);
connectionManager.addNode(node);
communicationAdminService.addNode(node);
return node;
}
......@@ -130,21 +129,25 @@ public class DistributedClusterStore
public void removeNode(NodeId nodeId) {
DefaultControllerNode node = nodes.remove(nodeId);
if (node != null) {
connectionManager.removeNode(node);
communicationAdminService.removeNode(node);
}
}
// Entity to handle back calls from the connection manager.
private class InnerNodesDelegate implements ClusterNodesDelegate {
@Override
public void nodeDetected(DefaultControllerNode node) {
nodes.put(node.id(), node);
states.put(node.id(), State.ACTIVE);
public DefaultControllerNode nodeDetected(NodeId nodeId, IpPrefix ip, int tcpPort) {
DefaultControllerNode node = nodes.get(nodeId);
if (node == null) {
node = (DefaultControllerNode) addNode(nodeId, ip, tcpPort);
}
states.put(nodeId, State.ACTIVE);
return node;
}
@Override
public void nodeVanished(DefaultControllerNode node) {
states.put(node.id(), State.INACTIVE);
public void nodeVanished(NodeId nodeId) {
states.put(nodeId, State.INACTIVE);
}
}
}
......
package org.onlab.onos.store.cluster.impl;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
/**
* Created by tom on 9/29/14.
*/
public interface MessageSender {
/**
* Sends the specified message to the given cluster node.
*
* @param nodeId node identifier
* @param message mesage to send
* @return true if the message was sent sucessfully; false if there is
* no stream or if there was an error
*/
boolean send(NodeId nodeId, ClusterMessage message);
}
package org.onlab.onos.store.cluster.impl;
/**
* Provides means to find a worker IO loop.
*/
public interface WorkerFinder {
/**
* Finds a suitable worker.
*
* @return available worker
*/
ClusterIOWorker findWorker();
}
......@@ -29,9 +29,9 @@ public class HelloMessage extends ClusterMessage {
*/
public HelloMessage(NodeId nodeId, IpPrefix ipAddress, int tcpPort) {
super(MessageSubject.HELLO);
nodeId = nodeId;
ipAddress = ipAddress;
tcpPort = tcpPort;
this.nodeId = nodeId;
this.ipAddress = ipAddress;
this.tcpPort = tcpPort;
}
/**
......@@ -60,4 +60,5 @@ public class HelloMessage extends ClusterMessage {
public int tcpPort() {
return tcpPort;
}
}
......
package org.onlab.onos.store.cluster.messaging;
import org.onlab.onos.cluster.NodeId;
/**
* Represents a message consumer.
*/
......@@ -8,8 +10,9 @@ public interface MessageSubscriber {
/**
* Receives the specified cluster message.
*
* @param message message to be received
* @param message message to be received
* @param fromNodeId node from which the message was received
*/
void receive(ClusterMessage message);
void receive(ClusterMessage message, NodeId fromNodeId);
}
......
......@@ -3,12 +3,12 @@ package org.onlab.onos.store.cluster.messaging;
import java.nio.ByteBuffer;
/**
* Service for serializing/deserializing intra-cluster messages.
* Service for encoding &amp; decoding intra-cluster messages.
*/
public interface SerializationService {
/**
* Decodes the specified byte buffer to obtain a message within.
* Decodes the specified byte buffer to obtain the message within.
*
* @param buffer byte buffer with message(s)
* @return parsed message
......
package org.onlab.onos.store.cluster.messaging.impl;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.cluster.impl.CommunicationsDelegate;
import org.onlab.onos.store.cluster.impl.MessageSender;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
import org.onlab.onos.store.cluster.messaging.MessageSubscriber;
import java.util.Set;
/**
* Implements the cluster communication services to use by other stores.
*/
@Component(immediate = true)
@Service
public class ClusterCommunicationManager
implements ClusterCommunicationService, CommunicationsDelegate {
// TODO: use something different that won't require synchronization
private Multimap<MessageSubject, MessageSubscriber> subscribers = HashMultimap.create();
private MessageSender messageSender;
@Override
public boolean send(ClusterMessage message, NodeId toNodeId) {
return messageSender.send(toNodeId, message);
}
@Override
public synchronized void addSubscriber(MessageSubject subject, MessageSubscriber subscriber) {
subscribers.put(subject, subscriber);
}
@Override
public synchronized void removeSubscriber(MessageSubject subject, MessageSubscriber subscriber) {
subscribers.remove(subject, subscriber);
}
@Override
public Set<MessageSubscriber> getSubscribers(MessageSubject subject) {
return ImmutableSet.copyOf(subscribers.get(subject));
}
@Override
public void dispatch(ClusterMessage message) {
Set<MessageSubscriber> set = getSubscribers(message.subject());
if (set != null) {
for (MessageSubscriber subscriber : set) {
subscriber.receive(message);
}
}
}
@Override
public void setSender(MessageSender messageSender) {
this.messageSender = messageSender;
}
}
package org.onlab.onos.store.cluster.messaging.impl;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.HelloMessage;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
import org.onlab.onos.store.cluster.messaging.SerializationService;
import org.onlab.packet.IpPrefix;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
......@@ -11,8 +18,12 @@ import static com.google.common.base.Preconditions.checkState;
/**
* Factory for parsing messages sent between cluster members.
*/
@Component(immediate = true)
@Service
public class MessageSerializer implements SerializationService {
private final Logger log = LoggerFactory.getLogger(getClass());
private static final int METADATA_LENGTH = 16; // 8 + 4 + 4
private static final int LENGTH_OFFSET = 12;
......@@ -46,11 +57,12 @@ public class MessageSerializer implements SerializationService {
buffer.get(data);
// TODO: add deserialization hook here; for now this hack
return null; // actually deserialize
String[] fields = new String(data).split(":");
return new HelloMessage(new NodeId(fields[0]), IpPrefix.valueOf(fields[1]), Integer.parseInt(fields[2]));
} catch (Exception e) {
// TODO: recover from exceptions by forwarding stream to next marker
e.printStackTrace();
log.warn("Unable to decode message due to: " + e);
}
return null;
}
......@@ -58,11 +70,18 @@ public class MessageSerializer implements SerializationService {
@Override
public void encode(ClusterMessage message, ByteBuffer buffer) {
try {
int i = 0;
// Type based lookup for proper encoder
HelloMessage helloMessage = (HelloMessage) message;
buffer.putLong(MARKER);
buffer.putInt(message.subject().ordinal());
String str = helloMessage.nodeId() + ":" + helloMessage.ipAddress() + ":" + helloMessage.tcpPort();
byte[] data = str.getBytes();
buffer.putInt(data.length + METADATA_LENGTH);
buffer.put(data);
} catch (Exception e) {
// TODO: recover from exceptions by forwarding stream to next marker
e.printStackTrace();
log.warn("Unable to encode message due to: " + e);
}
}
......
......@@ -2,9 +2,9 @@
export ONOS_NIC=192.168.56.*
export OC1="192.168.56.101"
export OC2="192.168.56.102"
export OC1="192.168.56.11"
export OC2="192.168.56.12"
export OCN="192.168.56.105"
export OCN="192.168.56.7"
......