alshabib

Merge branch 'master' of ssh://gerrit.onlab.us:29418/onos-next

Showing 26 changed files with 836 additions and 495 deletions
1 +package org.onlab.onos.store.cluster.impl;
2 +
3 +import org.onlab.onos.cluster.DefaultControllerNode;
4 +
5 +/**
6 + * Service for administering the cluster communications manager.
7 + */
8 +public interface ClusterCommunicationAdminService {
9 +
10 + /**
11 + * Adds the node to the list of monitored nodes.
12 + *
13 + * @param node node to be added
14 + */
15 + void addNode(DefaultControllerNode node);
16 +
17 + /**
18 + * Removes the node from the list of monitored nodes.
19 + *
20 + * @param node node to be removed
21 + */
22 + void removeNode(DefaultControllerNode node);
23 +
24 + /**
25 + * Starts up the communications engine.
26 + *
27 + * @param localNode local controller node
28 + * @param delegate nodes delegate
29 + */
30 + void startUp(DefaultControllerNode localNode, ClusterNodesDelegate delegate);
31 +
32 + /**
33 + * Clears all nodes and streams as part of leaving the cluster.
34 + */
35 + void clearAllNodesAndStreams();
36 +}
1 +package org.onlab.onos.store.cluster.impl;
2 +
3 +import com.google.common.collect.HashMultimap;
4 +import com.google.common.collect.ImmutableSet;
5 +import com.google.common.collect.Multimap;
6 +import org.apache.felix.scr.annotations.Activate;
7 +import org.apache.felix.scr.annotations.Component;
8 +import org.apache.felix.scr.annotations.Deactivate;
9 +import org.apache.felix.scr.annotations.Reference;
10 +import org.apache.felix.scr.annotations.ReferenceCardinality;
11 +import org.apache.felix.scr.annotations.Service;
12 +import org.onlab.onos.cluster.DefaultControllerNode;
13 +import org.onlab.onos.cluster.NodeId;
14 +import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
15 +import org.onlab.onos.store.cluster.messaging.ClusterMessage;
16 +import org.onlab.onos.store.cluster.messaging.GoodbyeMessage;
17 +import org.onlab.onos.store.cluster.messaging.HelloMessage;
18 +import org.onlab.onos.store.cluster.messaging.MessageSubject;
19 +import org.onlab.onos.store.cluster.messaging.MessageSubscriber;
20 +import org.onlab.onos.store.cluster.messaging.SerializationService;
21 +import org.onlab.packet.IpPrefix;
22 +import org.slf4j.Logger;
23 +import org.slf4j.LoggerFactory;
24 +
25 +import java.io.IOException;
26 +import java.net.InetSocketAddress;
27 +import java.net.SocketAddress;
28 +import java.nio.channels.SocketChannel;
29 +import java.util.ArrayList;
30 +import java.util.HashSet;
31 +import java.util.List;
32 +import java.util.Map;
33 +import java.util.Set;
34 +import java.util.Timer;
35 +import java.util.TimerTask;
36 +import java.util.concurrent.ConcurrentHashMap;
37 +import java.util.concurrent.ExecutorService;
38 +import java.util.concurrent.Executors;
39 +
40 +import static java.net.InetAddress.getByAddress;
41 +import static org.onlab.util.Tools.namedThreads;
42 +
43 +/**
44 + * Implements the cluster communication services for use by other stores.
45 + */
46 +@Component(immediate = true)
47 +@Service
48 +public class ClusterCommunicationManager
49 + implements ClusterCommunicationService, ClusterCommunicationAdminService {
50 +
51 + private final Logger log = LoggerFactory.getLogger(getClass());
52 +
53 + private static final long CONNECTION_CUSTODIAN_DELAY = 100L;
54 + private static final long CONNECTION_CUSTODIAN_FREQUENCY = 2000;
55 +
56 + private static final long START_TIMEOUT = 1000;
57 + private static final int WORKERS = 3;
58 +
59 + private ClusterConnectionListener connectionListener;
60 + private List<ClusterIOWorker> workers = new ArrayList<>(WORKERS);
61 +
62 + private DefaultControllerNode localNode;
63 + private ClusterNodesDelegate nodesDelegate;
64 +
65 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
66 + protected SerializationService serializationService;
67 +
68 + // Nodes to be monitored to make sure they have a connection.
69 + private final Set<DefaultControllerNode> nodes = new HashSet<>();
70 +
71 + // Means to track message streams to other nodes.
72 + private final Map<NodeId, ClusterMessageStream> streams = new ConcurrentHashMap<>();
73 +
74 + // TODO: use something different that won't require synchronization
75 + private Multimap<MessageSubject, MessageSubscriber> subscribers = HashMultimap.create();
76 +
77 + // Executor pools for listening and managing connections to other nodes.
78 + private final ExecutorService listenExecutor =
79 + Executors.newSingleThreadExecutor(namedThreads("onos-comm-listen"));
80 + private final ExecutorService commExecutors =
81 + Executors.newFixedThreadPool(WORKERS, namedThreads("onos-comm-cluster"));
82 + private final ExecutorService heartbeatExecutor =
83 + Executors.newSingleThreadExecutor(namedThreads("onos-comm-heartbeat"));
84 +
85 + private final Timer timer = new Timer("onos-comm-initiator");
86 + private final TimerTask connectionCustodian = new ConnectionCustodian();
87 + private GoodbyeSubscriber goodbyeSubscriber = new GoodbyeSubscriber();
88 +
89 + @Activate
90 + public void activate() {
91 + addSubscriber(MessageSubject.GOODBYE, goodbyeSubscriber);
92 + log.info("Activated but waiting for delegate");
93 + }
94 +
95 + @Deactivate
96 + public void deactivate() {
97 + timer.cancel(); // discards the scheduled custodian task and lets the timer thread exit
98 + if (connectionListener != null) {
99 + connectionListener.shutdown();
100 + }
101 + for (ClusterIOWorker worker : workers) { // stop workers even if the listener never came up
102 + worker.shutdown();
103 + }
104 + log.info("Stopped");
105 + }
106 +
107 + @Override
108 + public boolean send(ClusterMessage message) {
109 + boolean ok = true;
110 + for (DefaultControllerNode node : nodes) {
111 + if (!node.equals(localNode)) {
112 + ok = send(message, node.id()) && ok;
113 + }
114 + }
115 + return ok;
116 + }
117 +
118 + @Override
119 + public boolean send(ClusterMessage message, NodeId toNodeId) {
120 + ClusterMessageStream stream = streams.get(toNodeId);
121 + if (stream != null && !toNodeId.equals(localNode.id())) {
122 + try {
123 + stream.write(message);
124 + return true;
125 + } catch (IOException e) {
126 + log.warn("Unable to send message {} to node {}",
127 + message.subject(), toNodeId);
128 + }
129 + }
130 + return false;
131 + }
132 +
133 + @Override
134 + public synchronized void addSubscriber(MessageSubject subject,
135 + MessageSubscriber subscriber) {
136 + subscribers.put(subject, subscriber);
137 + }
138 +
139 + @Override
140 + public synchronized void removeSubscriber(MessageSubject subject,
141 + MessageSubscriber subscriber) {
142 + subscribers.remove(subject, subscriber);
143 + }
144 +
145 + @Override
146 + public synchronized Set<MessageSubscriber> getSubscribers(MessageSubject subject) {
147 + return ImmutableSet.copyOf(subscribers.get(subject)); // snapshot taken under the same lock as add/remove
148 + }
149 +
150 + @Override
151 + public void addNode(DefaultControllerNode node) {
152 + nodes.add(node);
153 + }
154 +
155 + @Override
156 + public void removeNode(DefaultControllerNode node) {
157 + send(new GoodbyeMessage(node.id()));
158 + nodes.remove(node);
159 + ClusterMessageStream stream = streams.remove(node.id());
160 + if (stream != null) {
161 + stream.close();
162 + }
163 + }
164 +
165 + @Override
166 + public void startUp(DefaultControllerNode localNode,
167 + ClusterNodesDelegate delegate) {
168 + this.localNode = localNode;
169 + this.nodesDelegate = delegate;
170 +
171 + startCommunications();
172 + startListening();
173 + startInitiatingConnections();
174 + log.info("Started");
175 + }
176 +
177 + @Override
178 + public void clearAllNodesAndStreams() {
179 + send(new GoodbyeMessage(localNode.id())); // must precede nodes.clear(): send() iterates nodes
180 + nodes.clear();
181 + for (ClusterMessageStream stream : streams.values()) {
182 + stream.close();
183 + }
184 + streams.clear();
185 + }
186 +
187 + /**
188 + * Dispatches the specified message to all subscribers to its subject.
189 + *
190 + * @param message message to dispatch
191 + * @param fromNodeId node from which the message was received
192 + */
193 + void dispatch(ClusterMessage message, NodeId fromNodeId) {
194 + Set<MessageSubscriber> set = getSubscribers(message.subject());
195 + if (set != null) {
196 + for (MessageSubscriber subscriber : set) {
197 + subscriber.receive(message, fromNodeId);
198 + }
199 + }
200 + }
201 +
202 + /**
203 + * Adds the stream associated with the specified node.
204 + *
205 + * @param nodeId newly detected cluster node id
206 + * @param ip node IP listen address
207 + * @param tcpPort node TCP listen port
208 + * @return controller node bound to the stream
209 + */
210 + DefaultControllerNode addNodeStream(NodeId nodeId, IpPrefix ip, int tcpPort,
211 + ClusterMessageStream stream) {
212 + DefaultControllerNode node = nodesDelegate.nodeDetected(nodeId, ip, tcpPort);
213 + stream.setNode(node);
214 + streams.put(node.id(), stream);
215 + return node;
216 + }
217 +
218 + /**
219 + * Removes the stream associated with the specified node.
220 + *
221 + * @param node node whose stream to remove
222 + */
223 + void removeNodeStream(DefaultControllerNode node) {
224 + nodesDelegate.nodeVanished(node.id());
225 + streams.remove(node.id());
226 + }
227 +
228 + /**
229 + * Finds the least utilized IO worker.
230 + *
231 + * @return IO worker
232 + */
233 + ClusterIOWorker findWorker() {
234 + ClusterIOWorker leastUtilized = null;
235 + int minCount = Integer.MAX_VALUE;
236 + for (ClusterIOWorker worker : workers) {
237 + int count = worker.streamCount();
238 + if (count == 0) {
239 + return worker;
240 + }
241 +
242 + if (count < minCount) {
243 + leastUtilized = worker;
244 + minCount = count;
245 + }
246 + }
247 + return leastUtilized;
248 + }
249 +
250 + /**
251 + * Kicks off the IO loops and waits for them to startup.
252 + */
253 + private void startCommunications() {
254 + HelloMessage hello = new HelloMessage(localNode.id(), localNode.ip(),
255 + localNode.tcpPort());
256 + for (int i = 0; i < WORKERS; i++) {
257 + try {
258 + ClusterIOWorker worker =
259 + new ClusterIOWorker(this, serializationService, hello);
260 + workers.add(worker);
261 + commExecutors.execute(worker);
262 + } catch (IOException e) {
263 + log.warn("Unable to start communication worker", e);
264 + }
265 + }
266 +
267 + // Wait for the IO loops to start
268 + for (ClusterIOWorker loop : workers) {
269 + if (!loop.awaitStart(START_TIMEOUT)) {
270 + log.warn("Comm loop did not start on-time; moving on...");
271 + }
272 + }
273 + }
274 +
275 + /**
276 + * Starts listening for connections from peer cluster members.
277 + */
278 + private void startListening() {
279 + try {
280 + connectionListener =
281 + new ClusterConnectionListener(this, localNode.ip(), localNode.tcpPort());
282 + listenExecutor.execute(connectionListener);
283 + if (!connectionListener.awaitStart(START_TIMEOUT)) {
284 + log.warn("Listener did not start on-time; moving on...");
285 + }
286 + } catch (IOException e) {
287 + log.error("Unable to listen for cluster connections", e);
288 + }
289 + }
290 +
291 + /**
292 + * Attempts to connect to any nodes that do not have an associated connection.
293 + */
294 + private void startInitiatingConnections() {
295 + timer.schedule(connectionCustodian, CONNECTION_CUSTODIAN_DELAY,
296 + CONNECTION_CUSTODIAN_FREQUENCY);
297 + }
298 +
299 + /**
300 + * Initiates open connection request and registers the pending socket
301 + * channel with the given IO worker.
302 + *
303 + * @param worker loop with which the channel should be registered
304 + * @throws java.io.IOException if the socket could not be opened or connected
305 + */
306 + private void initiateConnection(DefaultControllerNode node,
307 + ClusterIOWorker worker) throws IOException {
308 + SocketAddress sa = new InetSocketAddress(getByAddress(node.ip().toOctets()), node.tcpPort());
309 + SocketChannel ch = SocketChannel.open();
310 + ch.configureBlocking(false);
311 + ch.connect(sa);
312 + worker.connectStream(ch);
313 + }
314 +
315 + // Sweeps through all controller nodes and attempts to open connection to
316 + // those that presently do not have one.
317 + private class ConnectionCustodian extends TimerTask {
318 + @Override
319 + public void run() {
320 + for (DefaultControllerNode node : nodes) {
321 + if (!node.id().equals(localNode.id()) && !streams.containsKey(node.id())) {
322 + try {
323 + initiateConnection(node, findWorker());
324 + } catch (IOException e) {
325 + log.debug("Unable to connect", e);
326 + }
327 + }
328 + }
329 + }
330 + }
331 +
332 + private class GoodbyeSubscriber implements MessageSubscriber {
333 + @Override
334 + public void receive(ClusterMessage message, NodeId fromNodeId) {
335 + log.info("Received goodbye message from {}", fromNodeId);
336 + nodesDelegate.nodeRemoved(fromNodeId);
337 + }
338 + }
339 +}
...@@ -23,12 +23,12 @@ public class ClusterConnectionListener extends AcceptorLoop { ...@@ -23,12 +23,12 @@ public class ClusterConnectionListener extends AcceptorLoop {
23 private static final int SO_SEND_BUFFER_SIZE = COMM_BUFFER_SIZE; 23 private static final int SO_SEND_BUFFER_SIZE = COMM_BUFFER_SIZE;
24 private static final int SO_RCV_BUFFER_SIZE = COMM_BUFFER_SIZE; 24 private static final int SO_RCV_BUFFER_SIZE = COMM_BUFFER_SIZE;
25 25
26 - private final WorkerFinder workerFinder; 26 + private final ClusterCommunicationManager manager;
27 27
28 - ClusterConnectionListener(IpPrefix ip, int tcpPort, 28 + ClusterConnectionListener(ClusterCommunicationManager manager,
29 - WorkerFinder workerFinder) throws IOException { 29 + IpPrefix ip, int tcpPort) throws IOException {
30 super(SELECT_TIMEOUT, new InetSocketAddress(getByAddress(ip.toOctets()), tcpPort)); 30 super(SELECT_TIMEOUT, new InetSocketAddress(getByAddress(ip.toOctets()), tcpPort));
31 - this.workerFinder = workerFinder; 31 + this.manager = manager;
32 } 32 }
33 33
34 @Override 34 @Override
...@@ -41,7 +41,7 @@ public class ClusterConnectionListener extends AcceptorLoop { ...@@ -41,7 +41,7 @@ public class ClusterConnectionListener extends AcceptorLoop {
41 so.setReceiveBufferSize(SO_RCV_BUFFER_SIZE); 41 so.setReceiveBufferSize(SO_RCV_BUFFER_SIZE);
42 so.setSendBufferSize(SO_SEND_BUFFER_SIZE); 42 so.setSendBufferSize(SO_SEND_BUFFER_SIZE);
43 43
44 - workerFinder.findWorker().acceptStream(sc); 44 + manager.findWorker().acceptStream(sc);
45 } 45 }
46 46
47 } 47 }
......
...@@ -3,8 +3,9 @@ package org.onlab.onos.store.cluster.impl; ...@@ -3,8 +3,9 @@ package org.onlab.onos.store.cluster.impl;
3 import org.onlab.nio.IOLoop; 3 import org.onlab.nio.IOLoop;
4 import org.onlab.nio.MessageStream; 4 import org.onlab.nio.MessageStream;
5 import org.onlab.onos.cluster.DefaultControllerNode; 5 import org.onlab.onos.cluster.DefaultControllerNode;
6 +import org.onlab.onos.cluster.NodeId;
6 import org.onlab.onos.store.cluster.messaging.ClusterMessage; 7 import org.onlab.onos.store.cluster.messaging.ClusterMessage;
7 -import org.onlab.onos.store.cluster.messaging.ClusterMessageStream; 8 +import org.onlab.onos.store.cluster.messaging.HelloMessage;
8 import org.onlab.onos.store.cluster.messaging.SerializationService; 9 import org.onlab.onos.store.cluster.messaging.SerializationService;
9 import org.slf4j.Logger; 10 import org.slf4j.Logger;
10 import org.slf4j.LoggerFactory; 11 import org.slf4j.LoggerFactory;
...@@ -29,27 +30,23 @@ public class ClusterIOWorker extends ...@@ -29,27 +30,23 @@ public class ClusterIOWorker extends
29 30
30 private static final long SELECT_TIMEOUT = 50; 31 private static final long SELECT_TIMEOUT = 50;
31 32
32 - private final ConnectionManager connectionManager; 33 + private final ClusterCommunicationManager manager;
33 - private final CommunicationsDelegate commsDelegate;
34 private final SerializationService serializationService; 34 private final SerializationService serializationService;
35 private final ClusterMessage helloMessage; 35 private final ClusterMessage helloMessage;
36 36
37 /** 37 /**
38 * Creates a new cluster IO worker. 38 * Creates a new cluster IO worker.
39 * 39 *
40 - * @param connectionManager parent connection manager 40 + * @param manager parent comms manager
41 - * @param commsDelegate communications delegate for dispatching
42 * @param serializationService serialization service for encode/decode 41 * @param serializationService serialization service for encode/decode
43 * @param helloMessage hello message for greeting peers 42 * @param helloMessage hello message for greeting peers
44 * @throws IOException if errors occur during IO loop ignition 43 * @throws IOException if errors occur during IO loop ignition
45 */ 44 */
46 - ClusterIOWorker(ConnectionManager connectionManager, 45 + ClusterIOWorker(ClusterCommunicationManager manager,
47 - CommunicationsDelegate commsDelegate,
48 SerializationService serializationService, 46 SerializationService serializationService,
49 ClusterMessage helloMessage) throws IOException { 47 ClusterMessage helloMessage) throws IOException {
50 super(SELECT_TIMEOUT); 48 super(SELECT_TIMEOUT);
51 - this.connectionManager = connectionManager; 49 + this.manager = manager;
52 - this.commsDelegate = commsDelegate;
53 this.serializationService = serializationService; 50 this.serializationService = serializationService;
54 this.helloMessage = helloMessage; 51 this.helloMessage = helloMessage;
55 } 52 }
...@@ -61,11 +58,27 @@ public class ClusterIOWorker extends ...@@ -61,11 +58,27 @@ public class ClusterIOWorker extends
61 58
62 @Override 59 @Override
63 protected void processMessages(List<ClusterMessage> messages, MessageStream<ClusterMessage> stream) { 60 protected void processMessages(List<ClusterMessage> messages, MessageStream<ClusterMessage> stream) {
61 + NodeId nodeId = getNodeId(messages, (ClusterMessageStream) stream);
64 for (ClusterMessage message : messages) { 62 for (ClusterMessage message : messages) {
65 - commsDelegate.dispatch(message); 63 + manager.dispatch(message, nodeId);
66 } 64 }
67 } 65 }
68 66
67 + // Retrieves the node from the stream. If one is not bound, it attempts
68 + // to bind it using the knowledge that the first message must be a hello.
69 + private NodeId getNodeId(List<ClusterMessage> messages, ClusterMessageStream stream) {
70 + DefaultControllerNode node = stream.node();
71 + if (node == null && !messages.isEmpty()) {
72 + ClusterMessage firstMessage = messages.get(0);
73 + if (firstMessage instanceof HelloMessage) {
74 + HelloMessage hello = (HelloMessage) firstMessage;
75 + node = manager.addNodeStream(hello.nodeId(), hello.ipAddress(),
76 + hello.tcpPort(), stream);
77 + }
78 + }
79 + return node != null ? node.id() : null;
80 + }
81 +
69 @Override 82 @Override
70 public ClusterMessageStream acceptStream(SocketChannel channel) { 83 public ClusterMessageStream acceptStream(SocketChannel channel) {
71 ClusterMessageStream stream = super.acceptStream(channel); 84 ClusterMessageStream stream = super.acceptStream(channel);
...@@ -99,7 +112,7 @@ public class ClusterIOWorker extends ...@@ -99,7 +112,7 @@ public class ClusterIOWorker extends
99 DefaultControllerNode node = ((ClusterMessageStream) stream).node(); 112 DefaultControllerNode node = ((ClusterMessageStream) stream).node();
100 if (node != null) { 113 if (node != null) {
101 log.info("Closed connection to node {}", node.id()); 114 log.info("Closed connection to node {}", node.id());
102 - connectionManager.removeNodeStream(node); 115 + manager.removeNodeStream(node);
103 } 116 }
104 super.removeStream(stream); 117 super.removeStream(stream);
105 } 118 }
......
1 -package org.onlab.onos.store.cluster.messaging; 1 +package org.onlab.onos.store.cluster.impl;
2 2
3 import org.onlab.nio.IOLoop; 3 import org.onlab.nio.IOLoop;
4 import org.onlab.nio.MessageStream; 4 import org.onlab.nio.MessageStream;
5 import org.onlab.onos.cluster.DefaultControllerNode; 5 import org.onlab.onos.cluster.DefaultControllerNode;
6 +import org.onlab.onos.store.cluster.messaging.ClusterMessage;
7 +import org.onlab.onos.store.cluster.messaging.SerializationService;
6 8
7 import java.nio.ByteBuffer; 9 import java.nio.ByteBuffer;
8 import java.nio.channels.ByteChannel; 10 import java.nio.channels.ByteChannel;
......
1 package org.onlab.onos.store.cluster.impl; 1 package org.onlab.onos.store.cluster.impl;
2 2
3 import org.onlab.onos.cluster.DefaultControllerNode; 3 import org.onlab.onos.cluster.DefaultControllerNode;
4 +import org.onlab.onos.cluster.NodeId;
5 +import org.onlab.packet.IpPrefix;
4 6
5 /** 7 /**
6 * Simple back interface through which connection manager can interact with 8 * Simple back interface through which connection manager can interact with
...@@ -9,17 +11,27 @@ import org.onlab.onos.cluster.DefaultControllerNode; ...@@ -9,17 +11,27 @@ import org.onlab.onos.cluster.DefaultControllerNode;
9 public interface ClusterNodesDelegate { 11 public interface ClusterNodesDelegate {
10 12
11 /** 13 /**
12 - * Notifies about a new cluster node being detected. 14 + * Notifies about cluster node coming online.
13 * 15 *
14 - * @param node newly detected cluster node 16 + * @param nodeId newly detected cluster node id
17 + * @param ip node IP listen address
18 + * @param tcpPort node TCP listen port
19 + * @return the controller node
15 */ 20 */
16 - void nodeDetected(DefaultControllerNode node); 21 + DefaultControllerNode nodeDetected(NodeId nodeId, IpPrefix ip, int tcpPort);
17 22
18 /** 23 /**
19 * Notifies about cluster node going offline. 24 * Notifies about cluster node going offline.
20 * 25 *
21 - * @param node cluster node that vanished 26 + * @param nodeId identifier of the cluster node that vanished
22 */ 27 */
23 - void nodeVanished(DefaultControllerNode node); 28 + void nodeVanished(NodeId nodeId);
29 +
30 + /**
31 + * Notifies about remote request to remove node from cluster.
32 + *
33 + * @param nodeId identifier of the cluster node that was removed
34 + */
35 + void nodeRemoved(NodeId nodeId);
24 36
25 } 37 }
......
1 -package org.onlab.onos.store.cluster.impl;
2 -
3 -import org.onlab.onos.store.cluster.messaging.ClusterMessage;
4 -
5 -/**
6 - * Simple back interface for interacting with the communications service.
7 - */
8 -public interface CommunicationsDelegate {
9 -
10 - /**
11 - * Dispatches the specified message to all registered subscribers.
12 - *
13 - * @param message message to be dispatched
14 - */
15 - void dispatch(ClusterMessage message);
16 -
17 - /**
18 - * Sets the sender.
19 - *
20 - * @param messageSender message sender
21 - */
22 - void setSender(MessageSender messageSender);
23 -
24 -}
1 -package org.onlab.onos.store.cluster.impl;
2 -
3 -import org.onlab.onos.cluster.DefaultControllerNode;
4 -import org.onlab.onos.cluster.NodeId;
5 -import org.onlab.onos.store.cluster.messaging.ClusterMessage;
6 -import org.onlab.onos.store.cluster.messaging.ClusterMessageStream;
7 -import org.onlab.onos.store.cluster.messaging.HelloMessage;
8 -import org.onlab.onos.store.cluster.messaging.SerializationService;
9 -import org.slf4j.Logger;
10 -import org.slf4j.LoggerFactory;
11 -
12 -import java.io.IOException;
13 -import java.net.InetSocketAddress;
14 -import java.net.SocketAddress;
15 -import java.nio.channels.SocketChannel;
16 -import java.util.ArrayList;
17 -import java.util.HashSet;
18 -import java.util.List;
19 -import java.util.Map;
20 -import java.util.Set;
21 -import java.util.Timer;
22 -import java.util.TimerTask;
23 -import java.util.concurrent.ConcurrentHashMap;
24 -import java.util.concurrent.ExecutorService;
25 -import java.util.concurrent.Executors;
26 -
27 -import static java.net.InetAddress.getByAddress;
28 -import static org.onlab.util.Tools.namedThreads;
29 -
30 -/**
31 - * Manages connections to other controller cluster nodes.
32 - */
33 -public class ConnectionManager implements MessageSender {
34 -
35 - private final Logger log = LoggerFactory.getLogger(getClass());
36 -
37 - private static final long CONNECTION_CUSTODIAN_DELAY = 1000L;
38 - private static final long CONNECTION_CUSTODIAN_FREQUENCY = 5000;
39 -
40 - private static final long START_TIMEOUT = 1000;
41 - private static final int WORKERS = 3;
42 -
43 - private ClusterConnectionListener connectionListener;
44 - private List<ClusterIOWorker> workers = new ArrayList<>(WORKERS);
45 -
46 - private final DefaultControllerNode localNode;
47 - private final ClusterNodesDelegate nodesDelegate;
48 - private final CommunicationsDelegate commsDelegate;
49 - private final SerializationService serializationService;
50 -
51 - // Nodes to be monitored to make sure they have a connection.
52 - private final Set<DefaultControllerNode> nodes = new HashSet<>();
53 -
54 - // Means to track message streams to other nodes.
55 - private final Map<NodeId, ClusterMessageStream> streams = new ConcurrentHashMap<>();
56 -
57 - // Executor pools for listening and managing connections to other nodes.
58 - private final ExecutorService listenExecutor =
59 - Executors.newSingleThreadExecutor(namedThreads("onos-comm-listen"));
60 - private final ExecutorService commExecutors =
61 - Executors.newFixedThreadPool(WORKERS, namedThreads("onos-comm-cluster"));
62 - private final ExecutorService heartbeatExecutor =
63 - Executors.newSingleThreadExecutor(namedThreads("onos-comm-heartbeat"));
64 -
65 - private final Timer timer = new Timer("onos-comm-initiator");
66 - private final TimerTask connectionCustodian = new ConnectionCustodian();
67 -
68 - private final WorkerFinder workerFinder = new LeastUtilitiedWorkerFinder();
69 -
70 -
71 - /**
72 - * Creates a new connection manager.
73 - */
74 - ConnectionManager(DefaultControllerNode localNode,
75 - ClusterNodesDelegate nodesDelegate,
76 - CommunicationsDelegate commsDelegate,
77 - SerializationService serializationService) {
78 - this.localNode = localNode;
79 - this.nodesDelegate = nodesDelegate;
80 - this.commsDelegate = commsDelegate;
81 - this.serializationService = serializationService;
82 -
83 - commsDelegate.setSender(this);
84 - startCommunications();
85 - startListening();
86 - startInitiating();
87 - log.info("Started");
88 - }
89 -
90 - /**
91 - * Shuts down the connection manager.
92 - */
93 - void shutdown() {
94 - connectionListener.shutdown();
95 - for (ClusterIOWorker worker : workers) {
96 - worker.shutdown();
97 - }
98 - log.info("Stopped");
99 - }
100 -
101 - /**
102 - * Adds the node to the list of monitored nodes.
103 - *
104 - * @param node node to be added
105 - */
106 - void addNode(DefaultControllerNode node) {
107 - nodes.add(node);
108 - }
109 -
110 - /**
111 - * Removes the node from the list of monitored nodes.
112 - *
113 - * @param node node to be removed
114 - */
115 - void removeNode(DefaultControllerNode node) {
116 - nodes.remove(node);
117 - ClusterMessageStream stream = streams.remove(node.id());
118 - if (stream != null) {
119 - stream.close();
120 - }
121 - }
122 -
123 - /**
124 - * Removes the stream associated with the specified node.
125 - *
126 - * @param node node whose stream to remove
127 - */
128 - void removeNodeStream(DefaultControllerNode node) {
129 - nodesDelegate.nodeVanished(node);
130 - streams.remove(node.id());
131 - }
132 -
133 - @Override
134 - public boolean send(NodeId nodeId, ClusterMessage message) {
135 - ClusterMessageStream stream = streams.get(nodeId);
136 - if (stream != null) {
137 - try {
138 - stream.write(message);
139 - return true;
140 - } catch (IOException e) {
141 - log.warn("Unable to send a message about {} to node {}",
142 - message.subject(), nodeId);
143 - }
144 - }
145 - return false;
146 - }
147 -
148 - /**
149 - * Kicks off the IO loops and waits for them to startup.
150 - */
151 - private void startCommunications() {
152 - HelloMessage hello = new HelloMessage(localNode.id(), localNode.ip(),
153 - localNode.tcpPort());
154 - for (int i = 0; i < WORKERS; i++) {
155 - try {
156 - ClusterIOWorker worker =
157 - new ClusterIOWorker(this, commsDelegate,
158 - serializationService, hello);
159 - workers.add(worker);
160 - commExecutors.execute(worker);
161 - } catch (IOException e) {
162 - log.warn("Unable to start communication worker", e);
163 - }
164 - }
165 -
166 - // Wait for the IO loops to start
167 - for (ClusterIOWorker loop : workers) {
168 - if (!loop.awaitStart(START_TIMEOUT)) {
169 - log.warn("Comm loop did not start on-time; moving on...");
170 - }
171 - }
172 - }
173 -
174 - /**
175 - * Starts listening for connections from peer cluster members.
176 - */
177 - private void startListening() {
178 - try {
179 - connectionListener =
180 - new ClusterConnectionListener(localNode.ip(), localNode.tcpPort(),
181 - workerFinder);
182 - listenExecutor.execute(connectionListener);
183 - if (!connectionListener.awaitStart(START_TIMEOUT)) {
184 - log.warn("Listener did not start on-time; moving on...");
185 - }
186 - } catch (IOException e) {
187 - log.error("Unable to listen for cluster connections", e);
188 - }
189 - }
190 -
191 - /**
192 - * Initiates open connection request and registers the pending socket
193 - * channel with the given IO loop.
194 - *
195 - * @param loop loop with which the channel should be registered
196 - * @throws java.io.IOException if the socket could not be open or connected
197 - */
198 - private void initiateConnection(DefaultControllerNode node,
199 - ClusterIOWorker loop) throws IOException {
200 - SocketAddress sa = new InetSocketAddress(getByAddress(node.ip().toOctets()), node.tcpPort());
201 - SocketChannel ch = SocketChannel.open();
202 - ch.configureBlocking(false);
203 - ch.connect(sa);
204 - loop.connectStream(ch);
205 - }
206 -
207 -
208 - /**
209 - * Attempts to connect to any nodes that do not have an associated connection.
210 - */
211 - private void startInitiating() {
212 - timer.schedule(connectionCustodian, CONNECTION_CUSTODIAN_DELAY,
213 - CONNECTION_CUSTODIAN_FREQUENCY);
214 - }
215 -
216 - // Sweeps through all controller nodes and attempts to open connection to
217 - // those that presently do not have one.
218 - private class ConnectionCustodian extends TimerTask {
219 - @Override
220 - public void run() {
221 - for (DefaultControllerNode node : nodes) {
222 - if (node != localNode && !streams.containsKey(node.id())) {
223 - try {
224 - initiateConnection(node, workerFinder.findWorker());
225 - } catch (IOException e) {
226 - log.debug("Unable to connect", e);
227 - }
228 - }
229 - }
230 - }
231 - }
232 -
233 - // Finds the least utilitied IO loop.
234 - private class LeastUtilitiedWorkerFinder implements WorkerFinder {
235 -
236 - @Override
237 - public ClusterIOWorker findWorker() {
238 - ClusterIOWorker leastUtilized = null;
239 - int minCount = Integer.MAX_VALUE;
240 - for (ClusterIOWorker worker : workers) {
241 - int count = worker.streamCount();
242 - if (count == 0) {
243 - return worker;
244 - }
245 -
246 - if (count < minCount) {
247 - leastUtilized = worker;
248 - minCount = count;
249 - }
250 - }
251 - return leastUtilized;
252 - }
253 - }
254 -
255 -}
...@@ -14,7 +14,6 @@ import org.onlab.onos.cluster.ControllerNode; ...@@ -14,7 +14,6 @@ import org.onlab.onos.cluster.ControllerNode;
14 import org.onlab.onos.cluster.DefaultControllerNode; 14 import org.onlab.onos.cluster.DefaultControllerNode;
15 import org.onlab.onos.cluster.NodeId; 15 import org.onlab.onos.cluster.NodeId;
16 import org.onlab.onos.store.AbstractStore; 16 import org.onlab.onos.store.AbstractStore;
17 -import org.onlab.onos.store.cluster.messaging.SerializationService;
18 import org.onlab.packet.IpPrefix; 17 import org.onlab.packet.IpPrefix;
19 import org.slf4j.Logger; 18 import org.slf4j.Logger;
20 import org.slf4j.LoggerFactory; 19 import org.slf4j.LoggerFactory;
...@@ -43,20 +42,20 @@ public class DistributedClusterStore ...@@ -43,20 +42,20 @@ public class DistributedClusterStore
43 private final Map<NodeId, State> states = new ConcurrentHashMap<>(); 42 private final Map<NodeId, State> states = new ConcurrentHashMap<>();
44 43
45 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) 44 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
46 - private CommunicationsDelegate commsDelegate; 45 + private ClusterCommunicationAdminService communicationAdminService;
47 -
48 - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
49 - private SerializationService serializationService;
50 46
51 private final ClusterNodesDelegate nodesDelegate = new InnerNodesDelegate(); 47 private final ClusterNodesDelegate nodesDelegate = new InnerNodesDelegate();
52 - private ConnectionManager connectionManager;
53 48
54 @Activate 49 @Activate
55 public void activate() { 50 public void activate() {
56 loadClusterDefinition(); 51 loadClusterDefinition();
57 establishSelfIdentity(); 52 establishSelfIdentity();
58 - connectionManager = new ConnectionManager(localNode, nodesDelegate, 53 +
59 - commsDelegate, serializationService); 54 + // Start-up the comm service and prime it with the loaded nodes.
55 + communicationAdminService.startUp(localNode, nodesDelegate);
56 + for (DefaultControllerNode node : nodes.values()) {
57 + communicationAdminService.addNode(node);
58 + }
60 log.info("Started"); 59 log.info("Started");
61 } 60 }
62 61
...@@ -92,8 +91,8 @@ public class DistributedClusterStore ...@@ -92,8 +91,8 @@ public class DistributedClusterStore
92 if (localNode == null) { 91 if (localNode == null) {
93 localNode = new DefaultControllerNode(new NodeId(ip.toString()), ip); 92 localNode = new DefaultControllerNode(new NodeId(ip.toString()), ip);
94 nodes.put(localNode.id(), localNode); 93 nodes.put(localNode.id(), localNode);
95 - states.put(localNode.id(), State.ACTIVE);
96 } 94 }
95 + states.put(localNode.id(), State.ACTIVE);
97 } 96 }
98 97
99 @Override 98 @Override
...@@ -122,29 +121,46 @@ public class DistributedClusterStore ...@@ -122,29 +121,46 @@ public class DistributedClusterStore
122 public ControllerNode addNode(NodeId nodeId, IpPrefix ip, int tcpPort) { 121 public ControllerNode addNode(NodeId nodeId, IpPrefix ip, int tcpPort) {
123 DefaultControllerNode node = new DefaultControllerNode(nodeId, ip, tcpPort); 122 DefaultControllerNode node = new DefaultControllerNode(nodeId, ip, tcpPort);
124 nodes.put(nodeId, node); 123 nodes.put(nodeId, node);
125 - connectionManager.addNode(node); 124 + communicationAdminService.addNode(node);
126 return node; 125 return node;
127 } 126 }
128 127
129 @Override 128 @Override
130 public void removeNode(NodeId nodeId) { 129 public void removeNode(NodeId nodeId) {
131 - DefaultControllerNode node = nodes.remove(nodeId); 130 + if (nodeId.equals(localNode.id())) {
132 - if (node != null) { 131 + // FIXME: this is still broken
133 - connectionManager.removeNode(node); 132 + // We are being ejected from the cluster, so remove all other nodes.
133 + communicationAdminService.clearAllNodesAndStreams();
134 + nodes.clear();
135 + } else {
136 + // Remove the other node.
137 + DefaultControllerNode node = nodes.remove(nodeId);
138 + if (node != null) {
139 + communicationAdminService.removeNode(node);
140 + }
134 } 141 }
135 } 142 }
136 143
137 // Entity to handle back calls from the connection manager. 144 // Entity to handle back calls from the connection manager.
138 private class InnerNodesDelegate implements ClusterNodesDelegate { 145 private class InnerNodesDelegate implements ClusterNodesDelegate {
139 @Override 146 @Override
140 - public void nodeDetected(DefaultControllerNode node) { 147 + public DefaultControllerNode nodeDetected(NodeId nodeId, IpPrefix ip, int tcpPort) {
141 - nodes.put(node.id(), node); 148 + DefaultControllerNode node = nodes.get(nodeId);
142 - states.put(node.id(), State.ACTIVE); 149 + if (node == null) {
150 + node = (DefaultControllerNode) addNode(nodeId, ip, tcpPort);
151 + }
152 + states.put(nodeId, State.ACTIVE);
153 + return node;
154 + }
155 + @Override
156 + public void nodeVanished(NodeId nodeId) {
157 + states.put(nodeId, State.INACTIVE);
143 } 158 }
144 159
145 @Override 160 @Override
146 - public void nodeVanished(DefaultControllerNode node) { 161 + public void nodeRemoved(NodeId nodeId) {
147 - states.put(node.id(), State.INACTIVE); 162 + removeNode(nodeId);
148 } 163 }
149 } 164 }
165 +
150 } 166 }
......
1 -package org.onlab.onos.store.cluster.impl;
2 -
3 -import org.onlab.onos.cluster.NodeId;
4 -import org.onlab.onos.store.cluster.messaging.ClusterMessage;
5 -
6 -/**
7 - * Created by tom on 9/29/14.
8 - */
9 -public interface MessageSender {
10 -
11 - /**
12 - * Sends the specified message to the given cluster node.
13 - *
14 - * @param nodeId node identifier
15 - * @param message mesage to send
16 - * @return true if the message was sent sucessfully; false if there is
17 - * no stream or if there was an error
18 - */
19 - boolean send(NodeId nodeId, ClusterMessage message);
20 -
21 -}
1 +package org.onlab.onos.store.cluster.impl;
2 +
3 +import de.javakaffee.kryoserializers.URISerializer;
4 +import org.apache.felix.scr.annotations.Activate;
5 +import org.apache.felix.scr.annotations.Component;
6 +import org.apache.felix.scr.annotations.Deactivate;
7 +import org.apache.felix.scr.annotations.Service;
8 +import org.onlab.onos.cluster.ControllerNode;
9 +import org.onlab.onos.cluster.DefaultControllerNode;
10 +import org.onlab.onos.cluster.NodeId;
11 +import org.onlab.onos.net.ConnectPoint;
12 +import org.onlab.onos.net.DefaultDevice;
13 +import org.onlab.onos.net.DefaultLink;
14 +import org.onlab.onos.net.DefaultPort;
15 +import org.onlab.onos.net.Device;
16 +import org.onlab.onos.net.DeviceId;
17 +import org.onlab.onos.net.Element;
18 +import org.onlab.onos.net.Link;
19 +import org.onlab.onos.net.LinkKey;
20 +import org.onlab.onos.net.MastershipRole;
21 +import org.onlab.onos.net.Port;
22 +import org.onlab.onos.net.PortNumber;
23 +import org.onlab.onos.net.provider.ProviderId;
24 +import org.onlab.onos.store.cluster.messaging.ClusterMessage;
25 +import org.onlab.onos.store.cluster.messaging.EchoMessage;
26 +import org.onlab.onos.store.cluster.messaging.GoodbyeMessage;
27 +import org.onlab.onos.store.cluster.messaging.HelloMessage;
28 +import org.onlab.onos.store.cluster.messaging.MessageSubject;
29 +import org.onlab.onos.store.cluster.messaging.SerializationService;
30 +import org.onlab.onos.store.serializers.ConnectPointSerializer;
31 +import org.onlab.onos.store.serializers.DefaultLinkSerializer;
32 +import org.onlab.onos.store.serializers.DefaultPortSerializer;
33 +import org.onlab.onos.store.serializers.DeviceIdSerializer;
34 +import org.onlab.onos.store.serializers.IpPrefixSerializer;
35 +import org.onlab.onos.store.serializers.LinkKeySerializer;
36 +import org.onlab.onos.store.serializers.NodeIdSerializer;
37 +import org.onlab.onos.store.serializers.PortNumberSerializer;
38 +import org.onlab.onos.store.serializers.ProviderIdSerializer;
39 +import org.onlab.packet.IpPrefix;
40 +import org.onlab.util.KryoPool;
41 +import org.slf4j.Logger;
42 +import org.slf4j.LoggerFactory;
43 +
44 +import java.net.URI;
45 +import java.nio.ByteBuffer;
46 +import java.util.ArrayList;
47 +import java.util.HashMap;
48 +
49 +import static com.google.common.base.Preconditions.checkState;
50 +
51 +/**
52 + * Factory for parsing messages sent between cluster members.
53 + */
54 +@Component(immediate = true)
55 +@Service
56 +public class MessageSerializer implements SerializationService {
57 +
58 + private final Logger log = LoggerFactory.getLogger(getClass());
59 +
60 + private static final int METADATA_LENGTH = 12; // 8 + 4
61 + private static final int LENGTH_OFFSET = 8;
62 +
63 + private static final long MARKER = 0xfeedcafebeaddeadL;
64 +
65 + private KryoPool serializerPool;
66 +
67 + @Activate
68 + public void activate() {
69 + setupKryoPool();
70 + log.info("Started");
71 + }
72 +
73 + @Deactivate
74 + public void deactivate() {
75 + log.info("Stopped");
76 + }
77 +
78 + /**
79 + * Sets up the common serializers pool.
80 + */
81 + protected void setupKryoPool() {
82 + // FIXME Slice out types used in common to separate pool/namespace.
83 + serializerPool = KryoPool.newBuilder()
84 + .register(ArrayList.class,
85 + HashMap.class,
86 +
87 + ControllerNode.State.class,
88 + Device.Type.class,
89 +
90 + DefaultControllerNode.class,
91 + DefaultDevice.class,
92 + MastershipRole.class,
93 + Port.class,
94 + Element.class,
95 +
96 + Link.Type.class,
97 +
98 + MessageSubject.class,
99 + HelloMessage.class,
100 + GoodbyeMessage.class,
101 + EchoMessage.class
102 + )
103 + .register(IpPrefix.class, new IpPrefixSerializer())
104 + .register(URI.class, new URISerializer())
105 + .register(NodeId.class, new NodeIdSerializer())
106 + .register(ProviderId.class, new ProviderIdSerializer())
107 + .register(DeviceId.class, new DeviceIdSerializer())
108 + .register(PortNumber.class, new PortNumberSerializer())
109 + .register(DefaultPort.class, new DefaultPortSerializer())
110 + .register(LinkKey.class, new LinkKeySerializer())
111 + .register(ConnectPoint.class, new ConnectPointSerializer())
112 + .register(DefaultLink.class, new DefaultLinkSerializer())
113 + .build()
114 + .populate(1);
115 + }
116 +
117 +
118 + @Override
119 + public ClusterMessage decode(ByteBuffer buffer) {
120 + try {
121 + // Do we have enough bytes to read the header? If not, bail.
122 + if (buffer.remaining() < METADATA_LENGTH) {
123 + return null;
124 + }
125 +
126 + // Peek at the length and if we have enough to read the entire message
127 + // go ahead, otherwise bail.
128 + int length = buffer.getInt(buffer.position() + LENGTH_OFFSET);
129 + if (buffer.remaining() < length) {
130 + return null;
131 + }
132 +
133 + // At this point, we have enough data to read a complete message.
134 + long marker = buffer.getLong();
135 + checkState(marker == MARKER, "Incorrect message marker");
136 + length = buffer.getInt();
137 +
138 + // TODO: sanity checking for length
139 + byte[] data = new byte[length - METADATA_LENGTH];
140 + buffer.get(data);
141 + return (ClusterMessage) serializerPool.deserialize(data);
142 +
143 + } catch (Exception e) {
144 + // TODO: recover from exceptions by forwarding stream to next marker
145 + log.warn("Unable to decode message due to: " + e);
146 + }
147 + return null;
148 + }
149 +
150 + @Override
151 + public void encode(ClusterMessage message, ByteBuffer buffer) {
152 + try {
153 + byte[] data = serializerPool.serialize(message);
154 + buffer.putLong(MARKER);
155 + buffer.putInt(data.length + METADATA_LENGTH);
156 + buffer.put(data);
157 +
158 + } catch (Exception e) {
159 + // TODO: recover from exceptions by forwarding stream to next marker
160 + log.warn("Unable to encode message due to: " + e);
161 + }
162 + }
163 +
164 +}
1 -package org.onlab.onos.store.cluster.impl;
2 -
3 -/**
4 - * Provides means to find a worker IO loop.
5 - */
6 -public interface WorkerFinder {
7 -
8 - /**
9 - * Finds a suitable worker.
10 - *
11 - * @return available worker
12 - */
13 - ClusterIOWorker findWorker();
14 -}
1 +/**
2 + * Distributed cluster store and messaging subsystem implementation.
3 + */
4 +package org.onlab.onos.store.cluster.impl;
...\ No newline at end of file ...\ No newline at end of file
...@@ -10,6 +10,15 @@ import java.util.Set; ...@@ -10,6 +10,15 @@ import java.util.Set;
10 public interface ClusterCommunicationService { 10 public interface ClusterCommunicationService {
11 11
12 /** 12 /**
13 + * Sends a message to all controller nodes.
14 + *
15 + * @param message message to send
16 + * @return true if the message was sent successfully to all nodes; false
17 + * if there is no stream or if there was an error for some node
18 + */
19 + boolean send(ClusterMessage message);
20 +
21 + /**
13 * Sends a message to the specified controller node. 22 * Sends a message to the specified controller node.
14 * 23 *
15 * @param message message to send 24 * @param message message to send
......
1 +package org.onlab.onos.store.cluster.messaging;
2 +
3 +import org.onlab.onos.cluster.NodeId;
4 +
5 +/**
6 + * Goodbye message that nodes use to leave the cluster for good.
7 + */
8 +public class GoodbyeMessage extends ClusterMessage {
9 +
10 + private NodeId nodeId;
11 +
12 + // For serialization
13 + private GoodbyeMessage() {
14 + super(MessageSubject.GOODBYE);
15 + nodeId = null;
16 + }
17 +
18 + /**
19 + * Creates a new goodbye message carrying the GOODBYE subject.
20 + *
21 + * @param nodeId identifier of the sending node that is leaving the cluster
22 + */
23 + public GoodbyeMessage(NodeId nodeId) {
24 + super(MessageSubject.GOODBYE); // same subject as the serialization constructor
25 + this.nodeId = nodeId;
26 + }
27 +
28 + /**
29 + * Returns the sending node identifier.
30 + *
31 + * @return node identifier
32 + */
33 + public NodeId nodeId() {
34 + return nodeId;
35 + }
36 +
37 +}
...@@ -29,9 +29,9 @@ public class HelloMessage extends ClusterMessage { ...@@ -29,9 +29,9 @@ public class HelloMessage extends ClusterMessage {
29 */ 29 */
30 public HelloMessage(NodeId nodeId, IpPrefix ipAddress, int tcpPort) { 30 public HelloMessage(NodeId nodeId, IpPrefix ipAddress, int tcpPort) {
31 super(MessageSubject.HELLO); 31 super(MessageSubject.HELLO);
32 - nodeId = nodeId; 32 + this.nodeId = nodeId;
33 - ipAddress = ipAddress; 33 + this.ipAddress = ipAddress;
34 - tcpPort = tcpPort; 34 + this.tcpPort = tcpPort;
35 } 35 }
36 36
37 /** 37 /**
...@@ -60,4 +60,5 @@ public class HelloMessage extends ClusterMessage { ...@@ -60,4 +60,5 @@ public class HelloMessage extends ClusterMessage {
60 public int tcpPort() { 60 public int tcpPort() {
61 return tcpPort; 61 return tcpPort;
62 } 62 }
63 +
63 } 64 }
......
...@@ -8,6 +8,9 @@ public enum MessageSubject { ...@@ -8,6 +8,9 @@ public enum MessageSubject {
8 /** Represents a first greeting message. */ 8 /** Represents a first greeting message. */
9 HELLO, 9 HELLO,
10 10
11 + /** Signifies node's intent to leave the cluster. */
12 + GOODBYE,
13 +
11 /** Signifies a heart-beat message. */ 14 /** Signifies a heart-beat message. */
12 ECHO 15 ECHO
13 16
......
1 package org.onlab.onos.store.cluster.messaging; 1 package org.onlab.onos.store.cluster.messaging;
2 2
3 +import org.onlab.onos.cluster.NodeId;
4 +
3 /** 5 /**
4 * Represents a message consumer. 6 * Represents a message consumer.
5 */ 7 */
...@@ -8,8 +10,9 @@ public interface MessageSubscriber { ...@@ -8,8 +10,9 @@ public interface MessageSubscriber {
8 /** 10 /**
9 * Receives the specified cluster message. 11 * Receives the specified cluster message.
10 * 12 *
11 - * @param message message to be received 13 + * @param message message to be received
14 + * @param fromNodeId node from which the message was received
12 */ 15 */
13 - void receive(ClusterMessage message); 16 + void receive(ClusterMessage message, NodeId fromNodeId);
14 17
15 } 18 }
......
...@@ -3,12 +3,12 @@ package org.onlab.onos.store.cluster.messaging; ...@@ -3,12 +3,12 @@ package org.onlab.onos.store.cluster.messaging;
3 import java.nio.ByteBuffer; 3 import java.nio.ByteBuffer;
4 4
5 /** 5 /**
6 - * Service for serializing/deserializing intra-cluster messages. 6 + * Service for encoding &amp; decoding intra-cluster messages.
7 */ 7 */
8 public interface SerializationService { 8 public interface SerializationService {
9 9
10 /** 10 /**
11 - * Decodes the specified byte buffer to obtain a message within. 11 + * Decodes the specified byte buffer to obtain the message within.
12 * 12 *
13 * @param buffer byte buffer with message(s) 13 * @param buffer byte buffer with message(s)
14 * @return parsed message 14 * @return parsed message
......
1 -package org.onlab.onos.store.cluster.messaging.impl;
2 -
3 -import com.google.common.collect.HashMultimap;
4 -import com.google.common.collect.ImmutableSet;
5 -import com.google.common.collect.Multimap;
6 -import org.apache.felix.scr.annotations.Component;
7 -import org.apache.felix.scr.annotations.Service;
8 -import org.onlab.onos.cluster.NodeId;
9 -import org.onlab.onos.store.cluster.impl.CommunicationsDelegate;
10 -import org.onlab.onos.store.cluster.impl.MessageSender;
11 -import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
12 -import org.onlab.onos.store.cluster.messaging.ClusterMessage;
13 -import org.onlab.onos.store.cluster.messaging.MessageSubject;
14 -import org.onlab.onos.store.cluster.messaging.MessageSubscriber;
15 -
16 -import java.util.Set;
17 -
18 -/**
19 - * Implements the cluster communication services to use by other stores.
20 - */
21 -@Component(immediate = true)
22 -@Service
23 -public class ClusterCommunicationManager
24 - implements ClusterCommunicationService, CommunicationsDelegate {
25 -
26 - // TODO: use something different that won't require synchronization
27 - private Multimap<MessageSubject, MessageSubscriber> subscribers = HashMultimap.create();
28 - private MessageSender messageSender;
29 -
30 - @Override
31 - public boolean send(ClusterMessage message, NodeId toNodeId) {
32 - return messageSender.send(toNodeId, message);
33 - }
34 -
35 - @Override
36 - public synchronized void addSubscriber(MessageSubject subject, MessageSubscriber subscriber) {
37 - subscribers.put(subject, subscriber);
38 - }
39 -
40 - @Override
41 - public synchronized void removeSubscriber(MessageSubject subject, MessageSubscriber subscriber) {
42 - subscribers.remove(subject, subscriber);
43 - }
44 -
45 - @Override
46 - public Set<MessageSubscriber> getSubscribers(MessageSubject subject) {
47 - return ImmutableSet.copyOf(subscribers.get(subject));
48 - }
49 -
50 - @Override
51 - public void dispatch(ClusterMessage message) {
52 - Set<MessageSubscriber> set = getSubscribers(message.subject());
53 - if (set != null) {
54 - for (MessageSubscriber subscriber : set) {
55 - subscriber.receive(message);
56 - }
57 - }
58 - }
59 -
60 - @Override
61 - public void setSender(MessageSender messageSender) {
62 - this.messageSender = messageSender;
63 - }
64 -}
1 -package org.onlab.onos.store.cluster.messaging.impl;
2 -
3 -import org.onlab.onos.store.cluster.messaging.ClusterMessage;
4 -import org.onlab.onos.store.cluster.messaging.MessageSubject;
5 -import org.onlab.onos.store.cluster.messaging.SerializationService;
6 -
7 -import java.nio.ByteBuffer;
8 -
9 -import static com.google.common.base.Preconditions.checkState;
10 -
11 -/**
12 - * Factory for parsing messages sent between cluster members.
13 - */
14 -public class MessageSerializer implements SerializationService {
15 -
16 - private static final int METADATA_LENGTH = 16; // 8 + 4 + 4
17 - private static final int LENGTH_OFFSET = 12;
18 -
19 - private static final long MARKER = 0xfeedcafebeaddeadL;
20 -
21 - @Override
22 - public ClusterMessage decode(ByteBuffer buffer) {
23 - try {
24 - // Do we have enough bytes to read the header? If not, bail.
25 - if (buffer.remaining() < METADATA_LENGTH) {
26 - return null;
27 - }
28 -
29 - // Peek at the length and if we have enough to read the entire message
30 - // go ahead, otherwise bail.
31 - int length = buffer.getInt(buffer.position() + LENGTH_OFFSET);
32 - if (buffer.remaining() < length) {
33 - return null;
34 - }
35 -
36 - // At this point, we have enough data to read a complete message.
37 - long marker = buffer.getLong();
38 - checkState(marker == MARKER, "Incorrect message marker");
39 -
40 - int subjectOrdinal = buffer.getInt();
41 - MessageSubject subject = MessageSubject.values()[subjectOrdinal];
42 - length = buffer.getInt();
43 -
44 - // TODO: sanity checking for length
45 - byte[] data = new byte[length - METADATA_LENGTH];
46 - buffer.get(data);
47 -
48 - // TODO: add deserialization hook here; for now this hack
49 - return null; // actually deserialize
50 -
51 - } catch (Exception e) {
52 - // TODO: recover from exceptions by forwarding stream to next marker
53 - e.printStackTrace();
54 - }
55 - return null;
56 - }
57 -
58 - @Override
59 - public void encode(ClusterMessage message, ByteBuffer buffer) {
60 - try {
61 - int i = 0;
62 - // Type based lookup for proper encoder
63 - } catch (Exception e) {
64 - // TODO: recover from exceptions by forwarding stream to next marker
65 - e.printStackTrace();
66 - }
67 - }
68 -
69 -}
1 +/**
2 + * Cluster messaging APIs for the use by the various distributed stores.
3 + */
4 +package org.onlab.onos.store.cluster.messaging;
...\ No newline at end of file ...\ No newline at end of file
1 +package org.onlab.onos.store.cluster.impl;
2 +
3 +import org.junit.After;
4 +import org.junit.Before;
5 +import org.junit.Test;
6 +import org.onlab.onos.cluster.DefaultControllerNode;
7 +import org.onlab.onos.cluster.NodeId;
8 +import org.onlab.packet.IpPrefix;
9 +
10 +import java.util.concurrent.CountDownLatch;
11 +import java.util.concurrent.TimeUnit;
12 +
13 +import static org.junit.Assert.assertEquals;
14 +import static org.junit.Assert.assertTrue;
15 +
16 +/**
17 + * Tests of the cluster communication manager.
18 + */
19 +public class ClusterCommunicationManagerTest {
20 +
21 + private static final NodeId N1 = new NodeId("n1");
22 + private static final NodeId N2 = new NodeId("n2");
23 +
24 + private static final int P1 = 9881;
25 + private static final int P2 = 9882;
26 +
27 + private static final IpPrefix IP = IpPrefix.valueOf("127.0.0.1");
28 +
29 + private ClusterCommunicationManager ccm1;
30 + private ClusterCommunicationManager ccm2;
31 +
32 + private TestDelegate cnd1 = new TestDelegate();
33 + private TestDelegate cnd2 = new TestDelegate();
34 +
35 + private DefaultControllerNode node1 = new DefaultControllerNode(N1, IP, P1);
36 + private DefaultControllerNode node2 = new DefaultControllerNode(N2, IP, P2);
37 +
38 + @Before
39 + public void setUp() {
40 + MessageSerializer messageSerializer = new MessageSerializer();
41 + messageSerializer.activate();
42 +
43 + ccm1 = new ClusterCommunicationManager();
44 + ccm1.serializationService = messageSerializer;
45 + ccm1.activate();
46 +
47 + ccm2 = new ClusterCommunicationManager();
48 + ccm2.serializationService = messageSerializer;
49 + ccm2.activate();
50 +
51 + ccm1.startUp(node1, cnd1);
52 + ccm2.startUp(node2, cnd2);
53 + }
54 +
55 + @After
56 + public void tearDown() {
57 + ccm1.deactivate();
58 + ccm2.deactivate();
59 + }
60 +
61 + @Test
62 + public void connect() throws Exception {
63 + cnd1.latch = new CountDownLatch(1);
64 + cnd2.latch = new CountDownLatch(1);
65 +
66 + ccm1.addNode(node2);
67 + validateDelegateEvent(cnd1, Op.DETECTED, node2.id());
68 + validateDelegateEvent(cnd2, Op.DETECTED, node1.id());
69 + }
70 +
71 + @Test
72 + public void disconnect() throws Exception {
73 + cnd1.latch = new CountDownLatch(1);
74 + cnd2.latch = new CountDownLatch(1);
75 +
76 + ccm1.addNode(node2);
77 + validateDelegateEvent(cnd1, Op.DETECTED, node2.id());
78 + validateDelegateEvent(cnd2, Op.DETECTED, node1.id());
79 +
80 + cnd1.latch = new CountDownLatch(1);
81 + cnd2.latch = new CountDownLatch(1);
82 + ccm1.deactivate();
83 +//
84 +// validateDelegateEvent(cnd2, Op.VANISHED, node1.id());
85 + }
86 +
87 + private void validateDelegateEvent(TestDelegate delegate, Op op, NodeId nodeId)
88 + throws InterruptedException {
89 + assertTrue("did not connect in time", delegate.latch.await(2500, TimeUnit.MILLISECONDS));
90 + assertEquals("incorrect event", op, delegate.op);
91 + assertEquals("incorrect event node", nodeId, delegate.nodeId);
92 + }
93 +
94 + enum Op { DETECTED, VANISHED, REMOVED };
95 +
96 + private class TestDelegate implements ClusterNodesDelegate {
97 +
98 + Op op;
99 + CountDownLatch latch;
100 + NodeId nodeId;
101 +
102 + @Override
103 + public DefaultControllerNode nodeDetected(NodeId nodeId, IpPrefix ip, int tcpPort) {
104 + latch(nodeId, Op.DETECTED);
105 + return new DefaultControllerNode(nodeId, ip, tcpPort);
106 + }
107 +
108 + @Override
109 + public void nodeVanished(NodeId nodeId) {
110 + latch(nodeId, Op.VANISHED);
111 + }
112 +
113 + @Override
114 + public void nodeRemoved(NodeId nodeId) {
115 + latch(nodeId, Op.REMOVED);
116 + }
117 +
118 + private void latch(NodeId nodeId, Op op) {
119 + this.op = op;
120 + this.nodeId = nodeId;
121 + latch.countDown();
122 + }
123 + }
124 +}
...\ No newline at end of file ...\ No newline at end of file
...@@ -3,5 +3,7 @@ ...@@ -3,5 +3,7 @@
3 # ONOS remote command-line client. 3 # ONOS remote command-line client.
4 #------------------------------------------------------------------------------- 4 #-------------------------------------------------------------------------------
5 5
6 +[ "$1" = "-w" ] && shift && onos-wait-for-start $1
7 +
6 [ -n "$1" ] && OCI=$1 && shift 8 [ -n "$1" ] && OCI=$1 && shift
7 -client -h $OCI "$@" 9 +client -h $OCI "$@" 2>/dev/null
......
1 +#!/bin/bash
2 +#-------------------------------------------------------------------------------
3 +# Remotely kills the ONOS service on the specified node.
4 +#-------------------------------------------------------------------------------
5 +
6 +[ ! -d "$ONOS_ROOT" ] && echo "ONOS_ROOT is not defined" >&2 && exit 1
7 +. $ONOS_ROOT/tools/build/envDefaults
8 +
9 +ssh $ONOS_USER@${1:-$OCI} "kill -9 \$(ps -ef | grep karaf.jar | grep -v grep | cut -c10-15)"
...\ No newline at end of file ...\ No newline at end of file
1 +# Default virtual box ONOS instances 1,2 & ONOS mininet box
2 +
3 +export ONOS_NIC=192.168.56.*
4 +
5 +export OC1="192.168.56.11"
6 +export OC2="192.168.56.12"
7 +
8 +export OCN="192.168.56.7"
9 +
10 +