Madan Jampani
Committed by Gerrit Code Review

Revamped ClusterCommunicationService API

Change-Id: I9326369de3d2413b0882b324979d10483c093de9
Showing 24 changed files with 727 additions and 576 deletions
...@@ -219,7 +219,7 @@ public class IntentPerfCollector { ...@@ -219,7 +219,7 @@ public class IntentPerfCollector {
219 219
220 private void broadcastSample(long time, NodeId nodeId, double overallRate, double currentRate) { 220 private void broadcastSample(long time, NodeId nodeId, double overallRate, double currentRate) {
221 String data = String.format("%d|%f|%f", time, overallRate, currentRate); 221 String data = String.format("%d|%f|%f", time, overallRate, currentRate);
222 - communicationService.broadcast(new ClusterMessage(nodeId, SAMPLE, data.getBytes())); 222 + communicationService.broadcast(data, SAMPLE, str -> str.getBytes());
223 } 223 }
224 224
225 private class InternalSampleCollector implements ClusterMessageHandler { 225 private class InternalSampleCollector implements ClusterMessageHandler {
......
...@@ -249,14 +249,14 @@ public class IntentPerfInstaller { ...@@ -249,14 +249,14 @@ public class IntentPerfInstaller {
249 public void start() { 249 public void start() {
250 if (stopped) { 250 if (stopped) {
251 stopped = false; 251 stopped = false;
252 - communicationService.broadcast(new ClusterMessage(nodeId, CONTROL, START.getBytes())); 252 + communicationService.broadcast(START, CONTROL, str -> str.getBytes());
253 startTestRun(); 253 startTestRun();
254 } 254 }
255 } 255 }
256 256
257 public void stop() { 257 public void stop() {
258 if (!stopped) { 258 if (!stopped) {
259 - communicationService.broadcast(new ClusterMessage(nodeId, CONTROL, STOP.getBytes())); 259 + communicationService.broadcast(STOP, CONTROL, str -> str.getBytes());
260 stopTestRun(); 260 stopTestRun();
261 } 261 }
262 } 262 }
......
...@@ -15,13 +15,16 @@ ...@@ -15,13 +15,16 @@
15 */ 15 */
16 package org.onosproject.store.cluster.messaging; 16 package org.onosproject.store.cluster.messaging;
17 17
18 -import com.google.common.util.concurrent.ListenableFuture; 18 +import java.util.Set;
19 +import java.util.concurrent.CompletableFuture;
20 +import java.util.concurrent.ExecutorService;
21 +import java.util.function.Consumer;
22 +import java.util.function.Function;
23 +
19 import org.onosproject.cluster.NodeId; 24 import org.onosproject.cluster.NodeId;
20 25
21 -import java.io.IOException; 26 +import com.google.common.util.concurrent.ListenableFuture;
22 -import java.util.concurrent.ExecutorService;
23 27
24 -// TODO: remove IOExceptions?
25 /** 28 /**
26 * Service for assisting communications between controller cluster nodes. 29 * Service for assisting communications between controller cluster nodes.
27 */ 30 */
...@@ -33,6 +36,7 @@ public interface ClusterCommunicationService { ...@@ -33,6 +36,7 @@ public interface ClusterCommunicationService {
33 * @param message message to send 36 * @param message message to send
34 * @return true if the message was sent successfully to all nodes; false otherwise. 37 * @return true if the message was sent successfully to all nodes; false otherwise.
35 */ 38 */
39 + @Deprecated
36 boolean broadcast(ClusterMessage message); 40 boolean broadcast(ClusterMessage message);
37 41
38 /** 42 /**
...@@ -41,6 +45,7 @@ public interface ClusterCommunicationService { ...@@ -41,6 +45,7 @@ public interface ClusterCommunicationService {
41 * @param message message to send 45 * @param message message to send
42 * @return true if the message was sent successfully to all nodes; false otherwise. 46 * @return true if the message was sent successfully to all nodes; false otherwise.
43 */ 47 */
48 + @Deprecated
44 boolean broadcastIncludeSelf(ClusterMessage message); 49 boolean broadcastIncludeSelf(ClusterMessage message);
45 50
46 /** 51 /**
...@@ -50,6 +55,7 @@ public interface ClusterCommunicationService { ...@@ -50,6 +55,7 @@ public interface ClusterCommunicationService {
50 * @param toNodeId node identifier 55 * @param toNodeId node identifier
51 * @return true if the message was sent successfully; false otherwise. 56 * @return true if the message was sent successfully; false otherwise.
52 */ 57 */
58 + @Deprecated
53 boolean unicast(ClusterMessage message, NodeId toNodeId); 59 boolean unicast(ClusterMessage message, NodeId toNodeId);
54 60
55 /** 61 /**
...@@ -59,6 +65,7 @@ public interface ClusterCommunicationService { ...@@ -59,6 +65,7 @@ public interface ClusterCommunicationService {
59 * @param nodeIds recipient node identifiers 65 * @param nodeIds recipient node identifiers
60 * @return true if the message was sent successfully to all nodes in the group; false otherwise. 66 * @return true if the message was sent successfully to all nodes in the group; false otherwise.
61 */ 67 */
68 + @Deprecated
62 boolean multicast(ClusterMessage message, Iterable<NodeId> nodeIds); 69 boolean multicast(ClusterMessage message, Iterable<NodeId> nodeIds);
63 70
64 /** 71 /**
...@@ -66,27 +73,121 @@ public interface ClusterCommunicationService { ...@@ -66,27 +73,121 @@ public interface ClusterCommunicationService {
66 * @param message message to send 73 * @param message message to send
67 * @param toNodeId recipient node identifier 74 * @param toNodeId recipient node identifier
68 * @return reply future. 75 * @return reply future.
69 - * @throws IOException when I/O exception of some sort has occurred
70 */ 76 */
71 - ListenableFuture<byte[]> sendAndReceive(ClusterMessage message, NodeId toNodeId) throws IOException; 77 + @Deprecated
78 + ListenableFuture<byte[]> sendAndReceive(ClusterMessage message, NodeId toNodeId);
72 79
73 /** 80 /**
74 * Adds a new subscriber for the specified message subject. 81 * Adds a new subscriber for the specified message subject.
75 * 82 *
76 * @param subject message subject 83 * @param subject message subject
77 * @param subscriber message subscriber 84 * @param subscriber message subscriber
85 + * @param executor executor to use for running handler.
78 */ 86 */
79 @Deprecated 87 @Deprecated
80 - void addSubscriber(MessageSubject subject, ClusterMessageHandler subscriber); 88 + void addSubscriber(MessageSubject subject, ClusterMessageHandler subscriber, ExecutorService executor);
89 +
90 + /**
91 + * Broadcasts a message to all controller nodes.
92 + *
93 + * @param message message to send
94 + * @param subject message subject
95 + * @param encoder function for encoding message to byte[]
96 + * @param <M> message type
97 + */
98 + <M> void broadcast(M message,
99 + MessageSubject subject,
100 + Function<M, byte[]> encoder);
101 +
102 + /**
103 + * Broadcasts a message to all controller nodes including self.
104 + *
105 + * @param message message to send
106 + * @param subject message subject
107 + * @param encoder function for encoding message to byte[]
108 + * @param <M> message type
109 + */
110 + <M> void broadcastIncludeSelf(M message,
111 + MessageSubject subject,
112 + Function<M, byte[]> encoder);
113 +
114 + /**
115 + * Sends a message to the specified controller node.
116 + *
117 + * @param message message to send
118 + * @param subject message subject
119 + * @param encoder function for encoding message to byte[]
120 + * @param toNodeId destination node identifier
121 + * @param <M> message type
122 + * @return true if the message was sent successfully; false otherwise
123 + */
124 + <M> boolean unicast(M message,
125 + MessageSubject subject,
126 + Function<M, byte[]> encoder,
127 + NodeId toNodeId);
128 +
129 + /**
130 + * Multicasts a message to a set of controller nodes.
131 + *
132 + * @param message message to send
133 + * @param subject message subject
134 + * @param encoder function for encoding message to byte[]
135 + * @param nodeIds recipient node identifiers
136 + * @param <M> message type
137 + */
138 + <M> void multicast(M message,
139 + MessageSubject subject,
140 + Function<M, byte[]> encoder,
141 + Set<NodeId> nodeIds);
142 +
143 + /**
144 + * Sends a message and expects a reply.
145 + *
146 + * @param message message to send
147 + * @param subject message subject
148 + * @param encoder function for encoding request to byte[]
149 + * @param decoder function for decoding response from byte[]
150 + * @param toNodeId recipient node identifier
151 + * @param <M> request type
152 + * @param <R> reply type
153 + * @return reply future
154 + */
155 + <M, R> CompletableFuture<R> sendAndReceive(M message,
156 + MessageSubject subject,
157 + Function<M, byte[]> encoder,
158 + Function<byte[], R> decoder,
159 + NodeId toNodeId);
81 160
82 /** 161 /**
83 * Adds a new subscriber for the specified message subject. 162 * Adds a new subscriber for the specified message subject.
84 * 163 *
85 * @param subject message subject 164 * @param subject message subject
86 - * @param subscriber message subscriber 165 + * @param decoder decoder for resurrecting incoming message
87 - * @param executor executor to use for running handler. 166 + * @param handler handler function that process the incoming message and produces a reply
167 + * @param encoder encoder for serializing reply
168 + * @param executor executor to run this handler on
169 + * @param <M> incoming message type
170 + * @param <R> reply message type
88 */ 171 */
89 - void addSubscriber(MessageSubject subject, ClusterMessageHandler subscriber, ExecutorService executor); 172 + <M, R> void addSubscriber(MessageSubject subject,
173 + Function<byte[], M> decoder,
174 + Function<M, R> handler,
175 + Function<R, byte[]> encoder,
176 + ExecutorService executor);
177 +
178 + /**
179 + * Adds a new subscriber for the specified message subject.
180 + *
181 + * @param subject message subject
182 + * @param decoder decoder to resurrecting incoming message
183 + * @param handler handler for handling message
184 + * @param executor executor to run this handler on
185 + * @param <M> incoming message type
186 + */
187 + <M> void addSubscriber(MessageSubject subject,
188 + Function<byte[], M> decoder,
189 + Consumer<M> handler,
190 + ExecutorService executor);
90 191
91 /** 192 /**
92 * Removes a subscriber for the specified message subject. 193 * Removes a subscriber for the specified message subject.
...@@ -94,5 +195,4 @@ public interface ClusterCommunicationService { ...@@ -94,5 +195,4 @@ public interface ClusterCommunicationService {
94 * @param subject message subject 195 * @param subject message subject
95 */ 196 */
96 void removeSubscriber(MessageSubject subject); 197 void removeSubscriber(MessageSubject subject);
97 -
98 } 198 }
......
...@@ -17,7 +17,6 @@ package org.onosproject.store.app; ...@@ -17,7 +17,6 @@ package org.onosproject.store.app;
17 17
18 import com.google.common.base.Charsets; 18 import com.google.common.base.Charsets;
19 import com.google.common.collect.ImmutableSet; 19 import com.google.common.collect.ImmutableSet;
20 -import com.google.common.util.concurrent.ListenableFuture;
21 20
22 import org.apache.felix.scr.annotations.Activate; 21 import org.apache.felix.scr.annotations.Activate;
23 import org.apache.felix.scr.annotations.Component; 22 import org.apache.felix.scr.annotations.Component;
...@@ -54,13 +53,14 @@ import org.onosproject.store.service.StorageService; ...@@ -54,13 +53,14 @@ import org.onosproject.store.service.StorageService;
54 import org.slf4j.Logger; 53 import org.slf4j.Logger;
55 54
56 import java.io.ByteArrayInputStream; 55 import java.io.ByteArrayInputStream;
57 -import java.io.IOException;
58 import java.io.InputStream; 56 import java.io.InputStream;
59 import java.util.Set; 57 import java.util.Set;
60 import java.util.concurrent.CountDownLatch; 58 import java.util.concurrent.CountDownLatch;
61 import java.util.concurrent.ExecutorService; 59 import java.util.concurrent.ExecutorService;
62 import java.util.concurrent.Executors; 60 import java.util.concurrent.Executors;
63 import java.util.concurrent.ScheduledExecutorService; 61 import java.util.concurrent.ScheduledExecutorService;
62 +import java.util.function.Function;
63 +
64 import static com.google.common.io.ByteStreams.toByteArray; 64 import static com.google.common.io.ByteStreams.toByteArray;
65 import static java.util.concurrent.TimeUnit.MILLISECONDS; 65 import static java.util.concurrent.TimeUnit.MILLISECONDS;
66 import static org.onlab.util.Tools.groupedThreads; 66 import static org.onlab.util.Tools.groupedThreads;
...@@ -351,22 +351,34 @@ public class GossipApplicationStore extends ApplicationArchive ...@@ -351,22 +351,34 @@ public class GossipApplicationStore extends ApplicationArchive
351 */ 351 */
352 private void fetchBits(Application app) { 352 private void fetchBits(Application app) {
353 ControllerNode localNode = clusterService.getLocalNode(); 353 ControllerNode localNode = clusterService.getLocalNode();
354 - ClusterMessage message = new ClusterMessage(localNode.id(), APP_BITS_REQUEST,
355 - app.id().name().getBytes(Charsets.UTF_8));
356 - //Map<ControllerNode, ListenableFuture<byte[]>> futures = new HashMap<>();
357 CountDownLatch latch = new CountDownLatch(1); 354 CountDownLatch latch = new CountDownLatch(1);
358 355
359 // FIXME: send message with name & version to make sure we don't get served old bits 356 // FIXME: send message with name & version to make sure we don't get served old bits
360 357
361 log.info("Downloading bits for application {}", app.id().name()); 358 log.info("Downloading bits for application {}", app.id().name());
362 for (ControllerNode node : clusterService.getNodes()) { 359 for (ControllerNode node : clusterService.getNodes()) {
363 - try { 360 + if (latch.getCount() == 0) {
364 - ListenableFuture<byte[]> future = clusterCommunicator.sendAndReceive(message, node.id()); 361 + break;
365 - future.addListener(new InternalBitListener(app, node, future, latch), executor); 362 + }
366 - } catch (IOException e) { 363 + if (node.equals(localNode)) {
367 - log.debug("Unable to request bits for application {} from node {}", 364 + continue;
365 + }
366 + clusterCommunicator.sendAndReceive(app.id().name(),
367 + APP_BITS_REQUEST,
368 + s -> s.getBytes(Charsets.UTF_8),
369 + Function.identity(),
370 + node.id())
371 + .whenCompleteAsync((bits, error) -> {
372 + if (error == null && latch.getCount() > 0) {
373 + saveApplication(new ByteArrayInputStream(bits));
374 + log.info("Downloaded bits for application {} from node {}",
368 app.id().name(), node.id()); 375 app.id().name(), node.id());
376 + latch.countDown();
377 + } else if (error != null) {
378 + log.warn("Unable to fetch bits for application {} from node {}",
379 + app.id().name(), node.id(), error);
369 } 380 }
381 + }, executor);
370 } 382 }
371 383
372 try { 384 try {
...@@ -392,41 +404,6 @@ public class GossipApplicationStore extends ApplicationArchive ...@@ -392,41 +404,6 @@ public class GossipApplicationStore extends ApplicationArchive
392 } 404 }
393 } 405 }
394 } 406 }
395 -
396 - /**
397 - * Processes completed fetch requests.
398 - */
399 - private class InternalBitListener implements Runnable {
400 - private final Application app;
401 - private final ControllerNode node;
402 - private final ListenableFuture<byte[]> future;
403 - private final CountDownLatch latch;
404 -
405 - public InternalBitListener(Application app, ControllerNode node,
406 - ListenableFuture<byte[]> future, CountDownLatch latch) {
407 - this.app = app;
408 - this.node = node;
409 - this.future = future;
410 - this.latch = latch;
411 - }
412 -
413 - @Override
414 - public void run() {
415 - if (latch.getCount() > 0 && !future.isCancelled()) {
416 - try {
417 - byte[] bits = future.get(1, MILLISECONDS);
418 - saveApplication(new ByteArrayInputStream(bits));
419 - log.info("Downloaded bits for application {} from node {}",
420 - app.id().name(), node.id());
421 - latch.countDown();
422 - } catch (Exception e) {
423 - log.warn("Unable to fetch bits for application {} from node {}",
424 - app.id().name(), node.id());
425 - }
426 - }
427 - }
428 - }
429 -
430 /** 407 /**
431 * Prunes applications which are not in the map, but are on disk. 408 * Prunes applications which are not in the map, but are on disk.
432 */ 409 */
...@@ -449,6 +426,4 @@ public class GossipApplicationStore extends ApplicationArchive ...@@ -449,6 +426,4 @@ public class GossipApplicationStore extends ApplicationArchive
449 appDesc.origin(), appDesc.permissions(), 426 appDesc.origin(), appDesc.permissions(),
450 appDesc.featuresRepo(), appDesc.features()); 427 appDesc.featuresRepo(), appDesc.features());
451 } 428 }
452 -
453 } 429 }
454 -
......
...@@ -419,10 +419,9 @@ public class HazelcastLeadershipService implements LeadershipService { ...@@ -419,10 +419,9 @@ public class HazelcastLeadershipService implements LeadershipService {
419 // Dispatch to all instances 419 // Dispatch to all instances
420 420
421 clusterCommunicator.broadcastIncludeSelf( 421 clusterCommunicator.broadcastIncludeSelf(
422 - new ClusterMessage( 422 + leadershipEvent,
423 - clusterService.getLocalNode().id(),
424 LEADERSHIP_EVENT_MESSAGE_SUBJECT, 423 LEADERSHIP_EVENT_MESSAGE_SUBJECT,
425 - SERIALIZER.encode(leadershipEvent))); 424 + SERIALIZER::encode);
426 } else { 425 } else {
427 // 426 //
428 // Test if time to expire a stale leader 427 // Test if time to expire a stale leader
...@@ -491,11 +490,11 @@ public class HazelcastLeadershipService implements LeadershipService { ...@@ -491,11 +490,11 @@ public class HazelcastLeadershipService implements LeadershipService {
491 leadershipEvent = new LeadershipEvent( 490 leadershipEvent = new LeadershipEvent(
492 LeadershipEvent.Type.LEADER_ELECTED, 491 LeadershipEvent.Type.LEADER_ELECTED,
493 new Leadership(topicName, localNodeId, myLastLeaderTerm, 0)); 492 new Leadership(topicName, localNodeId, myLastLeaderTerm, 0));
493 +
494 clusterCommunicator.broadcastIncludeSelf( 494 clusterCommunicator.broadcastIncludeSelf(
495 - new ClusterMessage( 495 + leadershipEvent,
496 - clusterService.getLocalNode().id(),
497 LEADERSHIP_EVENT_MESSAGE_SUBJECT, 496 LEADERSHIP_EVENT_MESSAGE_SUBJECT,
498 - SERIALIZER.encode(leadershipEvent))); 497 + SERIALIZER::encode);
499 } 498 }
500 499
501 // Sleep forever until interrupted 500 // Sleep forever until interrupted
...@@ -519,11 +518,12 @@ public class HazelcastLeadershipService implements LeadershipService { ...@@ -519,11 +518,12 @@ public class HazelcastLeadershipService implements LeadershipService {
519 leadershipEvent = new LeadershipEvent( 518 leadershipEvent = new LeadershipEvent(
520 LeadershipEvent.Type.LEADER_BOOTED, 519 LeadershipEvent.Type.LEADER_BOOTED,
521 new Leadership(topicName, localNodeId, myLastLeaderTerm, 0)); 520 new Leadership(topicName, localNodeId, myLastLeaderTerm, 0));
521 +
522 clusterCommunicator.broadcastIncludeSelf( 522 clusterCommunicator.broadcastIncludeSelf(
523 - new ClusterMessage( 523 + leadershipEvent,
524 - clusterService.getLocalNode().id(),
525 LEADERSHIP_EVENT_MESSAGE_SUBJECT, 524 LEADERSHIP_EVENT_MESSAGE_SUBJECT,
526 - SERIALIZER.encode(leadershipEvent))); 525 + SERIALIZER::encode);
526 +
527 if (leaderLock.isLockedByCurrentThread()) { 527 if (leaderLock.isLockedByCurrentThread()) {
528 leaderLock.unlock(); 528 leaderLock.unlock();
529 } 529 }
......
...@@ -15,7 +15,6 @@ ...@@ -15,7 +15,6 @@
15 */ 15 */
16 package org.onosproject.store.cluster.messaging.impl; 16 package org.onosproject.store.cluster.messaging.impl;
17 17
18 -import com.google.common.util.concurrent.ListenableFuture;
19 import org.apache.felix.scr.annotations.Activate; 18 import org.apache.felix.scr.annotations.Activate;
20 import org.apache.felix.scr.annotations.Component; 19 import org.apache.felix.scr.annotations.Component;
21 import org.apache.felix.scr.annotations.Deactivate; 20 import org.apache.felix.scr.annotations.Deactivate;
...@@ -37,8 +36,17 @@ import org.onosproject.store.cluster.messaging.MessageSubject; ...@@ -37,8 +36,17 @@ import org.onosproject.store.cluster.messaging.MessageSubject;
37 import org.slf4j.Logger; 36 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory; 37 import org.slf4j.LoggerFactory;
39 38
39 +import com.google.common.base.Objects;
40 +import com.google.common.util.concurrent.ListenableFuture;
41 +import com.google.common.util.concurrent.SettableFuture;
42 +
40 import java.io.IOException; 43 import java.io.IOException;
44 +import java.util.Set;
45 +import java.util.concurrent.CompletableFuture;
41 import java.util.concurrent.ExecutorService; 46 import java.util.concurrent.ExecutorService;
47 +import java.util.function.Consumer;
48 +import java.util.function.Function;
49 +import java.util.stream.Collectors;
42 50
43 import static com.google.common.base.Preconditions.checkArgument; 51 import static com.google.common.base.Preconditions.checkArgument;
44 52
...@@ -122,46 +130,101 @@ public class ClusterCommunicationManager ...@@ -122,46 +130,101 @@ public class ClusterCommunicationManager
122 return unicastUnchecked(message.subject(), message.getBytes(), toNodeId); 130 return unicastUnchecked(message.subject(), message.getBytes(), toNodeId);
123 } 131 }
124 132
125 - private boolean unicast(MessageSubject subject, byte[] payload, NodeId toNodeId) throws IOException { 133 + @Override
126 - ControllerNode node = clusterService.getNode(toNodeId); 134 + public ListenableFuture<byte[]> sendAndReceive(ClusterMessage message, NodeId toNodeId) {
127 - checkArgument(node != null, "Unknown nodeId: %s", toNodeId); 135 + SettableFuture<byte[]> response = SettableFuture.create();
128 - Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort()); 136 + sendAndReceive(message.subject(), message.getBytes(), toNodeId).whenComplete((r, e) -> {
129 - try { 137 + if (e == null) {
130 - messagingService.sendAsync(nodeEp, subject.value(), payload); 138 + response.set(r);
131 - return true; 139 + } else {
132 - } catch (IOException e) { 140 + response.setException(e);
133 - log.debug("Failed to send cluster message to nodeId: " + toNodeId, e);
134 - throw e;
135 } 141 }
142 + });
143 + return response;
136 } 144 }
137 145
138 - private boolean unicastUnchecked(MessageSubject subject, byte[] payload, NodeId toNodeId) { 146 + @Override
139 - try { 147 + public <M> void broadcast(M message,
140 - return unicast(subject, payload, toNodeId); 148 + MessageSubject subject,
141 - } catch (IOException e) { 149 + Function<M, byte[]> encoder) {
142 - return false; 150 + multicast(message,
151 + subject,
152 + encoder,
153 + clusterService.getNodes()
154 + .stream()
155 + .filter(node -> !Objects.equal(node, clusterService.getLocalNode()))
156 + .map(ControllerNode::id)
157 + .collect(Collectors.toSet()));
158 + }
159 +
160 + @Override
161 + public <M> void broadcastIncludeSelf(M message,
162 + MessageSubject subject,
163 + Function<M, byte[]> encoder) {
164 + multicast(message,
165 + subject,
166 + encoder,
167 + clusterService.getNodes()
168 + .stream()
169 + .map(ControllerNode::id)
170 + .collect(Collectors.toSet()));
143 } 171 }
172 +
173 + @Override
174 + public <M> boolean unicast(M message,
175 + MessageSubject subject,
176 + Function<M, byte[]> encoder,
177 + NodeId toNodeId) {
178 + byte[] payload = new ClusterMessage(
179 + clusterService.getLocalNode().id(),
180 + subject,
181 + encoder.apply(message)).getBytes();
182 + return unicastUnchecked(subject, payload, toNodeId);
144 } 183 }
145 184
146 @Override 185 @Override
147 - public ListenableFuture<byte[]> sendAndReceive(ClusterMessage message, NodeId toNodeId) throws IOException { 186 + public <M> void multicast(M message,
187 + MessageSubject subject,
188 + Function<M, byte[]> encoder,
189 + Set<NodeId> nodes) {
190 + byte[] payload = new ClusterMessage(
191 + clusterService.getLocalNode().id(),
192 + subject,
193 + encoder.apply(message)).getBytes();
194 + nodes.forEach(nodeId -> unicastUnchecked(subject, payload, nodeId));
195 + }
196 +
197 + @Override
198 + public <M, R> CompletableFuture<R> sendAndReceive(M message,
199 + MessageSubject subject,
200 + Function<M, byte[]> encoder,
201 + Function<byte[], R> decoder,
202 + NodeId toNodeId) {
203 + ClusterMessage envelope = new ClusterMessage(
204 + clusterService.getLocalNode().id(),
205 + subject,
206 + encoder.apply(message));
207 + return sendAndReceive(subject, envelope.getBytes(), toNodeId).thenApply(decoder);
208 + }
209 +
210 + private boolean unicastUnchecked(MessageSubject subject, byte[] payload, NodeId toNodeId) {
148 ControllerNode node = clusterService.getNode(toNodeId); 211 ControllerNode node = clusterService.getNode(toNodeId);
149 checkArgument(node != null, "Unknown nodeId: %s", toNodeId); 212 checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
150 Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort()); 213 Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort());
151 try { 214 try {
152 - return messagingService.sendAndReceive(nodeEp, message.subject().value(), message.getBytes()); 215 + messagingService.sendAsync(nodeEp, subject.value(), payload);
153 - 216 + return true;
154 } catch (IOException e) { 217 } catch (IOException e) {
155 - log.trace("Failed interaction with remote nodeId: " + toNodeId, e); 218 + log.debug("Failed to send cluster message to nodeId: " + toNodeId, e);
156 - throw e; 219 + return false;
157 } 220 }
158 } 221 }
159 222
160 - @Override 223 + private CompletableFuture<byte[]> sendAndReceive(MessageSubject subject, byte[] payload, NodeId toNodeId) {
161 - @Deprecated 224 + ControllerNode node = clusterService.getNode(toNodeId);
162 - public void addSubscriber(MessageSubject subject, 225 + checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
163 - ClusterMessageHandler subscriber) { 226 + Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort());
164 - messagingService.registerHandler(subject.value(), new InternalClusterMessageHandler(subscriber)); 227 + return messagingService.sendAndReceive(nodeEp, subject.value(), payload);
165 } 228 }
166 229
167 @Override 230 @Override
...@@ -202,6 +265,60 @@ public class ClusterCommunicationManager ...@@ -202,6 +265,60 @@ public class ClusterCommunicationManager
202 } 265 }
203 } 266 }
204 267
268 + @Override
269 + public <M, R> void addSubscriber(MessageSubject subject,
270 + Function<byte[], M> decoder,
271 + Function<M, R> handler,
272 + Function<R, byte[]> encoder,
273 + ExecutorService executor) {
274 + messagingService.registerHandler(subject.value(),
275 + new InternalMessageResponder<>(decoder, encoder, handler),
276 + executor);
277 + }
278 +
279 + @Override
280 + public <M> void addSubscriber(MessageSubject subject,
281 + Function<byte[], M> decoder,
282 + Consumer<M> handler,
283 + ExecutorService executor) {
284 + messagingService.registerHandler(subject.value(),
285 + new InternalMessageConsumer<>(decoder, handler),
286 + executor);
287 + }
288 +
289 + private class InternalMessageResponder<M, R> implements MessageHandler {
290 + private final Function<byte[], M> decoder;
291 + private final Function<R, byte[]> encoder;
292 + private final Function<M, R> handler;
293 +
294 + public InternalMessageResponder(Function<byte[], M> decoder,
295 + Function<R, byte[]> encoder,
296 + Function<M, R> handler) {
297 + this.decoder = decoder;
298 + this.encoder = encoder;
299 + this.handler = handler;
300 + }
301 + @Override
302 + public void handle(Message message) throws IOException {
303 + R response = handler.apply(decoder.apply(ClusterMessage.fromBytes(message.payload()).payload()));
304 + message.respond(encoder.apply(response));
305 + }
306 + }
307 +
308 + private class InternalMessageConsumer<M> implements MessageHandler {
309 + private final Function<byte[], M> decoder;
310 + private final Consumer<M> consumer;
311 +
312 + public InternalMessageConsumer(Function<byte[], M> decoder, Consumer<M> consumer) {
313 + this.decoder = decoder;
314 + this.consumer = consumer;
315 + }
316 + @Override
317 + public void handle(Message message) throws IOException {
318 + consumer.accept(decoder.apply(ClusterMessage.fromBytes(message.payload()).payload()));
319 + }
320 + }
321 +
205 public static final class InternalClusterMessage extends ClusterMessage { 322 public static final class InternalClusterMessage extends ClusterMessage {
206 323
207 private final Message rawMessage; 324 private final Message rawMessage;
......
...@@ -343,11 +343,9 @@ public class DistributedLeadershipManager implements LeadershipService { ...@@ -343,11 +343,9 @@ public class DistributedLeadershipManager implements LeadershipService {
343 343
344 private void notifyPeers(LeadershipEvent event) { 344 private void notifyPeers(LeadershipEvent event) {
345 eventDispatcher.post(event); 345 eventDispatcher.post(event);
346 - clusterCommunicator.broadcast( 346 + clusterCommunicator.broadcast(event,
347 - new ClusterMessage(
348 - clusterService.getLocalNode().id(),
349 LEADERSHIP_EVENT_MESSAGE_SUBJECT, 347 LEADERSHIP_EVENT_MESSAGE_SUBJECT,
350 - SERIALIZER.encode(event))); 348 + SERIALIZER::encode);
351 } 349 }
352 350
353 private void notifyRemovedLeader(String path, NodeId leader, long epoch, long electedTime) { 351 private void notifyRemovedLeader(String path, NodeId leader, long epoch, long electedTime) {
...@@ -366,11 +364,9 @@ public class DistributedLeadershipManager implements LeadershipService { ...@@ -366,11 +364,9 @@ public class DistributedLeadershipManager implements LeadershipService {
366 if (updatedLeader) { 364 if (updatedLeader) {
367 LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.LEADER_BOOTED, oldLeadership); 365 LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.LEADER_BOOTED, oldLeadership);
368 eventDispatcher.post(event); 366 eventDispatcher.post(event);
369 - clusterCommunicator.broadcast( 367 + clusterCommunicator.broadcast(event,
370 - new ClusterMessage(
371 - clusterService.getLocalNode().id(),
372 LEADERSHIP_EVENT_MESSAGE_SUBJECT, 368 LEADERSHIP_EVENT_MESSAGE_SUBJECT,
373 - SERIALIZER.encode(event))); 369 + SERIALIZER::encode);
374 } 370 }
375 } 371 }
376 372
...@@ -469,11 +465,9 @@ public class DistributedLeadershipManager implements LeadershipService { ...@@ -469,11 +465,9 @@ public class DistributedLeadershipManager implements LeadershipService {
469 leaderBoard.forEach((path, leadership) -> { 465 leaderBoard.forEach((path, leadership) -> {
470 if (leadership.leader().equals(localNodeId)) { 466 if (leadership.leader().equals(localNodeId)) {
471 LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, leadership); 467 LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, leadership);
472 - clusterCommunicator.broadcast( 468 + clusterCommunicator.broadcast(event,
473 - new ClusterMessage(
474 - clusterService.getLocalNode().id(),
475 LEADERSHIP_EVENT_MESSAGE_SUBJECT, 469 LEADERSHIP_EVENT_MESSAGE_SUBJECT,
476 - SERIALIZER.encode(event))); 470 + SERIALIZER::encode);
477 } 471 }
478 }); 472 });
479 } catch (Exception e) { 473 } catch (Exception e) {
......
...@@ -304,11 +304,9 @@ public class GossipDeviceStore ...@@ -304,11 +304,9 @@ public class GossipDeviceStore
304 304
305 DeviceInjectedEvent deviceInjectedEvent = new DeviceInjectedEvent( 305 DeviceInjectedEvent deviceInjectedEvent = new DeviceInjectedEvent(
306 providerId, deviceId, deviceDescription); 306 providerId, deviceId, deviceDescription);
307 - ClusterMessage clusterMessage = new ClusterMessage(localNode, DEVICE_INJECTED,
308 - SERIALIZER.encode(deviceInjectedEvent));
309 307
310 // TODO check unicast return value 308 // TODO check unicast return value
311 - clusterCommunicator.unicast(clusterMessage, deviceNode); 309 + clusterCommunicator.unicast(deviceInjectedEvent, DEVICE_INJECTED, SERIALIZER::encode, deviceNode);
312 /* error log: 310 /* error log:
313 log.warn("Failed to process injected device id: {} desc: {} " + 311 log.warn("Failed to process injected device id: {} desc: {} " +
314 "(cluster messaging failed: {})", 312 "(cluster messaging failed: {})",
...@@ -555,11 +553,9 @@ public class GossipDeviceStore ...@@ -555,11 +553,9 @@ public class GossipDeviceStore
555 } 553 }
556 554
557 PortInjectedEvent portInjectedEvent = new PortInjectedEvent(providerId, deviceId, portDescriptions); 555 PortInjectedEvent portInjectedEvent = new PortInjectedEvent(providerId, deviceId, portDescriptions);
558 - ClusterMessage clusterMessage = new ClusterMessage(
559 - localNode, PORT_INJECTED, SERIALIZER.encode(portInjectedEvent));
560 556
561 //TODO check unicast return value 557 //TODO check unicast return value
562 - clusterCommunicator.unicast(clusterMessage, deviceNode); 558 + clusterCommunicator.unicast(portInjectedEvent, PORT_INJECTED, SERIALIZER::encode, deviceNode);
563 /* error log: 559 /* error log:
564 log.warn("Failed to process injected ports of device id: {} " + 560 log.warn("Failed to process injected ports of device id: {} " +
565 "(cluster messaging failed: {})", 561 "(cluster messaging failed: {})",
...@@ -867,13 +863,8 @@ public class GossipDeviceStore ...@@ -867,13 +863,8 @@ public class GossipDeviceStore
867 log.debug("{} has control of {}, forwarding remove request", 863 log.debug("{} has control of {}, forwarding remove request",
868 master, deviceId); 864 master, deviceId);
869 865
870 - ClusterMessage message = new ClusterMessage(
871 - myId,
872 - DEVICE_REMOVE_REQ,
873 - SERIALIZER.encode(deviceId));
874 -
875 // TODO check unicast return value 866 // TODO check unicast return value
876 - clusterCommunicator.unicast(message, master); 867 + clusterCommunicator.unicast(deviceId, DEVICE_REMOVE_REQ, SERIALIZER::encode, master);
877 /* error log: 868 /* error log:
878 log.error("Failed to forward {} remove request to {}", deviceId, master, e); 869 log.error("Failed to forward {} remove request to {}", deviceId, master, e);
879 */ 870 */
...@@ -1057,19 +1048,11 @@ public class GossipDeviceStore ...@@ -1057,19 +1048,11 @@ public class GossipDeviceStore
1057 } 1048 }
1058 1049
1059 private void unicastMessage(NodeId recipient, MessageSubject subject, Object event) throws IOException { 1050 private void unicastMessage(NodeId recipient, MessageSubject subject, Object event) throws IOException {
1060 - ClusterMessage message = new ClusterMessage( 1051 + clusterCommunicator.unicast(event, subject, SERIALIZER::encode, recipient);
1061 - clusterService.getLocalNode().id(),
1062 - subject,
1063 - SERIALIZER.encode(event));
1064 - clusterCommunicator.unicast(message, recipient);
1065 } 1052 }
1066 1053
1067 private void broadcastMessage(MessageSubject subject, Object event) { 1054 private void broadcastMessage(MessageSubject subject, Object event) {
1068 - ClusterMessage message = new ClusterMessage( 1055 + clusterCommunicator.broadcast(event, subject, SERIALIZER::encode);
1069 - clusterService.getLocalNode().id(),
1070 - subject,
1071 - SERIALIZER.encode(event));
1072 - clusterCommunicator.broadcast(message);
1073 } 1056 }
1074 1057
1075 private void notifyPeers(InternalDeviceEvent event) { 1058 private void notifyPeers(InternalDeviceEvent event) {
......
...@@ -510,11 +510,7 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -510,11 +510,7 @@ public class EventuallyConsistentMapImpl<K, V>
510 } 510 }
511 511
512 private boolean unicastMessage(NodeId peer, MessageSubject subject, Object event) { 512 private boolean unicastMessage(NodeId peer, MessageSubject subject, Object event) {
513 - ClusterMessage message = new ClusterMessage( 513 + return clusterCommunicator.unicast(event, subject, serializer::encode, peer);
514 - clusterService.getLocalNode().id(),
515 - subject,
516 - serializer.encode(event));
517 - return clusterCommunicator.unicast(message, peer);
518 // Note: we had this flipped before... 514 // Note: we had this flipped before...
519 // communicationExecutor.execute(() -> clusterCommunicator.unicast(message, peer)); 515 // communicationExecutor.execute(() -> clusterCommunicator.unicast(message, peer));
520 } 516 }
......
...@@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableList; ...@@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableList;
22 import com.google.common.collect.Iterables; 22 import com.google.common.collect.Iterables;
23 import com.google.common.collect.Maps; 23 import com.google.common.collect.Maps;
24 import com.google.common.collect.Sets; 24 import com.google.common.collect.Sets;
25 +import com.google.common.util.concurrent.Futures;
25 import com.hazelcast.core.IMap; 26 import com.hazelcast.core.IMap;
26 27
27 import org.apache.felix.scr.annotations.Activate; 28 import org.apache.felix.scr.annotations.Activate;
...@@ -35,6 +36,7 @@ import org.apache.felix.scr.annotations.Service; ...@@ -35,6 +36,7 @@ import org.apache.felix.scr.annotations.Service;
35 import org.onlab.util.BoundedThreadPool; 36 import org.onlab.util.BoundedThreadPool;
36 import org.onlab.util.KryoNamespace; 37 import org.onlab.util.KryoNamespace;
37 import org.onlab.util.NewConcurrentHashMap; 38 import org.onlab.util.NewConcurrentHashMap;
39 +import org.onlab.util.Tools;
38 import org.onosproject.cfg.ComponentConfigService; 40 import org.onosproject.cfg.ComponentConfigService;
39 import org.onosproject.cluster.ClusterService; 41 import org.onosproject.cluster.ClusterService;
40 import org.onosproject.cluster.NodeId; 42 import org.onosproject.cluster.NodeId;
...@@ -93,7 +95,6 @@ import java.util.concurrent.ExecutorService; ...@@ -93,7 +95,6 @@ import java.util.concurrent.ExecutorService;
93 import java.util.concurrent.Executors; 95 import java.util.concurrent.Executors;
94 import java.util.concurrent.Future; 96 import java.util.concurrent.Future;
95 import java.util.concurrent.TimeUnit; 97 import java.util.concurrent.TimeUnit;
96 -import java.util.concurrent.TimeoutException;
97 import java.util.stream.Collectors; 98 import java.util.stream.Collectors;
98 99
99 import static com.google.common.base.Preconditions.checkNotNull; 100 import static com.google.common.base.Preconditions.checkNotNull;
...@@ -360,21 +361,15 @@ public class DistributedFlowRuleStore ...@@ -360,21 +361,15 @@ public class DistributedFlowRuleStore
360 log.trace("Forwarding getFlowEntry to {}, which is the primary (master) for device {}", 361 log.trace("Forwarding getFlowEntry to {}, which is the primary (master) for device {}",
361 replicaInfo.master().orNull(), rule.deviceId()); 362 replicaInfo.master().orNull(), rule.deviceId());
362 363
363 - ClusterMessage message = new ClusterMessage( 364 + return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(rule,
364 - clusterService.getLocalNode().id(),
365 FlowStoreMessageSubjects.GET_FLOW_ENTRY, 365 FlowStoreMessageSubjects.GET_FLOW_ENTRY,
366 - SERIALIZER.encode(rule)); 366 + SERIALIZER::encode,
367 - 367 + SERIALIZER::decode,
368 - try { 368 + replicaInfo.master().get()),
369 - Future<byte[]> responseFuture = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get()); 369 + FLOW_RULE_STORE_TIMEOUT_MILLIS,
370 - return SERIALIZER.decode(responseFuture.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)); 370 + TimeUnit.MILLISECONDS,
371 - } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) { 371 + null);
372 - log.warn("Unable to fetch flow store contents from {}", replicaInfo.master().get());
373 } 372 }
374 - return null;
375 - }
376 -
377 -
378 373
379 @Override 374 @Override
380 public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) { 375 public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
...@@ -393,21 +388,15 @@ public class DistributedFlowRuleStore ...@@ -393,21 +388,15 @@ public class DistributedFlowRuleStore
393 log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}", 388 log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
394 replicaInfo.master().orNull(), deviceId); 389 replicaInfo.master().orNull(), deviceId);
395 390
396 - ClusterMessage message = new ClusterMessage( 391 + return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(deviceId,
397 - clusterService.getLocalNode().id(), 392 + FlowStoreMessageSubjects.GET_DEVICE_FLOW_ENTRIES,
398 - GET_DEVICE_FLOW_ENTRIES, 393 + SERIALIZER::encode,
399 - SERIALIZER.encode(deviceId)); 394 + SERIALIZER::decode,
400 - 395 + replicaInfo.master().get()),
401 - try { 396 + FLOW_RULE_STORE_TIMEOUT_MILLIS,
402 - Future<byte[]> responseFuture = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get()); 397 + TimeUnit.MILLISECONDS,
403 - return SERIALIZER.decode(responseFuture.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)); 398 + Collections.emptyList());
404 - } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
405 - log.warn("Unable to fetch flow store contents from {}", replicaInfo.master().get());
406 } 399 }
407 - return Collections.emptyList();
408 - }
409 -
410 -
411 400
412 @Override 401 @Override
413 public void storeFlowRule(FlowRule rule) { 402 public void storeFlowRule(FlowRule rule) {
...@@ -453,14 +442,10 @@ public class DistributedFlowRuleStore ...@@ -453,14 +442,10 @@ public class DistributedFlowRuleStore
453 log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}", 442 log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}",
454 replicaInfo.master().orNull(), deviceId); 443 replicaInfo.master().orNull(), deviceId);
455 444
456 - ClusterMessage message = new ClusterMessage( 445 + if (!clusterCommunicator.unicast(operation,
457 - local, 446 + APPLY_BATCH_FLOWS, SERIALIZER::encode,
458 - APPLY_BATCH_FLOWS, 447 + replicaInfo.master().get())) {
459 - SERIALIZER.encode(operation)); 448 + log.warn("Failed to storeBatch: {} to {}", operation, replicaInfo.master());
460 -
461 -
462 - if (!clusterCommunicator.unicast(message, replicaInfo.master().get())) {
463 - log.warn("Failed to storeBatch: {} to {}", message, replicaInfo.master());
464 449
465 Set<FlowRule> allFailures = operation.getOperations().stream() 450 Set<FlowRule> allFailures = operation.getOperations().stream()
466 .map(op -> op.target()) 451 .map(op -> op.target())
...@@ -612,18 +597,15 @@ public class DistributedFlowRuleStore ...@@ -612,18 +597,15 @@ public class DistributedFlowRuleStore
612 log.trace("Forwarding removeFlowRule to {}, which is the primary (master) for device {}", 597 log.trace("Forwarding removeFlowRule to {}, which is the primary (master) for device {}",
613 replicaInfo.master().orNull(), deviceId); 598 replicaInfo.master().orNull(), deviceId);
614 599
615 - ClusterMessage message = new ClusterMessage( 600 + return Futures.get(clusterCommunicator.sendAndReceive(
616 - clusterService.getLocalNode().id(), 601 + rule,
617 REMOVE_FLOW_ENTRY, 602 REMOVE_FLOW_ENTRY,
618 - SERIALIZER.encode(rule)); 603 + SERIALIZER::encode,
619 - 604 + SERIALIZER::decode,
620 - try { 605 + replicaInfo.master().get()),
621 - Future<byte[]> responseFuture = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get()); 606 + FLOW_RULE_STORE_TIMEOUT_MILLIS,
622 - return SERIALIZER.decode(responseFuture.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)); 607 + TimeUnit.MILLISECONDS,
623 - } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) { 608 + RuntimeException.class);
624 - // TODO: Retry against latest master or throw a FlowStoreException
625 - throw new RuntimeException(e);
626 - }
627 } 609 }
628 610
629 private FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) { 611 private FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
...@@ -649,12 +631,8 @@ public class DistributedFlowRuleStore ...@@ -649,12 +631,8 @@ public class DistributedFlowRuleStore
649 if (nodeId == null) { 631 if (nodeId == null) {
650 notifyDelegate(event); 632 notifyDelegate(event);
651 } else { 633 } else {
652 - ClusterMessage message = new ClusterMessage(
653 - clusterService.getLocalNode().id(),
654 - REMOTE_APPLY_COMPLETED,
655 - SERIALIZER.encode(event));
656 // TODO check unicast return value 634 // TODO check unicast return value
657 - clusterCommunicator.unicast(message, nodeId); 635 + clusterCommunicator.unicast(event, REMOTE_APPLY_COMPLETED, SERIALIZER::encode, nodeId);
658 //error log: log.warn("Failed to respond to peer for batch operation result"); 636 //error log: log.warn("Failed to respond to peer for batch operation result");
659 } 637 }
660 } 638 }
......
...@@ -20,6 +20,7 @@ import com.google.common.cache.CacheBuilder; ...@@ -20,6 +20,7 @@ import com.google.common.cache.CacheBuilder;
20 import com.google.common.util.concurrent.Futures; 20 import com.google.common.util.concurrent.Futures;
21 import com.google.common.util.concurrent.ListenableFuture; 21 import com.google.common.util.concurrent.ListenableFuture;
22 import com.google.common.util.concurrent.SettableFuture; 22 import com.google.common.util.concurrent.SettableFuture;
23 +
23 import org.apache.felix.scr.annotations.Activate; 24 import org.apache.felix.scr.annotations.Activate;
24 import org.apache.felix.scr.annotations.Component; 25 import org.apache.felix.scr.annotations.Component;
25 import org.apache.felix.scr.annotations.Deactivate; 26 import org.apache.felix.scr.annotations.Deactivate;
...@@ -45,7 +46,6 @@ import org.onosproject.store.cluster.messaging.ClusterMessageHandler; ...@@ -45,7 +46,6 @@ import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
45 import org.onosproject.store.flow.ReplicaInfo; 46 import org.onosproject.store.flow.ReplicaInfo;
46 import org.onosproject.store.flow.ReplicaInfoEventListener; 47 import org.onosproject.store.flow.ReplicaInfoEventListener;
47 import org.onosproject.store.flow.ReplicaInfoService; 48 import org.onosproject.store.flow.ReplicaInfoService;
48 -import org.onosproject.store.serializers.DecodeTo;
49 import org.onosproject.store.serializers.KryoSerializer; 49 import org.onosproject.store.serializers.KryoSerializer;
50 import org.onosproject.store.serializers.StoreSerializer; 50 import org.onosproject.store.serializers.StoreSerializer;
51 import org.onosproject.store.serializers.impl.DistributedStoreSerializers; 51 import org.onosproject.store.serializers.impl.DistributedStoreSerializers;
...@@ -199,18 +199,12 @@ public class DefaultFlowRuleExtRouter ...@@ -199,18 +199,12 @@ public class DefaultFlowRuleExtRouter
199 log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}", 199 log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}",
200 replicaInfo.master().orNull(), deviceId); 200 replicaInfo.master().orNull(), deviceId);
201 201
202 - ClusterMessage message = new ClusterMessage(clusterService 202 + return clusterCommunicator.sendAndReceive(
203 - .getLocalNode().id(), APPLY_EXTEND_FLOWS, SERIALIZER.encode(batchOperation)); 203 + batchOperation,
204 - 204 + APPLY_EXTEND_FLOWS,
205 - try { 205 + SERIALIZER::encode,
206 - ListenableFuture<byte[]> responseFuture = clusterCommunicator 206 + SERIALIZER::decode,
207 - .sendAndReceive(message, replicaInfo.master().get()); 207 + replicaInfo.master().get());
208 - // here should add another decode process
209 - return Futures.transform(responseFuture,
210 - new DecodeTo<FlowExtCompletedOperation>(SERIALIZER));
211 - } catch (IOException e) {
212 - return Futures.immediateFailedFuture(e);
213 - }
214 } 208 }
215 209
216 /** 210 /**
......
...@@ -382,17 +382,13 @@ public class DistributedGroupStore ...@@ -382,17 +382,13 @@ public class DistributedGroupStore
382 GroupStoreMessage groupOp = GroupStoreMessage. 382 GroupStoreMessage groupOp = GroupStoreMessage.
383 createGroupAddRequestMsg(groupDesc.deviceId(), 383 createGroupAddRequestMsg(groupDesc.deviceId(),
384 groupDesc); 384 groupDesc);
385 - ClusterMessage message = new ClusterMessage( 385 +
386 - clusterService.getLocalNode().id(), 386 + if (!clusterCommunicator.unicast(groupOp,
387 - GroupStoreMessageSubjects. 387 + GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
388 - REMOTE_GROUP_OP_REQUEST, 388 + m -> kryoBuilder.build().serialize(m),
389 - kryoBuilder.build().serialize(groupOp)); 389 + mastershipService.getMasterFor(groupDesc.deviceId()))) {
390 - if (!clusterCommunicator.unicast(message,
391 - mastershipService.
392 - getMasterFor(
393 - groupDesc.deviceId()))) {
394 log.warn("Failed to send request to master: {} to {}", 390 log.warn("Failed to send request to master: {} to {}",
395 - message, 391 + groupOp,
396 mastershipService.getMasterFor(groupDesc.deviceId())); 392 mastershipService.getMasterFor(groupDesc.deviceId()));
397 //TODO: Send Group operation failure event 393 //TODO: Send Group operation failure event
398 } 394 }
...@@ -472,16 +468,13 @@ public class DistributedGroupStore ...@@ -472,16 +468,13 @@ public class DistributedGroupStore
472 type, 468 type,
473 newBuckets, 469 newBuckets,
474 newAppCookie); 470 newAppCookie);
475 - ClusterMessage message = 471 +
476 - new ClusterMessage(clusterService.getLocalNode().id(), 472 + if (!clusterCommunicator.unicast(groupOp,
477 - GroupStoreMessageSubjects. 473 + GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
478 - REMOTE_GROUP_OP_REQUEST, 474 + m -> kryoBuilder.build().serialize(m),
479 - kryoBuilder.build().serialize(groupOp)); 475 + mastershipService.getMasterFor(deviceId))) {
480 - if (!clusterCommunicator.unicast(message,
481 - mastershipService.
482 - getMasterFor(deviceId))) {
483 log.warn("Failed to send request to master: {} to {}", 476 log.warn("Failed to send request to master: {} to {}",
484 - message, 477 + groupOp,
485 mastershipService.getMasterFor(deviceId)); 478 mastershipService.getMasterFor(deviceId));
486 //TODO: Send Group operation failure event 479 //TODO: Send Group operation failure event
487 } 480 }
...@@ -584,16 +577,13 @@ public class DistributedGroupStore ...@@ -584,16 +577,13 @@ public class DistributedGroupStore
584 GroupStoreMessage groupOp = GroupStoreMessage. 577 GroupStoreMessage groupOp = GroupStoreMessage.
585 createGroupDeleteRequestMsg(deviceId, 578 createGroupDeleteRequestMsg(deviceId,
586 appCookie); 579 appCookie);
587 - ClusterMessage message = 580 +
588 - new ClusterMessage(clusterService.getLocalNode().id(), 581 + if (!clusterCommunicator.unicast(groupOp,
589 - GroupStoreMessageSubjects. 582 + GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
590 - REMOTE_GROUP_OP_REQUEST, 583 + m -> kryoBuilder.build().serialize(m),
591 - kryoBuilder.build().serialize(groupOp)); 584 + mastershipService.getMasterFor(deviceId))) {
592 - if (!clusterCommunicator.unicast(message,
593 - mastershipService.
594 - getMasterFor(deviceId))) {
595 log.warn("Failed to send request to master: {} to {}", 585 log.warn("Failed to send request to master: {} to {}",
596 - message, 586 + groupOp,
597 mastershipService.getMasterFor(deviceId)); 587 mastershipService.getMasterFor(deviceId));
598 //TODO: Send Group operation failure event 588 //TODO: Send Group operation failure event
599 } 589 }
......
...@@ -477,21 +477,13 @@ public class GossipHostStore ...@@ -477,21 +477,13 @@ public class GossipHostStore
477 } 477 }
478 478
479 private void broadcastMessage(MessageSubject subject, Object event) { 479 private void broadcastMessage(MessageSubject subject, Object event) {
480 - ClusterMessage message = new ClusterMessage( 480 + clusterCommunicator.broadcast(event, subject, SERIALIZER::encode);
481 - clusterService.getLocalNode().id(),
482 - subject,
483 - SERIALIZER.encode(event));
484 - clusterCommunicator.broadcast(message);
485 } 481 }
486 482
487 private void unicastMessage(NodeId peer, 483 private void unicastMessage(NodeId peer,
488 MessageSubject subject, 484 MessageSubject subject,
489 Object event) throws IOException { 485 Object event) throws IOException {
490 - ClusterMessage message = new ClusterMessage( 486 + clusterCommunicator.unicast(event, subject, SERIALIZER::encode, peer);
491 - clusterService.getLocalNode().id(),
492 - subject,
493 - SERIALIZER.encode(event));
494 - clusterCommunicator.unicast(message, peer);
495 } 487 }
496 488
497 private void notifyDelegateIfNotNull(HostEvent event) { 489 private void notifyDelegateIfNotNull(HostEvent event) {
......
...@@ -334,17 +334,12 @@ public class GossipLinkStore ...@@ -334,17 +334,12 @@ public class GossipLinkStore
334 334
335 335
336 LinkInjectedEvent linkInjectedEvent = new LinkInjectedEvent(providerId, linkDescription); 336 LinkInjectedEvent linkInjectedEvent = new LinkInjectedEvent(providerId, linkDescription);
337 - ClusterMessage linkInjectedMessage = new ClusterMessage(localNode,
338 - GossipLinkStoreMessageSubjects.LINK_INJECTED, SERIALIZER.encode(linkInjectedEvent));
339 337
340 // TODO check unicast return value 338 // TODO check unicast return value
341 - clusterCommunicator.unicast(linkInjectedMessage, dstNode); 339 + clusterCommunicator.unicast(linkInjectedEvent,
342 - /* error log: 340 + GossipLinkStoreMessageSubjects.LINK_INJECTED,
343 - log.warn("Failed to process link update between src: {} and dst: {} " + 341 + SERIALIZER::encode,
344 - "(cluster messaging failed: {})", 342 + dstNode);
345 - linkDescription.src(), linkDescription.dst(), e);
346 - */
347 -
348 } 343 }
349 344
350 return linkEvent; 345 return linkEvent;
...@@ -653,19 +648,11 @@ public class GossipLinkStore ...@@ -653,19 +648,11 @@ public class GossipLinkStore
653 } 648 }
654 649
655 private void broadcastMessage(MessageSubject subject, Object event) { 650 private void broadcastMessage(MessageSubject subject, Object event) {
656 - ClusterMessage message = new ClusterMessage( 651 + clusterCommunicator.broadcast(event, subject, SERIALIZER::encode);
657 - clusterService.getLocalNode().id(),
658 - subject,
659 - SERIALIZER.encode(event));
660 - clusterCommunicator.broadcast(message);
661 } 652 }
662 653
663 private void unicastMessage(NodeId recipient, MessageSubject subject, Object event) throws IOException { 654 private void unicastMessage(NodeId recipient, MessageSubject subject, Object event) throws IOException {
664 - ClusterMessage message = new ClusterMessage( 655 + clusterCommunicator.unicast(event, subject, SERIALIZER::encode, recipient);
665 - clusterService.getLocalNode().id(),
666 - subject,
667 - SERIALIZER.encode(event));
668 - clusterCommunicator.unicast(message, recipient);
669 } 656 }
670 657
671 private void notifyPeers(InternalLinkEvent event) { 658 private void notifyPeers(InternalLinkEvent event) {
......
...@@ -181,20 +181,14 @@ public class ConsistentDeviceMastershipStore ...@@ -181,20 +181,14 @@ public class ConsistentDeviceMastershipStore
181 } else { 181 } else {
182 return MastershipRole.NONE; 182 return MastershipRole.NONE;
183 } 183 }
184 - } else { 184 + }
185 - try {
186 MastershipRole role = complete(clusterCommunicator.sendAndReceive( 185 MastershipRole role = complete(clusterCommunicator.sendAndReceive(
187 - new ClusterMessage( 186 + deviceId,
188 - localNodeId,
189 ROLE_QUERY_SUBJECT, 187 ROLE_QUERY_SUBJECT,
190 - SERIALIZER.encode(deviceId)), 188 + SERIALIZER::encode,
189 + SERIALIZER::decode,
191 nodeId)); 190 nodeId));
192 return role == null ? MastershipRole.NONE : role; 191 return role == null ? MastershipRole.NONE : role;
193 - } catch (IOException e) {
194 - log.warn("Failed to query {} for {}'s role. Defaulting to NONE", nodeId, deviceId, e);
195 - return MastershipRole.NONE;
196 - }
197 - }
198 } 192 }
199 193
200 @Override 194 @Override
...@@ -276,17 +270,12 @@ public class ConsistentDeviceMastershipStore ...@@ -276,17 +270,12 @@ public class ConsistentDeviceMastershipStore
276 if (!nodeId.equals(localNodeId)) { 270 if (!nodeId.equals(localNodeId)) {
277 log.debug("Forwarding request to relinquish " 271 log.debug("Forwarding request to relinquish "
278 + "role for device {} to {}", deviceId, nodeId); 272 + "role for device {} to {}", deviceId, nodeId);
279 - try {
280 return complete(clusterCommunicator.sendAndReceive( 273 return complete(clusterCommunicator.sendAndReceive(
281 - new ClusterMessage( 274 + deviceId,
282 - localNodeId,
283 ROLE_RELINQUISH_SUBJECT, 275 ROLE_RELINQUISH_SUBJECT,
284 - SERIALIZER.encode(deviceId)), 276 + SERIALIZER::encode,
277 + SERIALIZER::decode,
285 nodeId)); 278 nodeId));
286 - } catch (IOException e) {
287 - log.warn("Failed to send a request to relinquish role for {} to {}", deviceId, nodeId, e);
288 - return null;
289 - }
290 } 279 }
291 280
292 // Check if this node is can be managed by this node. 281 // Check if this node is can be managed by this node.
......
...@@ -131,9 +131,7 @@ public class DistributedPacketStore ...@@ -131,9 +131,7 @@ public class DistributedPacketStore
131 } 131 }
132 132
133 // TODO check unicast return value 133 // TODO check unicast return value
134 - communicationService.unicast(new ClusterMessage(myId, PACKET_OUT_SUBJECT, 134 + communicationService.unicast(packet, PACKET_OUT_SUBJECT, SERIALIZER::encode, master);
135 - SERIALIZER.encode(packet)),
136 - master);
137 // error log: log.warn("Failed to send packet-out to {}", master); 135 // error log: log.warn("Failed to send packet-out to {}", master);
138 } 136 }
139 137
......
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
16 package org.onosproject.store.statistic.impl; 16 package org.onosproject.store.statistic.impl;
17 17
18 import com.google.common.collect.Sets; 18 import com.google.common.collect.Sets;
19 +
19 import org.apache.felix.scr.annotations.Activate; 20 import org.apache.felix.scr.annotations.Activate;
20 import org.apache.felix.scr.annotations.Component; 21 import org.apache.felix.scr.annotations.Component;
21 import org.apache.felix.scr.annotations.Deactivate; 22 import org.apache.felix.scr.annotations.Deactivate;
...@@ -23,6 +24,7 @@ import org.apache.felix.scr.annotations.Reference; ...@@ -23,6 +24,7 @@ import org.apache.felix.scr.annotations.Reference;
23 import org.apache.felix.scr.annotations.ReferenceCardinality; 24 import org.apache.felix.scr.annotations.ReferenceCardinality;
24 import org.apache.felix.scr.annotations.Service; 25 import org.apache.felix.scr.annotations.Service;
25 import org.onlab.util.KryoNamespace; 26 import org.onlab.util.KryoNamespace;
27 +import org.onlab.util.Tools;
26 import org.onosproject.cluster.ClusterService; 28 import org.onosproject.cluster.ClusterService;
27 import org.onosproject.net.ConnectPoint; 29 import org.onosproject.net.ConnectPoint;
28 import org.onosproject.net.DeviceId; 30 import org.onosproject.net.DeviceId;
...@@ -47,12 +49,9 @@ import java.util.HashSet; ...@@ -47,12 +49,9 @@ import java.util.HashSet;
47 import java.util.Map; 49 import java.util.Map;
48 import java.util.Set; 50 import java.util.Set;
49 import java.util.concurrent.ConcurrentHashMap; 51 import java.util.concurrent.ConcurrentHashMap;
50 -import java.util.concurrent.ExecutionException;
51 import java.util.concurrent.ExecutorService; 52 import java.util.concurrent.ExecutorService;
52 import java.util.concurrent.Executors; 53 import java.util.concurrent.Executors;
53 -import java.util.concurrent.Future;
54 import java.util.concurrent.TimeUnit; 54 import java.util.concurrent.TimeUnit;
55 -import java.util.concurrent.TimeoutException;
56 import java.util.concurrent.atomic.AtomicInteger; 55 import java.util.concurrent.atomic.AtomicInteger;
57 56
58 import static org.onlab.util.Tools.groupedThreads; 57 import static org.onlab.util.Tools.groupedThreads;
...@@ -218,20 +217,15 @@ public class DistributedStatisticStore implements StatisticStore { ...@@ -218,20 +217,15 @@ public class DistributedStatisticStore implements StatisticStore {
218 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) { 217 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
219 return getCurrentStatisticInternal(connectPoint); 218 return getCurrentStatisticInternal(connectPoint);
220 } else { 219 } else {
221 - ClusterMessage message = new ClusterMessage( 220 + return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
222 - clusterService.getLocalNode().id(), 221 + connectPoint,
223 GET_CURRENT, 222 GET_CURRENT,
224 - SERIALIZER.encode(connectPoint)); 223 + SERIALIZER::encode,
225 - 224 + SERIALIZER::decode,
226 - try { 225 + replicaInfo.master().get()),
227 - Future<byte[]> response = 226 + STATISTIC_STORE_TIMEOUT_MILLIS,
228 - clusterCommunicator.sendAndReceive(message, replicaInfo.master().get()); 227 + TimeUnit.MILLISECONDS,
229 - return SERIALIZER.decode(response.get(STATISTIC_STORE_TIMEOUT_MILLIS, 228 + Collections.emptySet());
230 - TimeUnit.MILLISECONDS));
231 - } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
232 - log.warn("Unable to communicate with peer {}", replicaInfo.master().get());
233 - return Collections.emptySet();
234 - }
235 } 229 }
236 230
237 } 231 }
...@@ -251,24 +245,18 @@ public class DistributedStatisticStore implements StatisticStore { ...@@ -251,24 +245,18 @@ public class DistributedStatisticStore implements StatisticStore {
251 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) { 245 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
252 return getPreviousStatisticInternal(connectPoint); 246 return getPreviousStatisticInternal(connectPoint);
253 } else { 247 } else {
254 - ClusterMessage message = new ClusterMessage( 248 + return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
255 - clusterService.getLocalNode().id(), 249 + connectPoint,
256 GET_PREVIOUS, 250 GET_PREVIOUS,
257 - SERIALIZER.encode(connectPoint)); 251 + SERIALIZER::encode,
258 - 252 + SERIALIZER::decode,
259 - try { 253 + replicaInfo.master().get()),
260 - Future<byte[]> response = 254 + STATISTIC_STORE_TIMEOUT_MILLIS,
261 - clusterCommunicator.sendAndReceive(message, replicaInfo.master().get()); 255 + TimeUnit.MILLISECONDS,
262 - return SERIALIZER.decode(response.get(STATISTIC_STORE_TIMEOUT_MILLIS, 256 + Collections.emptySet());
263 - TimeUnit.MILLISECONDS));
264 - } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
265 - log.warn("Unable to communicate with peer {}", replicaInfo.master().get());
266 - return Collections.emptySet();
267 } 257 }
268 } 258 }
269 259
270 - }
271 -
272 private synchronized Set<FlowEntry> getPreviousStatisticInternal(ConnectPoint connectPoint) { 260 private synchronized Set<FlowEntry> getPreviousStatisticInternal(ConnectPoint connectPoint) {
273 return previous.get(connectPoint); 261 return previous.get(connectPoint);
274 } 262 }
......
...@@ -65,6 +65,7 @@ import java.util.Set; ...@@ -65,6 +65,7 @@ import java.util.Set;
65 import java.util.concurrent.CountDownLatch; 65 import java.util.concurrent.CountDownLatch;
66 import java.util.concurrent.ExecutorService; 66 import java.util.concurrent.ExecutorService;
67 import java.util.concurrent.TimeUnit; 67 import java.util.concurrent.TimeUnit;
68 +import java.util.function.Function;
68 69
69 import static java.util.Arrays.asList; 70 import static java.util.Arrays.asList;
70 import static org.easymock.EasyMock.*; 71 import static org.easymock.EasyMock.*;
...@@ -181,8 +182,9 @@ public class GossipDeviceStoreTest { ...@@ -181,8 +182,9 @@ public class GossipDeviceStoreTest {
181 new DefaultDeviceDescription(deviceId.uri(), SWITCH, MFR, 182 new DefaultDeviceDescription(deviceId.uri(), SWITCH, MFR,
182 HW, swVersion, SN, CID, annotations); 183 HW, swVersion, SN, CID, annotations);
183 reset(clusterCommunicator); 184 reset(clusterCommunicator);
184 - expect(clusterCommunicator.broadcast(anyObject(ClusterMessage.class))) 185 + clusterCommunicator.<InternalDeviceEvent>broadcast(
185 - .andReturn(true).anyTimes(); 186 + anyObject(InternalDeviceEvent.class), anyObject(MessageSubject.class), anyObject(Function.class));
187 + expectLastCall().anyTimes();
186 replay(clusterCommunicator); 188 replay(clusterCommunicator);
187 deviceStore.createOrUpdateDevice(PID, deviceId, description); 189 deviceStore.createOrUpdateDevice(PID, deviceId, description);
188 verify(clusterCommunicator); 190 verify(clusterCommunicator);
...@@ -299,16 +301,18 @@ public class GossipDeviceStoreTest { ...@@ -299,16 +301,18 @@ public class GossipDeviceStoreTest {
299 DeviceId deviceId, 301 DeviceId deviceId,
300 ProviderId providerId, 302 ProviderId providerId,
301 DeviceDescription expectedDesc, 303 DeviceDescription expectedDesc,
302 - Capture<ClusterMessage> actualMsg) { 304 + Capture<InternalDeviceEvent> actualEvent,
303 - assertTrue(actualMsg.hasCaptured()); 305 + Capture<MessageSubject> actualSubject,
304 - assertEquals(sender, actualMsg.getValue().sender()); 306 + Capture<Function<InternalDeviceEvent, byte[]>> actualEncoder) {
307 + assertTrue(actualEvent.hasCaptured());
308 + assertTrue(actualSubject.hasCaptured());
309 + assertTrue(actualEncoder.hasCaptured());
310 +
305 assertEquals(GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, 311 assertEquals(GossipDeviceStoreMessageSubjects.DEVICE_UPDATE,
306 - actualMsg.getValue().subject()); 312 + actualSubject.getValue());
307 - InternalDeviceEvent addEvent 313 + assertEquals(deviceId, actualEvent.getValue().deviceId());
308 - = testGossipDeviceStore.deserialize(actualMsg.getValue().payload()); 314 + assertEquals(providerId, actualEvent.getValue().providerId());
309 - assertEquals(deviceId, addEvent.deviceId()); 315 + assertDeviceDescriptionEquals(expectedDesc, actualEvent.getValue().deviceDescription().value());
310 - assertEquals(providerId, addEvent.providerId());
311 - assertDeviceDescriptionEquals(expectedDesc, addEvent.deviceDescription().value());
312 } 316 }
313 317
314 private void assertInternalDeviceEvent(NodeId sender, 318 private void assertInternalDeviceEvent(NodeId sender,
...@@ -316,16 +320,21 @@ public class GossipDeviceStoreTest { ...@@ -316,16 +320,21 @@ public class GossipDeviceStoreTest {
316 ProviderId providerId, 320 ProviderId providerId,
317 DeviceDescription expectedDesc, 321 DeviceDescription expectedDesc,
318 List<SparseAnnotations> expectedAnnotations, 322 List<SparseAnnotations> expectedAnnotations,
319 - Capture<ClusterMessage> actualMsg) { 323 + Capture<InternalDeviceEvent> actualEvent,
320 - assertTrue(actualMsg.hasCaptured()); 324 + Capture<MessageSubject> actualSubject,
321 - assertEquals(sender, actualMsg.getValue().sender()); 325 + Capture<Function<InternalDeviceEvent, byte[]>> actualEncoder) {
326 + assertTrue(actualEvent.hasCaptured());
327 + assertTrue(actualSubject.hasCaptured());
328 + assertTrue(actualEncoder.hasCaptured());
329 +
322 assertEquals(GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, 330 assertEquals(GossipDeviceStoreMessageSubjects.DEVICE_UPDATE,
323 - actualMsg.getValue().subject()); 331 + actualSubject.getValue());
324 - InternalDeviceEvent addEvent 332 + assertEquals(deviceId, actualEvent.getValue().deviceId());
325 - = testGossipDeviceStore.deserialize(actualMsg.getValue().payload()); 333 + assertEquals(providerId, actualEvent.getValue().providerId());
326 - assertEquals(deviceId, addEvent.deviceId()); 334 + assertDeviceDescriptionEquals(
327 - assertEquals(providerId, addEvent.providerId()); 335 + expectedDesc,
328 - assertDeviceDescriptionEquals(expectedDesc, expectedAnnotations, addEvent.deviceDescription().value()); 336 + expectedAnnotations,
337 + actualEvent.getValue().deviceDescription().value());
329 } 338 }
330 339
331 @Test 340 @Test
...@@ -333,26 +342,28 @@ public class GossipDeviceStoreTest { ...@@ -333,26 +342,28 @@ public class GossipDeviceStoreTest {
333 DeviceDescription description = 342 DeviceDescription description =
334 new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR, 343 new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR,
335 HW, SW1, SN, CID); 344 HW, SW1, SN, CID);
336 - Capture<ClusterMessage> bcast = new Capture<>(); 345 + Capture<InternalDeviceEvent> message = new Capture<>();
346 + Capture<MessageSubject> subject = new Capture<>();
347 + Capture<Function<InternalDeviceEvent, byte[]>> encoder = new Capture<>();
337 348
338 - resetCommunicatorExpectingSingleBroadcast(bcast); 349 + resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
339 DeviceEvent event = deviceStore.createOrUpdateDevice(PID, DID1, description); 350 DeviceEvent event = deviceStore.createOrUpdateDevice(PID, DID1, description);
340 assertEquals(DEVICE_ADDED, event.type()); 351 assertEquals(DEVICE_ADDED, event.type());
341 assertDevice(DID1, SW1, event.subject()); 352 assertDevice(DID1, SW1, event.subject());
342 verify(clusterCommunicator); 353 verify(clusterCommunicator);
343 - assertInternalDeviceEvent(NID1, DID1, PID, description, bcast); 354 + assertInternalDeviceEvent(NID1, DID1, PID, description, message, subject, encoder);
344 355
345 356
346 DeviceDescription description2 = 357 DeviceDescription description2 =
347 new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR, 358 new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR,
348 HW, SW2, SN, CID); 359 HW, SW2, SN, CID);
349 - resetCommunicatorExpectingSingleBroadcast(bcast); 360 + resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
350 DeviceEvent event2 = deviceStore.createOrUpdateDevice(PID, DID1, description2); 361 DeviceEvent event2 = deviceStore.createOrUpdateDevice(PID, DID1, description2);
351 assertEquals(DEVICE_UPDATED, event2.type()); 362 assertEquals(DEVICE_UPDATED, event2.type());
352 assertDevice(DID1, SW2, event2.subject()); 363 assertDevice(DID1, SW2, event2.subject());
353 364
354 verify(clusterCommunicator); 365 verify(clusterCommunicator);
355 - assertInternalDeviceEvent(NID1, DID1, PID, description2, bcast); 366 + assertInternalDeviceEvent(NID1, DID1, PID, description2, message, subject, encoder);
356 reset(clusterCommunicator); 367 reset(clusterCommunicator);
357 368
358 assertNull("No change expected", deviceStore.createOrUpdateDevice(PID, DID1, description2)); 369 assertNull("No change expected", deviceStore.createOrUpdateDevice(PID, DID1, description2));
...@@ -366,7 +377,11 @@ public class GossipDeviceStoreTest { ...@@ -366,7 +377,11 @@ public class GossipDeviceStoreTest {
366 HW, SW1, SN, CID, A2); 377 HW, SW1, SN, CID, A2);
367 Capture<ClusterMessage> bcast = new Capture<>(); 378 Capture<ClusterMessage> bcast = new Capture<>();
368 379
369 - resetCommunicatorExpectingSingleBroadcast(bcast); 380 + Capture<InternalDeviceEvent> message = new Capture<>();
381 + Capture<MessageSubject> subject = new Capture<>();
382 + Capture<Function<InternalDeviceEvent, byte[]>> encoder = new Capture<>();
383 +
384 + resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
370 DeviceEvent event = deviceStore.createOrUpdateDevice(PIDA, DID1, description); 385 DeviceEvent event = deviceStore.createOrUpdateDevice(PIDA, DID1, description);
371 assertEquals(DEVICE_ADDED, event.type()); 386 assertEquals(DEVICE_ADDED, event.type());
372 assertDevice(DID1, SW1, event.subject()); 387 assertDevice(DID1, SW1, event.subject());
...@@ -374,13 +389,13 @@ public class GossipDeviceStoreTest { ...@@ -374,13 +389,13 @@ public class GossipDeviceStoreTest {
374 assertAnnotationsEquals(event.subject().annotations(), A2); 389 assertAnnotationsEquals(event.subject().annotations(), A2);
375 assertFalse("Ancillary will not bring device up", deviceStore.isAvailable(DID1)); 390 assertFalse("Ancillary will not bring device up", deviceStore.isAvailable(DID1));
376 verify(clusterCommunicator); 391 verify(clusterCommunicator);
377 - assertInternalDeviceEvent(NID1, DID1, PIDA, description, bcast); 392 + assertInternalDeviceEvent(NID1, DID1, PIDA, description, message, subject, encoder);
378 393
379 // update from primary 394 // update from primary
380 DeviceDescription description2 = 395 DeviceDescription description2 =
381 new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR, 396 new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR,
382 HW, SW2, SN, CID, A1); 397 HW, SW2, SN, CID, A1);
383 - resetCommunicatorExpectingSingleBroadcast(bcast); 398 + resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
384 399
385 DeviceEvent event2 = deviceStore.createOrUpdateDevice(PID, DID1, description2); 400 DeviceEvent event2 = deviceStore.createOrUpdateDevice(PID, DID1, description2);
386 assertEquals(DEVICE_UPDATED, event2.type()); 401 assertEquals(DEVICE_UPDATED, event2.type());
...@@ -389,17 +404,17 @@ public class GossipDeviceStoreTest { ...@@ -389,17 +404,17 @@ public class GossipDeviceStoreTest {
389 assertAnnotationsEquals(event2.subject().annotations(), A1, A2); 404 assertAnnotationsEquals(event2.subject().annotations(), A1, A2);
390 assertTrue(deviceStore.isAvailable(DID1)); 405 assertTrue(deviceStore.isAvailable(DID1));
391 verify(clusterCommunicator); 406 verify(clusterCommunicator);
392 - assertInternalDeviceEvent(NID1, DID1, PID, description2, bcast); 407 + assertInternalDeviceEvent(NID1, DID1, PID, description2, message, subject, encoder);
393 408
394 // no-op update from primary 409 // no-op update from primary
395 - resetCommunicatorExpectingNoBroadcast(bcast); 410 + resetCommunicatorExpectingNoBroadcast(message, subject, encoder);
396 assertNull("No change expected", deviceStore.createOrUpdateDevice(PID, DID1, description2)); 411 assertNull("No change expected", deviceStore.createOrUpdateDevice(PID, DID1, description2));
397 412
398 verify(clusterCommunicator); 413 verify(clusterCommunicator);
399 assertFalse("no broadcast expected", bcast.hasCaptured()); 414 assertFalse("no broadcast expected", bcast.hasCaptured());
400 415
401 // For now, Ancillary is ignored once primary appears 416 // For now, Ancillary is ignored once primary appears
402 - resetCommunicatorExpectingNoBroadcast(bcast); 417 + resetCommunicatorExpectingNoBroadcast(message, subject, encoder);
403 418
404 assertNull("No change expected", deviceStore.createOrUpdateDevice(PIDA, DID1, description)); 419 assertNull("No change expected", deviceStore.createOrUpdateDevice(PIDA, DID1, description));
405 420
...@@ -410,7 +425,7 @@ public class GossipDeviceStoreTest { ...@@ -410,7 +425,7 @@ public class GossipDeviceStoreTest {
410 DeviceDescription description3 = 425 DeviceDescription description3 =
411 new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR, 426 new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR,
412 HW, SW1, SN, CID, A2_2); 427 HW, SW1, SN, CID, A2_2);
413 - resetCommunicatorExpectingSingleBroadcast(bcast); 428 + resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
414 429
415 DeviceEvent event3 = deviceStore.createOrUpdateDevice(PIDA, DID1, description3); 430 DeviceEvent event3 = deviceStore.createOrUpdateDevice(PIDA, DID1, description3);
416 assertEquals(DEVICE_UPDATED, event3.type()); 431 assertEquals(DEVICE_UPDATED, event3.type());
...@@ -423,7 +438,7 @@ public class GossipDeviceStoreTest { ...@@ -423,7 +438,7 @@ public class GossipDeviceStoreTest {
423 verify(clusterCommunicator); 438 verify(clusterCommunicator);
424 // note: only annotation from PIDA is sent over the wire 439 // note: only annotation from PIDA is sent over the wire
425 assertInternalDeviceEvent(NID1, DID1, PIDA, description3, 440 assertInternalDeviceEvent(NID1, DID1, PIDA, description3,
426 - asList(union(A2, A2_2)), bcast); 441 + asList(union(A2, A2_2)), message, subject, encoder);
427 442
428 } 443 }
429 444
...@@ -434,23 +449,25 @@ public class GossipDeviceStoreTest { ...@@ -434,23 +449,25 @@ public class GossipDeviceStoreTest {
434 putDevice(DID1, SW1); 449 putDevice(DID1, SW1);
435 assertTrue(deviceStore.isAvailable(DID1)); 450 assertTrue(deviceStore.isAvailable(DID1));
436 451
437 - Capture<ClusterMessage> bcast = new Capture<>(); 452 + Capture<InternalDeviceEvent> message = new Capture<>();
453 + Capture<MessageSubject> subject = new Capture<>();
454 + Capture<Function<InternalDeviceEvent, byte[]>> encoder = new Capture<>();
438 455
439 - resetCommunicatorExpectingSingleBroadcast(bcast); 456 + resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
440 DeviceEvent event = deviceStore.markOffline(DID1); 457 DeviceEvent event = deviceStore.markOffline(DID1);
441 assertEquals(DEVICE_AVAILABILITY_CHANGED, event.type()); 458 assertEquals(DEVICE_AVAILABILITY_CHANGED, event.type());
442 assertDevice(DID1, SW1, event.subject()); 459 assertDevice(DID1, SW1, event.subject());
443 assertFalse(deviceStore.isAvailable(DID1)); 460 assertFalse(deviceStore.isAvailable(DID1));
444 verify(clusterCommunicator); 461 verify(clusterCommunicator);
445 // TODO: verify broadcast message 462 // TODO: verify broadcast message
446 - assertTrue(bcast.hasCaptured()); 463 + assertTrue(message.hasCaptured());
447 464
448 465
449 - resetCommunicatorExpectingNoBroadcast(bcast); 466 + resetCommunicatorExpectingNoBroadcast(message, subject, encoder);
450 DeviceEvent event2 = deviceStore.markOffline(DID1); 467 DeviceEvent event2 = deviceStore.markOffline(DID1);
451 assertNull("No change, no event", event2); 468 assertNull("No change, no event", event2);
452 verify(clusterCommunicator); 469 verify(clusterCommunicator);
453 - assertFalse(bcast.hasCaptured()); 470 + assertFalse(message.hasCaptured());
454 } 471 }
455 472
456 @Test 473 @Test
...@@ -460,13 +477,15 @@ public class GossipDeviceStoreTest { ...@@ -460,13 +477,15 @@ public class GossipDeviceStoreTest {
460 new DefaultPortDescription(P1, true), 477 new DefaultPortDescription(P1, true),
461 new DefaultPortDescription(P2, true) 478 new DefaultPortDescription(P2, true)
462 ); 479 );
463 - Capture<ClusterMessage> bcast = new Capture<>(); 480 + Capture<InternalDeviceEvent> message = new Capture<>();
481 + Capture<MessageSubject> subject = new Capture<>();
482 + Capture<Function<InternalDeviceEvent, byte[]>> encoder = new Capture<>();
464 483
465 - resetCommunicatorExpectingSingleBroadcast(bcast); 484 + resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
466 List<DeviceEvent> events = deviceStore.updatePorts(PID, DID1, pds); 485 List<DeviceEvent> events = deviceStore.updatePorts(PID, DID1, pds);
467 verify(clusterCommunicator); 486 verify(clusterCommunicator);
468 // TODO: verify broadcast message 487 // TODO: verify broadcast message
469 - assertTrue(bcast.hasCaptured()); 488 + assertTrue(message.hasCaptured());
470 489
471 Set<PortNumber> expectedPorts = Sets.newHashSet(P1, P2); 490 Set<PortNumber> expectedPorts = Sets.newHashSet(P1, P2);
472 for (DeviceEvent event : events) { 491 for (DeviceEvent event : events) {
...@@ -485,11 +504,11 @@ public class GossipDeviceStoreTest { ...@@ -485,11 +504,11 @@ public class GossipDeviceStoreTest {
485 new DefaultPortDescription(P3, true) 504 new DefaultPortDescription(P3, true)
486 ); 505 );
487 506
488 - resetCommunicatorExpectingSingleBroadcast(bcast); 507 + resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
489 events = deviceStore.updatePorts(PID, DID1, pds2); 508 events = deviceStore.updatePorts(PID, DID1, pds2);
490 verify(clusterCommunicator); 509 verify(clusterCommunicator);
491 // TODO: verify broadcast message 510 // TODO: verify broadcast message
492 - assertTrue(bcast.hasCaptured()); 511 + assertTrue(message.hasCaptured());
493 512
494 assertFalse("event should be triggered", events.isEmpty()); 513 assertFalse("event should be triggered", events.isEmpty());
495 for (DeviceEvent event : events) { 514 for (DeviceEvent event : events) {
...@@ -513,11 +532,11 @@ public class GossipDeviceStoreTest { ...@@ -513,11 +532,11 @@ public class GossipDeviceStoreTest {
513 new DefaultPortDescription(P1, false), 532 new DefaultPortDescription(P1, false),
514 new DefaultPortDescription(P2, true) 533 new DefaultPortDescription(P2, true)
515 ); 534 );
516 - resetCommunicatorExpectingSingleBroadcast(bcast); 535 + resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
517 events = deviceStore.updatePorts(PID, DID1, pds3); 536 events = deviceStore.updatePorts(PID, DID1, pds3);
518 verify(clusterCommunicator); 537 verify(clusterCommunicator);
519 // TODO: verify broadcast message 538 // TODO: verify broadcast message
520 - assertTrue(bcast.hasCaptured()); 539 + assertTrue(message.hasCaptured());
521 540
522 assertFalse("event should be triggered", events.isEmpty()); 541 assertFalse("event should be triggered", events.isEmpty());
523 for (DeviceEvent event : events) { 542 for (DeviceEvent event : events) {
...@@ -544,9 +563,11 @@ public class GossipDeviceStoreTest { ...@@ -544,9 +563,11 @@ public class GossipDeviceStoreTest {
544 ); 563 );
545 deviceStore.updatePorts(PID, DID1, pds); 564 deviceStore.updatePorts(PID, DID1, pds);
546 565
547 - Capture<ClusterMessage> bcast = new Capture<>(); 566 + Capture<InternalPortStatusEvent> message = new Capture<>();
567 + Capture<MessageSubject> subject = new Capture<>();
568 + Capture<Function<InternalPortStatusEvent, byte[]>> encoder = new Capture<>();
548 569
549 - resetCommunicatorExpectingSingleBroadcast(bcast); 570 + resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
550 final DefaultPortDescription desc = new DefaultPortDescription(P1, false); 571 final DefaultPortDescription desc = new DefaultPortDescription(P1, false);
551 DeviceEvent event = deviceStore.updatePortStatus(PID, DID1, desc); 572 DeviceEvent event = deviceStore.updatePortStatus(PID, DID1, desc);
552 assertEquals(PORT_UPDATED, event.type()); 573 assertEquals(PORT_UPDATED, event.type());
...@@ -554,8 +575,8 @@ public class GossipDeviceStoreTest { ...@@ -554,8 +575,8 @@ public class GossipDeviceStoreTest {
554 assertEquals(P1, event.port().number()); 575 assertEquals(P1, event.port().number());
555 assertFalse("Port is disabled", event.port().isEnabled()); 576 assertFalse("Port is disabled", event.port().isEnabled());
556 verify(clusterCommunicator); 577 verify(clusterCommunicator);
557 - assertInternalPortStatusEvent(NID1, DID1, PID, desc, NO_ANNOTATION, bcast); 578 + assertInternalPortStatusEvent(NID1, DID1, PID, desc, NO_ANNOTATION, message, subject, encoder);
558 - assertTrue(bcast.hasCaptured()); 579 + assertTrue(message.hasCaptured());
559 } 580 }
560 581
561 @Test 582 @Test
...@@ -567,11 +588,13 @@ public class GossipDeviceStoreTest { ...@@ -567,11 +588,13 @@ public class GossipDeviceStoreTest {
567 ); 588 );
568 deviceStore.updatePorts(PID, DID1, pds); 589 deviceStore.updatePorts(PID, DID1, pds);
569 590
570 - Capture<ClusterMessage> bcast = new Capture<>(); 591 + Capture<InternalPortStatusEvent> message = new Capture<>();
571 - 592 + Capture<MessageSubject> subject = new Capture<>();
593 + Capture<Function<InternalPortStatusEvent, byte[]>> encoder = new Capture<>();
572 594
573 // update port from primary 595 // update port from primary
574 - resetCommunicatorExpectingSingleBroadcast(bcast); 596 + resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
597 +
575 final DefaultPortDescription desc1 = new DefaultPortDescription(P1, false, A1_2); 598 final DefaultPortDescription desc1 = new DefaultPortDescription(P1, false, A1_2);
576 DeviceEvent event = deviceStore.updatePortStatus(PID, DID1, desc1); 599 DeviceEvent event = deviceStore.updatePortStatus(PID, DID1, desc1);
577 assertEquals(PORT_UPDATED, event.type()); 600 assertEquals(PORT_UPDATED, event.type());
...@@ -580,19 +603,19 @@ public class GossipDeviceStoreTest { ...@@ -580,19 +603,19 @@ public class GossipDeviceStoreTest {
580 assertAnnotationsEquals(event.port().annotations(), A1, A1_2); 603 assertAnnotationsEquals(event.port().annotations(), A1, A1_2);
581 assertFalse("Port is disabled", event.port().isEnabled()); 604 assertFalse("Port is disabled", event.port().isEnabled());
582 verify(clusterCommunicator); 605 verify(clusterCommunicator);
583 - assertInternalPortStatusEvent(NID1, DID1, PID, desc1, asList(A1, A1_2), bcast); 606 + assertInternalPortStatusEvent(NID1, DID1, PID, desc1, asList(A1, A1_2), message, subject, encoder);
584 - assertTrue(bcast.hasCaptured()); 607 + assertTrue(message.hasCaptured());
585 608
586 // update port from ancillary with no attributes 609 // update port from ancillary with no attributes
587 - resetCommunicatorExpectingNoBroadcast(bcast); 610 + resetCommunicatorExpectingNoBroadcast(message, subject, encoder);
588 final DefaultPortDescription desc2 = new DefaultPortDescription(P1, true); 611 final DefaultPortDescription desc2 = new DefaultPortDescription(P1, true);
589 DeviceEvent event2 = deviceStore.updatePortStatus(PIDA, DID1, desc2); 612 DeviceEvent event2 = deviceStore.updatePortStatus(PIDA, DID1, desc2);
590 assertNull("Ancillary is ignored if primary exists", event2); 613 assertNull("Ancillary is ignored if primary exists", event2);
591 verify(clusterCommunicator); 614 verify(clusterCommunicator);
592 - assertFalse(bcast.hasCaptured()); 615 + assertFalse(message.hasCaptured());
593 616
594 // but, Ancillary annotation update will be notified 617 // but, Ancillary annotation update will be notified
595 - resetCommunicatorExpectingSingleBroadcast(bcast); 618 + resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
596 final DefaultPortDescription desc3 = new DefaultPortDescription(P1, true, A2); 619 final DefaultPortDescription desc3 = new DefaultPortDescription(P1, true, A2);
597 DeviceEvent event3 = deviceStore.updatePortStatus(PIDA, DID1, desc3); 620 DeviceEvent event3 = deviceStore.updatePortStatus(PIDA, DID1, desc3);
598 assertEquals(PORT_UPDATED, event3.type()); 621 assertEquals(PORT_UPDATED, event3.type());
...@@ -601,11 +624,11 @@ public class GossipDeviceStoreTest { ...@@ -601,11 +624,11 @@ public class GossipDeviceStoreTest {
601 assertAnnotationsEquals(event3.port().annotations(), A1, A1_2, A2); 624 assertAnnotationsEquals(event3.port().annotations(), A1, A1_2, A2);
602 assertFalse("Port is disabled", event3.port().isEnabled()); 625 assertFalse("Port is disabled", event3.port().isEnabled());
603 verify(clusterCommunicator); 626 verify(clusterCommunicator);
604 - assertInternalPortStatusEvent(NID1, DID1, PIDA, desc3, asList(A2), bcast); 627 + assertInternalPortStatusEvent(NID1, DID1, PIDA, desc3, asList(A2), message, subject, encoder);
605 - assertTrue(bcast.hasCaptured()); 628 + assertTrue(message.hasCaptured());
606 629
607 // port only reported from Ancillary will be notified as down 630 // port only reported from Ancillary will be notified as down
608 - resetCommunicatorExpectingSingleBroadcast(bcast); 631 + resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
609 final DefaultPortDescription desc4 = new DefaultPortDescription(P2, true); 632 final DefaultPortDescription desc4 = new DefaultPortDescription(P2, true);
610 DeviceEvent event4 = deviceStore.updatePortStatus(PIDA, DID1, desc4); 633 DeviceEvent event4 = deviceStore.updatePortStatus(PIDA, DID1, desc4);
611 assertEquals(PORT_ADDED, event4.type()); 634 assertEquals(PORT_ADDED, event4.type());
...@@ -616,25 +639,29 @@ public class GossipDeviceStoreTest { ...@@ -616,25 +639,29 @@ public class GossipDeviceStoreTest {
616 event4.port().isEnabled()); 639 event4.port().isEnabled());
617 verify(clusterCommunicator); 640 verify(clusterCommunicator);
618 // TODO: verify broadcast message content 641 // TODO: verify broadcast message content
619 - assertInternalPortStatusEvent(NID1, DID1, PIDA, desc4, NO_ANNOTATION, bcast); 642 + assertInternalPortStatusEvent(NID1, DID1, PIDA, desc4, NO_ANNOTATION, message, subject, encoder);
620 - assertTrue(bcast.hasCaptured()); 643 + assertTrue(message.hasCaptured());
621 } 644 }
622 645
623 - private void assertInternalPortStatusEvent(NodeId sender, DeviceId did, 646 + private void assertInternalPortStatusEvent(NodeId sender,
624 - ProviderId pid, DefaultPortDescription expectedDesc, 647 + DeviceId did,
625 - List<SparseAnnotations> expectedAnnotations, Capture<ClusterMessage> actualMsg) { 648 + ProviderId pid,
649 + DefaultPortDescription expectedDesc,
650 + List<SparseAnnotations> expectedAnnotations,
651 + Capture<InternalPortStatusEvent> actualEvent,
652 + Capture<MessageSubject> actualSubject,
653 + Capture<Function<InternalPortStatusEvent, byte[]>> actualEncoder) {
654 +
655 + assertTrue(actualEvent.hasCaptured());
656 + assertTrue(actualSubject.hasCaptured());
657 + assertTrue(actualEncoder.hasCaptured());
626 658
627 - assertTrue(actualMsg.hasCaptured());
628 - assertEquals(sender, actualMsg.getValue().sender());
629 assertEquals(GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, 659 assertEquals(GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE,
630 - actualMsg.getValue().subject()); 660 + actualSubject.getValue());
631 - InternalPortStatusEvent addEvent 661 + assertEquals(did, actualEvent.getValue().deviceId());
632 - = testGossipDeviceStore.deserialize(actualMsg.getValue().payload()); 662 + assertEquals(pid, actualEvent.getValue().providerId());
633 - assertEquals(did, addEvent.deviceId());
634 - assertEquals(pid, addEvent.providerId());
635 assertPortDescriptionEquals(expectedDesc, expectedAnnotations, 663 assertPortDescriptionEquals(expectedDesc, expectedAnnotations,
636 - addEvent.portDescription().value()); 664 + actualEvent.getValue().portDescription().value());
637 -
638 } 665 }
639 666
640 private void assertPortDescriptionEquals( 667 private void assertPortDescriptionEquals(
...@@ -649,19 +676,31 @@ public class GossipDeviceStoreTest { ...@@ -649,19 +676,31 @@ public class GossipDeviceStoreTest {
649 expectedAnnotations.toArray(new SparseAnnotations[0])); 676 expectedAnnotations.toArray(new SparseAnnotations[0]));
650 } 677 }
651 678
652 - private void resetCommunicatorExpectingNoBroadcast( 679 + private <T> void resetCommunicatorExpectingNoBroadcast(
653 - Capture<ClusterMessage> bcast) { 680 + Capture<T> message,
654 - bcast.reset(); 681 + Capture<MessageSubject> subject,
682 + Capture<Function<T, byte[]>> encoder) {
683 + message.reset();
684 + subject.reset();
685 + encoder.reset();
655 reset(clusterCommunicator); 686 reset(clusterCommunicator);
656 replay(clusterCommunicator); 687 replay(clusterCommunicator);
657 } 688 }
658 689
659 - private void resetCommunicatorExpectingSingleBroadcast( 690 + private <T> void resetCommunicatorExpectingSingleBroadcast(
660 - Capture<ClusterMessage> bcast) { 691 + Capture<T> message,
692 + Capture<MessageSubject> subject,
693 + Capture<Function<T, byte[]>> encoder) {
661 694
662 - bcast.reset(); 695 + message.reset();
696 + subject.reset();
697 + encoder.reset();
663 reset(clusterCommunicator); 698 reset(clusterCommunicator);
664 - expect(clusterCommunicator.broadcast(capture(bcast))).andReturn(true).once(); 699 + clusterCommunicator.broadcast(
700 + capture(message),
701 + capture(subject),
702 + capture(encoder));
703 + expectLastCall().once();
665 replay(clusterCommunicator); 704 replay(clusterCommunicator);
666 } 705 }
667 706
...@@ -724,9 +763,11 @@ public class GossipDeviceStoreTest { ...@@ -724,9 +763,11 @@ public class GossipDeviceStoreTest {
724 assertAnnotationsEquals(deviceStore.getDevice(DID1).annotations(), A1); 763 assertAnnotationsEquals(deviceStore.getDevice(DID1).annotations(), A1);
725 assertAnnotationsEquals(deviceStore.getPort(DID1, P1).annotations(), A2); 764 assertAnnotationsEquals(deviceStore.getPort(DID1, P1).annotations(), A2);
726 765
727 - Capture<ClusterMessage> bcast = new Capture<>(); 766 + Capture<InternalDeviceEvent> message = new Capture<>();
767 + Capture<MessageSubject> subject = new Capture<>();
768 + Capture<Function<InternalDeviceEvent, byte[]>> encoder = new Capture<>();
728 769
729 - resetCommunicatorExpectingSingleBroadcast(bcast); 770 + resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
730 771
731 DeviceEvent event = deviceStore.removeDevice(DID1); 772 DeviceEvent event = deviceStore.removeDevice(DID1);
732 assertEquals(DEVICE_REMOVED, event.type()); 773 assertEquals(DEVICE_REMOVED, event.type());
...@@ -736,7 +777,7 @@ public class GossipDeviceStoreTest { ...@@ -736,7 +777,7 @@ public class GossipDeviceStoreTest {
736 assertEquals(0, deviceStore.getPorts(DID1).size()); 777 assertEquals(0, deviceStore.getPorts(DID1).size());
737 verify(clusterCommunicator); 778 verify(clusterCommunicator);
738 // TODO: verify broadcast message 779 // TODO: verify broadcast message
739 - assertTrue(bcast.hasCaptured()); 780 + assertTrue(message.hasCaptured());
740 781
741 // putBack Device, Port w/o annotation 782 // putBack Device, Port w/o annotation
742 putDevice(DID1, SW1); 783 putDevice(DID1, SW1);
...@@ -825,10 +866,6 @@ public class GossipDeviceStoreTest { ...@@ -825,10 +866,6 @@ public class GossipDeviceStoreTest {
825 this.clusterService = clusterService; 866 this.clusterService = clusterService;
826 this.clusterCommunicator = clusterCommunicator; 867 this.clusterCommunicator = clusterCommunicator;
827 } 868 }
828 -
829 - public <T> T deserialize(byte[] bytes) {
830 - return SERIALIZER.decode(bytes);
831 - }
832 } 869 }
833 870
834 private static final class TestClusterService extends StaticClusterService { 871 private static final class TestClusterService extends StaticClusterService {
......
...@@ -30,6 +30,7 @@ import org.onosproject.cluster.ClusterService; ...@@ -30,6 +30,7 @@ import org.onosproject.cluster.ClusterService;
30 import org.onosproject.cluster.ControllerNode; 30 import org.onosproject.cluster.ControllerNode;
31 import org.onosproject.cluster.DefaultControllerNode; 31 import org.onosproject.cluster.DefaultControllerNode;
32 import org.onosproject.cluster.NodeId; 32 import org.onosproject.cluster.NodeId;
33 +import org.onosproject.event.AbstractEvent;
33 import org.onosproject.store.Timestamp; 34 import org.onosproject.store.Timestamp;
34 import org.onosproject.store.cluster.messaging.ClusterCommunicationService; 35 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
35 import org.onosproject.store.cluster.messaging.ClusterMessage; 36 import org.onosproject.store.cluster.messaging.ClusterMessage;
...@@ -44,17 +45,20 @@ import org.onosproject.store.service.EventuallyConsistentMap; ...@@ -44,17 +45,20 @@ import org.onosproject.store.service.EventuallyConsistentMap;
44 import org.onosproject.store.service.EventuallyConsistentMapEvent; 45 import org.onosproject.store.service.EventuallyConsistentMapEvent;
45 import org.onosproject.store.service.EventuallyConsistentMapListener; 46 import org.onosproject.store.service.EventuallyConsistentMapListener;
46 47
47 -import java.io.IOException;
48 import java.util.ArrayList; 48 import java.util.ArrayList;
49 import java.util.HashMap; 49 import java.util.HashMap;
50 import java.util.HashSet; 50 import java.util.HashSet;
51 +import java.util.List;
51 import java.util.Map; 52 import java.util.Map;
52 import java.util.Objects; 53 import java.util.Objects;
53 import java.util.Set; 54 import java.util.Set;
55 +import java.util.concurrent.CompletableFuture;
54 import java.util.concurrent.CountDownLatch; 56 import java.util.concurrent.CountDownLatch;
55 import java.util.concurrent.ExecutorService; 57 import java.util.concurrent.ExecutorService;
56 import java.util.concurrent.TimeUnit; 58 import java.util.concurrent.TimeUnit;
57 import java.util.concurrent.atomic.AtomicLong; 59 import java.util.concurrent.atomic.AtomicLong;
60 +import java.util.function.Consumer;
61 +import java.util.function.Function;
58 62
59 import static com.google.common.base.Preconditions.checkArgument; 63 import static com.google.common.base.Preconditions.checkArgument;
60 import static junit.framework.TestCase.assertFalse; 64 import static junit.framework.TestCase.assertFalse;
...@@ -281,7 +285,7 @@ public class EventuallyConsistentMapImplTest { ...@@ -281,7 +285,7 @@ public class EventuallyConsistentMapImplTest {
281 285
282 // Set up expected internal message to be broadcast to peers on first put 286 // Set up expected internal message to be broadcast to peers on first put
283 expectSpecificMulticastMessage(generatePutMessage(KEY1, VALUE1, clockService 287 expectSpecificMulticastMessage(generatePutMessage(KEY1, VALUE1, clockService
284 - .peekAtNextTimestamp()), clusterCommunicator); 288 + .peekAtNextTimestamp()), UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
285 289
286 // Put first value 290 // Put first value
287 assertNull(ecMap.get(KEY1)); 291 assertNull(ecMap.get(KEY1));
...@@ -292,7 +296,7 @@ public class EventuallyConsistentMapImplTest { ...@@ -292,7 +296,7 @@ public class EventuallyConsistentMapImplTest {
292 296
293 // Set up expected internal message to be broadcast to peers on second put 297 // Set up expected internal message to be broadcast to peers on second put
294 expectSpecificMulticastMessage(generatePutMessage( 298 expectSpecificMulticastMessage(generatePutMessage(
295 - KEY1, VALUE2, clockService.peekAtNextTimestamp()), clusterCommunicator); 299 + KEY1, VALUE2, clockService.peekAtNextTimestamp()), UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
296 300
297 // Update same key to a new value 301 // Update same key to a new value
298 ecMap.put(KEY1, VALUE2); 302 ecMap.put(KEY1, VALUE2);
...@@ -341,7 +345,7 @@ public class EventuallyConsistentMapImplTest { ...@@ -341,7 +345,7 @@ public class EventuallyConsistentMapImplTest {
341 // Remove the value and check the correct internal cluster messages 345 // Remove the value and check the correct internal cluster messages
342 // are sent 346 // are sent
343 expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()), 347 expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
344 - clusterCommunicator); 348 + UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
345 349
346 ecMap.remove(KEY1); 350 ecMap.remove(KEY1);
347 assertNull(ecMap.get(KEY1)); 351 assertNull(ecMap.get(KEY1));
...@@ -352,7 +356,7 @@ public class EventuallyConsistentMapImplTest { ...@@ -352,7 +356,7 @@ public class EventuallyConsistentMapImplTest {
352 // the map, we expect that the tombstone is updated and another remove 356 // the map, we expect that the tombstone is updated and another remove
353 // event is sent to the cluster and external listeners. 357 // event is sent to the cluster and external listeners.
354 expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()), 358 expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()),
355 - clusterCommunicator); 359 + UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
356 360
357 ecMap.remove(KEY1); 361 ecMap.remove(KEY1);
358 assertNull(ecMap.get(KEY1)); 362 assertNull(ecMap.get(KEY1));
...@@ -402,7 +406,7 @@ public class EventuallyConsistentMapImplTest { ...@@ -402,7 +406,7 @@ public class EventuallyConsistentMapImplTest {
402 ecMap.addListener(listener); 406 ecMap.addListener(listener);
403 407
404 // Expect a multi-update inter-instance message 408 // Expect a multi-update inter-instance message
405 - expectSpecificBroadcastMessage(generatePutMessage(KEY1, VALUE1, KEY2, VALUE2), 409 + expectSpecificBroadcastMessage(generatePutMessage(KEY1, VALUE1, KEY2, VALUE2), UPDATE_MESSAGE_SUBJECT,
406 clusterCommunicator); 410 clusterCommunicator);
407 411
408 Map<String, String> putAllValues = new HashMap<>(); 412 Map<String, String> putAllValues = new HashMap<>();
...@@ -441,7 +445,7 @@ public class EventuallyConsistentMapImplTest { ...@@ -441,7 +445,7 @@ public class EventuallyConsistentMapImplTest {
441 ecMap.put(KEY2, VALUE2); 445 ecMap.put(KEY2, VALUE2);
442 446
443 ecMap.addListener(listener); 447 ecMap.addListener(listener);
444 - expectSpecificBroadcastMessage(generateRemoveMessage(KEY1, KEY2), clusterCommunicator); 448 + expectSpecificBroadcastMessage(generateRemoveMessage(KEY1, KEY2), UPDATE_MESSAGE_SUBJECT, clusterCommunicator);
445 449
446 ecMap.clear(); 450 ecMap.clear();
447 451
...@@ -605,7 +609,8 @@ public class EventuallyConsistentMapImplTest { ...@@ -605,7 +609,8 @@ public class EventuallyConsistentMapImplTest {
605 SERIALIZER.encode(Lists.newArrayList(event))); 609 SERIALIZER.encode(Lists.newArrayList(event)));
606 } 610 }
607 611
608 - private ClusterMessage generatePutMessage(String key1, String value1, String key2, String value2) { 612 + private List<PutEntry<String, String>> generatePutMessage(
613 + String key1, String value1, String key2, String value2) {
609 ArrayList<PutEntry<String, String>> list = new ArrayList<>(); 614 ArrayList<PutEntry<String, String>> list = new ArrayList<>();
610 615
611 Timestamp timestamp1 = clockService.peek(1); 616 Timestamp timestamp1 = clockService.peek(1);
...@@ -617,10 +622,7 @@ public class EventuallyConsistentMapImplTest { ...@@ -617,10 +622,7 @@ public class EventuallyConsistentMapImplTest {
617 list.add(pe1); 622 list.add(pe1);
618 list.add(pe2); 623 list.add(pe2);
619 624
620 - 625 + return list;
621 - return new ClusterMessage(
622 - clusterService.getLocalNode().id(), UPDATE_MESSAGE_SUBJECT,
623 - SERIALIZER.encode(list));
624 } 626 }
625 627
626 private ClusterMessage generateRemoveMessage(String key, Timestamp timestamp) { 628 private ClusterMessage generateRemoveMessage(String key, Timestamp timestamp) {
...@@ -631,7 +633,7 @@ public class EventuallyConsistentMapImplTest { ...@@ -631,7 +633,7 @@ public class EventuallyConsistentMapImplTest {
631 SERIALIZER.encode(Lists.newArrayList(event))); 633 SERIALIZER.encode(Lists.newArrayList(event)));
632 } 634 }
633 635
634 - private ClusterMessage generateRemoveMessage(String key1, String key2) { 636 + private List<RemoveEntry<String, String>> generateRemoveMessage(String key1, String key2) {
635 ArrayList<RemoveEntry<String, String>> list = new ArrayList<>(); 637 ArrayList<RemoveEntry<String, String>> list = new ArrayList<>();
636 638
637 Timestamp timestamp1 = clockService.peek(1); 639 Timestamp timestamp1 = clockService.peek(1);
...@@ -643,9 +645,7 @@ public class EventuallyConsistentMapImplTest { ...@@ -643,9 +645,7 @@ public class EventuallyConsistentMapImplTest {
643 list.add(re1); 645 list.add(re1);
644 list.add(re2); 646 list.add(re2);
645 647
646 - return new ClusterMessage( 648 + return list;
647 - clusterService.getLocalNode().id(), UPDATE_MESSAGE_SUBJECT,
648 - SERIALIZER.encode(list));
649 } 649 }
650 650
651 /** 651 /**
...@@ -656,13 +656,13 @@ public class EventuallyConsistentMapImplTest { ...@@ -656,13 +656,13 @@ public class EventuallyConsistentMapImplTest {
656 * @param clusterCommunicator a mock ClusterCommunicationService to set up 656 * @param clusterCommunicator a mock ClusterCommunicationService to set up
657 */ 657 */
658 //FIXME rename 658 //FIXME rename
659 - private static void expectSpecificBroadcastMessage(ClusterMessage m, 659 + private static <T> void expectSpecificBroadcastMessage(
660 + T message,
661 + MessageSubject subject,
660 ClusterCommunicationService clusterCommunicator) { 662 ClusterCommunicationService clusterCommunicator) {
661 reset(clusterCommunicator); 663 reset(clusterCommunicator);
662 -// expect(clusterCommunicator.broadcast(m)).andReturn(true); 664 + clusterCommunicator.<T>multicast(eq(message), eq(subject), anyObject(Function.class), anyObject(Set.class));
663 - expect(clusterCommunicator.unicast(eq(m), anyObject(NodeId.class))) 665 + expectLastCall().anyTimes();
664 - .andReturn(true)
665 - .anyTimes();
666 replay(clusterCommunicator); 666 replay(clusterCommunicator);
667 } 667 }
668 668
...@@ -670,17 +670,16 @@ public class EventuallyConsistentMapImplTest { ...@@ -670,17 +670,16 @@ public class EventuallyConsistentMapImplTest {
670 * Sets up a mock ClusterCommunicationService to expect a specific cluster 670 * Sets up a mock ClusterCommunicationService to expect a specific cluster
671 * message to be multicast to the cluster. 671 * message to be multicast to the cluster.
672 * 672 *
673 - * @param m message we expect to be sent 673 + * @param message message we expect to be sent
674 + * @param subject subject we expect to be sent to
674 * @param clusterCommunicator a mock ClusterCommunicationService to set up 675 * @param clusterCommunicator a mock ClusterCommunicationService to set up
675 */ 676 */
676 //FIXME rename 677 //FIXME rename
677 - private static void expectSpecificMulticastMessage(ClusterMessage m, 678 + private static <T> void expectSpecificMulticastMessage(T message, MessageSubject subject,
678 ClusterCommunicationService clusterCommunicator) { 679 ClusterCommunicationService clusterCommunicator) {
679 reset(clusterCommunicator); 680 reset(clusterCommunicator);
680 -// expect(clusterCommunicator.multicast(eq(m), anyObject(Set.class))).andReturn(true); 681 + clusterCommunicator.<T>multicast(eq(message), eq(subject), anyObject(Function.class), anyObject(Set.class));
681 - expect(clusterCommunicator.unicast(eq(m), anyObject(NodeId.class))) 682 + expectLastCall().anyTimes();
682 - .andReturn(true)
683 - .anyTimes();
684 replay(clusterCommunicator); 683 replay(clusterCommunicator);
685 } 684 }
686 685
...@@ -693,11 +692,14 @@ public class EventuallyConsistentMapImplTest { ...@@ -693,11 +692,14 @@ public class EventuallyConsistentMapImplTest {
693 * @param clusterCommunicator a mock ClusterCommunicationService to set up 692 * @param clusterCommunicator a mock ClusterCommunicationService to set up
694 */ 693 */
695 //FIXME rename 694 //FIXME rename
696 - private void expectPeerMessage(ClusterCommunicationService clusterCommunicator) { 695 + private <T> void expectPeerMessage(ClusterCommunicationService clusterCommunicator) {
697 reset(clusterCommunicator); 696 reset(clusterCommunicator);
698 // expect(clusterCommunicator.multicast(anyObject(ClusterMessage.class), 697 // expect(clusterCommunicator.multicast(anyObject(ClusterMessage.class),
699 // anyObject(Iterable.class))) 698 // anyObject(Iterable.class)))
700 - expect(clusterCommunicator.unicast(anyObject(ClusterMessage.class), 699 + expect(clusterCommunicator.<T>unicast(
700 + anyObject(),
701 + anyObject(MessageSubject.class),
702 + anyObject(Function.class),
701 anyObject(NodeId.class))) 703 anyObject(NodeId.class)))
702 .andReturn(true) 704 .andReturn(true)
703 .anyTimes(); 705 .anyTimes();
...@@ -711,15 +713,14 @@ public class EventuallyConsistentMapImplTest { ...@@ -711,15 +713,14 @@ public class EventuallyConsistentMapImplTest {
711 * 713 *
712 * @param clusterCommunicator a mock ClusterCommunicationService to set up 714 * @param clusterCommunicator a mock ClusterCommunicationService to set up
713 */ 715 */
714 - //FIXME rename
715 private void expectBroadcastMessage(ClusterCommunicationService clusterCommunicator) { 716 private void expectBroadcastMessage(ClusterCommunicationService clusterCommunicator) {
716 reset(clusterCommunicator); 717 reset(clusterCommunicator);
717 -// expect(clusterCommunicator.broadcast(anyObject(ClusterMessage.class))) 718 + clusterCommunicator.<AbstractEvent>multicast(
718 -// .andReturn(true) 719 + anyObject(AbstractEvent.class),
719 -// .anyTimes(); 720 + anyObject(MessageSubject.class),
720 - expect(clusterCommunicator.unicast(anyObject(ClusterMessage.class), anyObject(NodeId.class))) 721 + anyObject(Function.class),
721 - .andReturn(true) 722 + anyObject(Set.class));
722 - .anyTimes(); 723 + expectLastCall().anyTimes();
723 replay(clusterCommunicator); 724 replay(clusterCommunicator);
724 } 725 }
725 726
...@@ -733,59 +734,87 @@ public class EventuallyConsistentMapImplTest { ...@@ -733,59 +734,87 @@ public class EventuallyConsistentMapImplTest {
733 implements ClusterCommunicationService { 734 implements ClusterCommunicationService {
734 735
735 @Override 736 @Override
736 - public boolean broadcast(ClusterMessage message) { 737 + public void addSubscriber(MessageSubject subject,
737 - return false; 738 + ClusterMessageHandler subscriber,
739 + ExecutorService executor) {
740 + if (subject.equals(UPDATE_MESSAGE_SUBJECT)) {
741 + updateHandler = subscriber;
742 + } else if (subject.equals(ANTI_ENTROPY_MESSAGE_SUBJECT)) {
743 + antiEntropyHandler = subscriber;
744 + } else {
745 + throw new RuntimeException("Unexpected message subject " + subject.toString());
746 + }
738 } 747 }
739 748
740 @Override 749 @Override
741 - public boolean broadcastIncludeSelf(ClusterMessage message) { 750 + public void removeSubscriber(MessageSubject subject) {}
742 - return false; 751 +
752 + @Override
753 + public <M> void broadcast(M message, MessageSubject subject,
754 + Function<M, byte[]> encoder) {
743 } 755 }
744 756
745 @Override 757 @Override
746 - public boolean unicast(ClusterMessage message, NodeId toNodeId) { 758 + public <M> void broadcastIncludeSelf(M message,
747 - return false; 759 + MessageSubject subject, Function<M, byte[]> encoder) {
748 } 760 }
749 761
750 @Override 762 @Override
751 - public boolean multicast(ClusterMessage message, Iterable<NodeId> nodeIds) { 763 + public <M> boolean unicast(M message, MessageSubject subject,
764 + Function<M, byte[]> encoder, NodeId toNodeId) {
752 return false; 765 return false;
753 } 766 }
754 767
755 @Override 768 @Override
756 - public ListenableFuture<byte[]> sendAndReceive(ClusterMessage message, 769 + public <M> void multicast(M message, MessageSubject subject,
757 - NodeId toNodeId) 770 + Function<M, byte[]> encoder, Set<NodeId> nodes) {
758 - throws IOException {
759 - return null;
760 } 771 }
761 772
762 @Override 773 @Override
763 - public void addSubscriber(MessageSubject subject, 774 + public <M, R> CompletableFuture<R> sendAndReceive(M message,
764 - ClusterMessageHandler subscriber) { 775 + MessageSubject subject, Function<M, byte[]> encoder,
765 - if (subject.equals(UPDATE_MESSAGE_SUBJECT)) { 776 + Function<byte[], R> decoder, NodeId toNodeId) {
766 - updateHandler = subscriber; 777 + return null;
767 - } else if (subject.equals(ANTI_ENTROPY_MESSAGE_SUBJECT)) {
768 - antiEntropyHandler = subscriber;
769 - } else {
770 - throw new RuntimeException("Unexpected message subject " + subject.toString());
771 } 778 }
779 +
780 + @Override
781 + public <M, R> void addSubscriber(MessageSubject subject,
782 + Function<byte[], M> decoder, Function<M, R> handler,
783 + Function<R, byte[]> encoder, ExecutorService executor) {
772 } 784 }
773 785
774 @Override 786 @Override
775 - public void addSubscriber(MessageSubject subject, 787 + public <M> void addSubscriber(MessageSubject subject,
776 - ClusterMessageHandler subscriber, 788 + Function<byte[], M> decoder, Consumer<M> handler,
777 ExecutorService executor) { 789 ExecutorService executor) {
778 - if (subject.equals(UPDATE_MESSAGE_SUBJECT)) {
779 - updateHandler = subscriber;
780 - } else if (subject.equals(ANTI_ENTROPY_MESSAGE_SUBJECT)) {
781 - antiEntropyHandler = subscriber;
782 - } else {
783 - throw new RuntimeException("Unexpected message subject " + subject.toString());
784 } 790 }
791 +
792 + @Override
793 + public boolean broadcast(ClusterMessage message) {
794 + return false;
795 + }
796 +
797 + @Override
798 + public boolean broadcastIncludeSelf(ClusterMessage message) {
799 + return false;
785 } 800 }
786 801
787 @Override 802 @Override
788 - public void removeSubscriber(MessageSubject subject) {} 803 + public boolean unicast(ClusterMessage message, NodeId toNodeId) {
804 + return false;
805 + }
806 +
807 + @Override
808 + public boolean multicast(ClusterMessage message,
809 + Iterable<NodeId> nodeIds) {
810 + return false;
811 + }
812 +
813 + @Override
814 + public ListenableFuture<byte[]> sendAndReceive(ClusterMessage message,
815 + NodeId toNodeId) {
816 + return null;
817 + }
789 } 818 }
790 819
791 /** 820 /**
......
...@@ -28,7 +28,6 @@ import org.onlab.packet.IpAddress; ...@@ -28,7 +28,6 @@ import org.onlab.packet.IpAddress;
28 import org.onosproject.cluster.ControllerNode; 28 import org.onosproject.cluster.ControllerNode;
29 import org.onosproject.cluster.DefaultControllerNode; 29 import org.onosproject.cluster.DefaultControllerNode;
30 import org.onosproject.cluster.NodeId; 30 import org.onosproject.cluster.NodeId;
31 -import org.onosproject.mastership.MastershipService;
32 import org.onosproject.mastership.MastershipServiceAdapter; 31 import org.onosproject.mastership.MastershipServiceAdapter;
33 import org.onosproject.mastership.MastershipTerm; 32 import org.onosproject.mastership.MastershipTerm;
34 import org.onosproject.net.ConnectPoint; 33 import org.onosproject.net.ConnectPoint;
...@@ -48,7 +47,6 @@ import org.onosproject.net.link.LinkStoreDelegate; ...@@ -48,7 +47,6 @@ import org.onosproject.net.link.LinkStoreDelegate;
48 import org.onosproject.net.provider.ProviderId; 47 import org.onosproject.net.provider.ProviderId;
49 import org.onosproject.store.cluster.StaticClusterService; 48 import org.onosproject.store.cluster.StaticClusterService;
50 import org.onosproject.store.cluster.messaging.ClusterCommunicationService; 49 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
51 -import org.onosproject.store.cluster.messaging.ClusterMessage;
52 import org.onosproject.store.cluster.messaging.ClusterMessageHandler; 50 import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
53 import org.onosproject.store.cluster.messaging.MessageSubject; 51 import org.onosproject.store.cluster.messaging.MessageSubject;
54 import org.onosproject.store.device.impl.DeviceClockManager; 52 import org.onosproject.store.device.impl.DeviceClockManager;
...@@ -59,6 +57,7 @@ import java.util.Set; ...@@ -59,6 +57,7 @@ import java.util.Set;
59 import java.util.concurrent.CountDownLatch; 57 import java.util.concurrent.CountDownLatch;
60 import java.util.concurrent.ExecutorService; 58 import java.util.concurrent.ExecutorService;
61 import java.util.concurrent.TimeUnit; 59 import java.util.concurrent.TimeUnit;
60 +import java.util.function.Function;
62 61
63 import static org.easymock.EasyMock.*; 62 import static org.easymock.EasyMock.*;
64 import static org.junit.Assert.*; 63 import static org.junit.Assert.*;
...@@ -119,7 +118,6 @@ public class GossipLinkStoreTest { ...@@ -119,7 +118,6 @@ public class GossipLinkStoreTest {
119 private DeviceClockManager deviceClockManager; 118 private DeviceClockManager deviceClockManager;
120 private DeviceClockService deviceClockService; 119 private DeviceClockService deviceClockService;
121 private ClusterCommunicationService clusterCommunicator; 120 private ClusterCommunicationService clusterCommunicator;
122 - private MastershipService mastershipService;
123 121
124 @BeforeClass 122 @BeforeClass
125 public static void setUpBeforeClass() throws Exception { 123 public static void setUpBeforeClass() throws Exception {
...@@ -171,26 +169,24 @@ public class GossipLinkStoreTest { ...@@ -171,26 +169,24 @@ public class GossipLinkStoreTest {
171 ConnectPoint src = new ConnectPoint(srcId, srcNum); 169 ConnectPoint src = new ConnectPoint(srcId, srcNum);
172 ConnectPoint dst = new ConnectPoint(dstId, dstNum); 170 ConnectPoint dst = new ConnectPoint(dstId, dstNum);
173 reset(clusterCommunicator); 171 reset(clusterCommunicator);
174 - expect(clusterCommunicator.broadcast(anyObject(ClusterMessage.class))) 172 + clusterCommunicator.<InternalLinkEvent>broadcast(
175 - .andReturn(true).anyTimes(); 173 + anyObject(InternalLinkEvent.class), anyObject(MessageSubject.class), anyObject(Function.class));
174 + expectLastCall().anyTimes();
176 replay(clusterCommunicator); 175 replay(clusterCommunicator);
177 linkStore.createOrUpdateLink(PID, new DefaultLinkDescription(src, dst, type, annotations)); 176 linkStore.createOrUpdateLink(PID, new DefaultLinkDescription(src, dst, type, annotations));
178 verify(clusterCommunicator); 177 verify(clusterCommunicator);
179 } 178 }
180 179
181 - private void resetCommunicatorExpectingNoBroadcast( 180 + private <T> void resetCommunicatorExpectingSingleBroadcast(
182 - Capture<ClusterMessage> bcast) { 181 + Capture<T> message,
183 - bcast.reset(); 182 + Capture<MessageSubject> subject,
183 + Capture<Function<T, byte[]>> encoder) {
184 + message.reset();
185 + subject.reset();
186 + encoder.reset();
184 reset(clusterCommunicator); 187 reset(clusterCommunicator);
185 - replay(clusterCommunicator); 188 + clusterCommunicator.broadcast(capture(message), capture(subject), capture(encoder));
186 - } 189 + expectLastCall().once();
187 -
188 - private void resetCommunicatorExpectingSingleBroadcast(
189 - Capture<ClusterMessage> bcast) {
190 -
191 - bcast.reset();
192 - reset(clusterCommunicator);
193 - expect(clusterCommunicator.broadcast(capture(bcast))).andReturn(true).once();
194 replay(clusterCommunicator); 190 replay(clusterCommunicator);
195 } 191 }
196 192
...@@ -367,38 +363,40 @@ public class GossipLinkStoreTest { ...@@ -367,38 +363,40 @@ public class GossipLinkStoreTest {
367 ConnectPoint src = new ConnectPoint(DID1, P1); 363 ConnectPoint src = new ConnectPoint(DID1, P1);
368 ConnectPoint dst = new ConnectPoint(DID2, P2); 364 ConnectPoint dst = new ConnectPoint(DID2, P2);
369 365
370 - Capture<ClusterMessage> bcast = new Capture<>(); 366 + Capture<InternalLinkEvent> message = new Capture<>();
367 + Capture<MessageSubject> subject = new Capture<>();
368 + Capture<Function<InternalLinkEvent, byte[]>> encoder = new Capture<>();
371 369
372 // add link 370 // add link
373 - resetCommunicatorExpectingSingleBroadcast(bcast); 371 + resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
374 final DefaultLinkDescription linkDescription = new DefaultLinkDescription(src, dst, INDIRECT); 372 final DefaultLinkDescription linkDescription = new DefaultLinkDescription(src, dst, INDIRECT);
375 LinkEvent event = linkStore.createOrUpdateLink(PID, 373 LinkEvent event = linkStore.createOrUpdateLink(PID,
376 linkDescription); 374 linkDescription);
377 - verifyLinkBroadcastMessage(PID, NID1, src, dst, INDIRECT, bcast); 375 + verifyLinkBroadcastMessage(PID, NID1, src, dst, INDIRECT, message, subject, encoder);
378 376
379 assertLink(DID1, P1, DID2, P2, INDIRECT, event.subject()); 377 assertLink(DID1, P1, DID2, P2, INDIRECT, event.subject());
380 assertEquals(LINK_ADDED, event.type()); 378 assertEquals(LINK_ADDED, event.type());
381 379
382 // update link type 380 // update link type
383 - resetCommunicatorExpectingSingleBroadcast(bcast); 381 + resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
384 LinkEvent event2 = linkStore.createOrUpdateLink(PID, 382 LinkEvent event2 = linkStore.createOrUpdateLink(PID,
385 new DefaultLinkDescription(src, dst, DIRECT)); 383 new DefaultLinkDescription(src, dst, DIRECT));
386 - verifyLinkBroadcastMessage(PID, NID1, src, dst, DIRECT, bcast); 384 + verifyLinkBroadcastMessage(PID, NID1, src, dst, DIRECT, message, subject, encoder);
387 385
388 assertLink(DID1, P1, DID2, P2, DIRECT, event2.subject()); 386 assertLink(DID1, P1, DID2, P2, DIRECT, event2.subject());
389 assertEquals(LINK_UPDATED, event2.type()); 387 assertEquals(LINK_UPDATED, event2.type());
390 388
391 // no change 389 // no change
392 - resetCommunicatorExpectingSingleBroadcast(bcast); 390 + resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
393 LinkEvent event3 = linkStore.createOrUpdateLink(PID, 391 LinkEvent event3 = linkStore.createOrUpdateLink(PID,
394 new DefaultLinkDescription(src, dst, DIRECT)); 392 new DefaultLinkDescription(src, dst, DIRECT));
395 - verifyNoBroadcastMessage(bcast); 393 + verifyNoBroadcastMessage(message);
396 394
397 assertNull("No change event expected", event3); 395 assertNull("No change event expected", event3);
398 } 396 }
399 397
400 - private void verifyNoBroadcastMessage(Capture<ClusterMessage> bcast) { 398 + private <T> void verifyNoBroadcastMessage(Capture<T> message) {
401 - assertFalse("No broadcast expected", bcast.hasCaptured()); 399 + assertFalse("No broadcast expected", message.hasCaptured());
402 } 400 }
403 401
404 private void verifyLinkBroadcastMessage(ProviderId providerId, 402 private void verifyLinkBroadcastMessage(ProviderId providerId,
...@@ -406,17 +404,14 @@ public class GossipLinkStoreTest { ...@@ -406,17 +404,14 @@ public class GossipLinkStoreTest {
406 ConnectPoint src, 404 ConnectPoint src,
407 ConnectPoint dst, 405 ConnectPoint dst,
408 Type type, 406 Type type,
409 - Capture<ClusterMessage> actualMsg) { 407 + Capture<InternalLinkEvent> actualLinkEvent,
408 + Capture<MessageSubject> actualSubject,
409 + Capture<Function<InternalLinkEvent, byte[]>> actualEncoder) {
410 verify(clusterCommunicator); 410 verify(clusterCommunicator);
411 - assertTrue(actualMsg.hasCaptured()); 411 + assertTrue(actualLinkEvent.hasCaptured());
412 - assertEquals(sender, actualMsg.getValue().sender()); 412 + assertEquals(GossipLinkStoreMessageSubjects.LINK_UPDATE, actualSubject.getValue());
413 - assertEquals(GossipLinkStoreMessageSubjects.LINK_UPDATE, 413 + assertEquals(providerId, actualLinkEvent.getValue().providerId());
414 - actualMsg.getValue().subject()); 414 + assertLinkDescriptionEquals(src, dst, type, actualLinkEvent.getValue().linkDescription().value());
415 - InternalLinkEvent linkEvent
416 - = GossipLinkStore.SERIALIZER.decode(actualMsg.getValue().payload());
417 - assertEquals(providerId, linkEvent.providerId());
418 - assertLinkDescriptionEquals(src, dst, type, linkEvent.linkDescription().value());
419 -
420 } 415 }
421 416
422 private static void assertLinkDescriptionEquals(ConnectPoint src, 417 private static void assertLinkDescriptionEquals(ConnectPoint src,
...@@ -434,31 +429,33 @@ public class GossipLinkStoreTest { ...@@ -434,31 +429,33 @@ public class GossipLinkStoreTest {
434 ConnectPoint src = new ConnectPoint(DID1, P1); 429 ConnectPoint src = new ConnectPoint(DID1, P1);
435 ConnectPoint dst = new ConnectPoint(DID2, P2); 430 ConnectPoint dst = new ConnectPoint(DID2, P2);
436 431
437 - Capture<ClusterMessage> bcast = new Capture<>(); 432 + Capture<InternalLinkEvent> message = new Capture<>();
433 + Capture<MessageSubject> subject = new Capture<>();
434 + Capture<Function<InternalLinkEvent, byte[]>> encoder = new Capture<>();
438 435
439 // add Ancillary link 436 // add Ancillary link
440 - resetCommunicatorExpectingSingleBroadcast(bcast); 437 + resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
441 LinkEvent event = linkStore.createOrUpdateLink(PIDA, 438 LinkEvent event = linkStore.createOrUpdateLink(PIDA,
442 new DefaultLinkDescription(src, dst, INDIRECT, A1)); 439 new DefaultLinkDescription(src, dst, INDIRECT, A1));
443 - verifyLinkBroadcastMessage(PIDA, NID1, src, dst, INDIRECT, bcast); 440 + verifyLinkBroadcastMessage(PIDA, NID1, src, dst, INDIRECT, message, subject, encoder);
444 441
445 assertNotNull("Ancillary only link is ignored", event); 442 assertNotNull("Ancillary only link is ignored", event);
446 443
447 // add Primary link 444 // add Primary link
448 - resetCommunicatorExpectingSingleBroadcast(bcast); 445 + resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
449 LinkEvent event2 = linkStore.createOrUpdateLink(PID, 446 LinkEvent event2 = linkStore.createOrUpdateLink(PID,
450 new DefaultLinkDescription(src, dst, INDIRECT, A2)); 447 new DefaultLinkDescription(src, dst, INDIRECT, A2));
451 - verifyLinkBroadcastMessage(PID, NID1, src, dst, INDIRECT, bcast); 448 + verifyLinkBroadcastMessage(PID, NID1, src, dst, INDIRECT, message, subject, encoder);
452 449
453 assertLink(DID1, P1, DID2, P2, INDIRECT, event2.subject()); 450 assertLink(DID1, P1, DID2, P2, INDIRECT, event2.subject());
454 assertAnnotationsEquals(event2.subject().annotations(), A2, A1); 451 assertAnnotationsEquals(event2.subject().annotations(), A2, A1);
455 assertEquals(LINK_UPDATED, event2.type()); 452 assertEquals(LINK_UPDATED, event2.type());
456 453
457 // update link type 454 // update link type
458 - resetCommunicatorExpectingSingleBroadcast(bcast); 455 + resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
459 LinkEvent event3 = linkStore.createOrUpdateLink(PID, 456 LinkEvent event3 = linkStore.createOrUpdateLink(PID,
460 new DefaultLinkDescription(src, dst, DIRECT, A2)); 457 new DefaultLinkDescription(src, dst, DIRECT, A2));
461 - verifyLinkBroadcastMessage(PID, NID1, src, dst, DIRECT, bcast); 458 + verifyLinkBroadcastMessage(PID, NID1, src, dst, DIRECT, message, subject, encoder);
462 459
463 assertLink(DID1, P1, DID2, P2, DIRECT, event3.subject()); 460 assertLink(DID1, P1, DID2, P2, DIRECT, event3.subject());
464 assertAnnotationsEquals(event3.subject().annotations(), A2, A1); 461 assertAnnotationsEquals(event3.subject().annotations(), A2, A1);
...@@ -466,38 +463,38 @@ public class GossipLinkStoreTest { ...@@ -466,38 +463,38 @@ public class GossipLinkStoreTest {
466 463
467 464
468 // no change 465 // no change
469 - resetCommunicatorExpectingNoBroadcast(bcast); 466 + resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
470 LinkEvent event4 = linkStore.createOrUpdateLink(PID, 467 LinkEvent event4 = linkStore.createOrUpdateLink(PID,
471 new DefaultLinkDescription(src, dst, DIRECT)); 468 new DefaultLinkDescription(src, dst, DIRECT));
472 - verifyNoBroadcastMessage(bcast); 469 + verifyNoBroadcastMessage(message);
473 470
474 assertNull("No change event expected", event4); 471 assertNull("No change event expected", event4);
475 472
476 // update link annotation (Primary) 473 // update link annotation (Primary)
477 - resetCommunicatorExpectingSingleBroadcast(bcast); 474 + resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
478 LinkEvent event5 = linkStore.createOrUpdateLink(PID, 475 LinkEvent event5 = linkStore.createOrUpdateLink(PID,
479 new DefaultLinkDescription(src, dst, DIRECT, A2_2)); 476 new DefaultLinkDescription(src, dst, DIRECT, A2_2));
480 - verifyLinkBroadcastMessage(PID, NID1, src, dst, DIRECT, bcast); 477 + verifyLinkBroadcastMessage(PID, NID1, src, dst, DIRECT, message, subject, encoder);
481 478
482 assertLink(DID1, P1, DID2, P2, DIRECT, event5.subject()); 479 assertLink(DID1, P1, DID2, P2, DIRECT, event5.subject());
483 assertAnnotationsEquals(event5.subject().annotations(), A2, A2_2, A1); 480 assertAnnotationsEquals(event5.subject().annotations(), A2, A2_2, A1);
484 assertEquals(LINK_UPDATED, event5.type()); 481 assertEquals(LINK_UPDATED, event5.type());
485 482
486 // update link annotation (Ancillary) 483 // update link annotation (Ancillary)
487 - resetCommunicatorExpectingSingleBroadcast(bcast); 484 + resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
488 LinkEvent event6 = linkStore.createOrUpdateLink(PIDA, 485 LinkEvent event6 = linkStore.createOrUpdateLink(PIDA,
489 new DefaultLinkDescription(src, dst, DIRECT, A1_2)); 486 new DefaultLinkDescription(src, dst, DIRECT, A1_2));
490 - verifyLinkBroadcastMessage(PIDA, NID1, src, dst, DIRECT, bcast); 487 + verifyLinkBroadcastMessage(PIDA, NID1, src, dst, DIRECT, message, subject, encoder);
491 488
492 assertLink(DID1, P1, DID2, P2, DIRECT, event6.subject()); 489 assertLink(DID1, P1, DID2, P2, DIRECT, event6.subject());
493 assertAnnotationsEquals(event6.subject().annotations(), A2, A2_2, A1, A1_2); 490 assertAnnotationsEquals(event6.subject().annotations(), A2, A2_2, A1, A1_2);
494 assertEquals(LINK_UPDATED, event6.type()); 491 assertEquals(LINK_UPDATED, event6.type());
495 492
496 // update link type (Ancillary) : ignored 493 // update link type (Ancillary) : ignored
497 - resetCommunicatorExpectingNoBroadcast(bcast); 494 + resetCommunicatorExpectingSingleBroadcast(message, subject, encoder);
498 LinkEvent event7 = linkStore.createOrUpdateLink(PIDA, 495 LinkEvent event7 = linkStore.createOrUpdateLink(PIDA,
499 new DefaultLinkDescription(src, dst, EDGE)); 496 new DefaultLinkDescription(src, dst, EDGE));
500 - verifyNoBroadcastMessage(bcast); 497 + verifyNoBroadcastMessage(message);
501 assertNull("Ancillary change other than annotation is ignored", event7); 498 assertNull("Ancillary change other than annotation is ignored", event7);
502 } 499 }
503 500
......
1 -/*
2 - * Copyright 2014 Open Networking Laboratory
3 - *
4 - * Licensed under the Apache License, Version 2.0 (the "License");
5 - * you may not use this file except in compliance with the License.
6 - * You may obtain a copy of the License at
7 - *
8 - * http://www.apache.org/licenses/LICENSE-2.0
9 - *
10 - * Unless required by applicable law or agreed to in writing, software
11 - * distributed under the License is distributed on an "AS IS" BASIS,
12 - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 - * See the License for the specific language governing permissions and
14 - * limitations under the License.
15 - */
16 -package org.onosproject.store.serializers;
17 -
18 -import static com.google.common.base.Preconditions.checkNotNull;
19 -
20 -import com.google.common.base.Function;
21 -
22 -/**
23 - * Function to convert byte[] into {@code T}.
24 - *
25 - * @param <T> Type after decoding
26 - */
27 -public final class DecodeTo<T> implements Function<byte[], T> {
28 -
29 - private StoreSerializer serializer;
30 -
31 - public DecodeTo(StoreSerializer serializer) {
32 - this.serializer = checkNotNull(serializer);
33 - }
34 -
35 - @Override
36 - public T apply(byte[] input) {
37 - return serializer.decode(input);
38 - }
39 -}
...@@ -15,10 +15,10 @@ ...@@ -15,10 +15,10 @@
15 */ 15 */
16 package org.onlab.util; 16 package org.onlab.util;
17 17
18 -import com.google.common.base.Strings; 18 +import static java.nio.file.Files.delete;
19 -import com.google.common.primitives.UnsignedLongs; 19 +import static java.nio.file.Files.walkFileTree;
20 -import com.google.common.util.concurrent.ThreadFactoryBuilder; 20 +import static org.onlab.util.GroupedThreadFactory.groupedThreadFactory;
21 -import org.slf4j.Logger; 21 +import static org.slf4j.LoggerFactory.getLogger;
22 22
23 import java.io.BufferedReader; 23 import java.io.BufferedReader;
24 import java.io.File; 24 import java.io.File;
...@@ -37,12 +37,17 @@ import java.util.ArrayList; ...@@ -37,12 +37,17 @@ import java.util.ArrayList;
37 import java.util.Collection; 37 import java.util.Collection;
38 import java.util.Dictionary; 38 import java.util.Dictionary;
39 import java.util.List; 39 import java.util.List;
40 +import java.util.concurrent.ExecutionException;
41 +import java.util.concurrent.Future;
40 import java.util.concurrent.ThreadFactory; 42 import java.util.concurrent.ThreadFactory;
43 +import java.util.concurrent.TimeUnit;
44 +import java.util.concurrent.TimeoutException;
41 45
42 -import static java.nio.file.Files.delete; 46 +import org.slf4j.Logger;
43 -import static java.nio.file.Files.walkFileTree; 47 +
44 -import static org.onlab.util.GroupedThreadFactory.groupedThreadFactory; 48 +import com.google.common.base.Strings;
45 -import static org.slf4j.LoggerFactory.getLogger; 49 +import com.google.common.primitives.UnsignedLongs;
50 +import com.google.common.util.concurrent.ThreadFactoryBuilder;
46 51
47 /** 52 /**
48 * Miscellaneous utility methods. 53 * Miscellaneous utility methods.
...@@ -324,6 +329,51 @@ public abstract class Tools { ...@@ -324,6 +329,51 @@ public abstract class Tools {
324 dst.getAbsolutePath())); 329 dst.getAbsolutePath()));
325 } 330 }
326 331
332 + /**
333 + * Returns the future value when complete or if future
334 + * completes exceptionally returns the defaultValue.
335 + * @param future future
336 + * @param defaultValue default value
337 + * @param <T> future value type
338 + * @return future value when complete or if future
339 + * completes exceptionally returns the defaultValue.
340 + */
341 + public static <T> T futureGetOrElse(Future<T> future, T defaultValue) {
342 + try {
343 + return future.get();
344 + } catch (InterruptedException e) {
345 + Thread.currentThread().interrupt();
346 + return defaultValue;
347 + } catch (ExecutionException e) {
348 + return defaultValue;
349 + }
350 + }
351 +
352 + /**
353 + * Returns the future value when complete or if future
354 + * completes exceptionally returns the defaultValue.
355 + * @param future future
356 + * @param timeout time to wait for successful completion
357 + * @param timeUnit time unit
358 + * @param defaultValue default value
359 + * @param <T> future value type
360 + * @return future value when complete or if future
361 + * completes exceptionally returns the defaultValue.
362 + */
363 + public static <T> T futureGetOrElse(Future<T> future,
364 + long timeout,
365 + TimeUnit timeUnit,
366 + T defaultValue) {
367 + try {
368 + return future.get(timeout, timeUnit);
369 + } catch (InterruptedException e) {
370 + Thread.currentThread().interrupt();
371 + return defaultValue;
372 + } catch (ExecutionException | TimeoutException e) {
373 + return defaultValue;
374 + }
375 + }
376 +
327 // Auxiliary path visitor for recursive directory structure copying. 377 // Auxiliary path visitor for recursive directory structure copying.
328 private static class DirectoryCopier extends SimpleFileVisitor<Path> { 378 private static class DirectoryCopier extends SimpleFileVisitor<Path> {
329 private Path src; 379 private Path src;
......
...@@ -16,10 +16,9 @@ ...@@ -16,10 +16,9 @@
16 package org.onlab.netty; 16 package org.onlab.netty;
17 17
18 import java.io.IOException; 18 import java.io.IOException;
19 +import java.util.concurrent.CompletableFuture;
19 import java.util.concurrent.ExecutorService; 20 import java.util.concurrent.ExecutorService;
20 21
21 -import com.google.common.util.concurrent.ListenableFuture;
22 -
23 /** 22 /**
24 * Interface for low level messaging primitives. 23 * Interface for low level messaging primitives.
25 */ 24 */
...@@ -40,9 +39,8 @@ public interface MessagingService { ...@@ -40,9 +39,8 @@ public interface MessagingService {
40 * @param type type of message. 39 * @param type type of message.
41 * @param payload message payload. 40 * @param payload message payload.
42 * @return a response future 41 * @return a response future
43 - * @throws IOException when I/O exception of some sort has occurred
44 */ 42 */
45 - public ListenableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload) throws IOException; 43 + public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload);
46 44
47 /** 45 /**
48 * Registers a new message handler for message type. 46 * Registers a new message handler for message type.
......
...@@ -39,6 +39,7 @@ import io.netty.channel.socket.nio.NioSocketChannel; ...@@ -39,6 +39,7 @@ import io.netty.channel.socket.nio.NioSocketChannel;
39 import java.io.IOException; 39 import java.io.IOException;
40 import java.net.InetAddress; 40 import java.net.InetAddress;
41 import java.net.UnknownHostException; 41 import java.net.UnknownHostException;
42 +import java.util.concurrent.CompletableFuture;
42 import java.util.concurrent.ConcurrentHashMap; 43 import java.util.concurrent.ConcurrentHashMap;
43 import java.util.concurrent.ConcurrentMap; 44 import java.util.concurrent.ConcurrentMap;
44 import java.util.concurrent.ExecutorService; 45 import java.util.concurrent.ExecutorService;
...@@ -56,8 +57,6 @@ import com.google.common.cache.Cache; ...@@ -56,8 +57,6 @@ import com.google.common.cache.Cache;
56 import com.google.common.cache.CacheBuilder; 57 import com.google.common.cache.CacheBuilder;
57 import com.google.common.cache.RemovalListener; 58 import com.google.common.cache.RemovalListener;
58 import com.google.common.cache.RemovalNotification; 59 import com.google.common.cache.RemovalNotification;
59 -import com.google.common.util.concurrent.ListenableFuture;
60 -import com.google.common.util.concurrent.SettableFuture;
61 60
62 /** 61 /**
63 * A Netty based implementation of MessagingService. 62 * A Netty based implementation of MessagingService.
...@@ -69,14 +68,14 @@ public class NettyMessagingService implements MessagingService { ...@@ -69,14 +68,14 @@ public class NettyMessagingService implements MessagingService {
69 private final Endpoint localEp; 68 private final Endpoint localEp;
70 private final ConcurrentMap<String, MessageHandler> handlers = new ConcurrentHashMap<>(); 69 private final ConcurrentMap<String, MessageHandler> handlers = new ConcurrentHashMap<>();
71 private final AtomicLong messageIdGenerator = new AtomicLong(0); 70 private final AtomicLong messageIdGenerator = new AtomicLong(0);
72 - private final Cache<Long, SettableFuture<byte[]>> responseFutures = CacheBuilder.newBuilder() 71 + private final Cache<Long, CompletableFuture<byte[]>> responseFutures = CacheBuilder.newBuilder()
73 .maximumSize(100000) 72 .maximumSize(100000)
74 .expireAfterWrite(10, TimeUnit.SECONDS) 73 .expireAfterWrite(10, TimeUnit.SECONDS)
75 - .removalListener(new RemovalListener<Long, SettableFuture<byte[]>>() { 74 + .removalListener(new RemovalListener<Long, CompletableFuture<byte[]>>() {
76 @Override 75 @Override
77 - public void onRemoval(RemovalNotification<Long, SettableFuture<byte[]>> entry) { 76 + public void onRemoval(RemovalNotification<Long, CompletableFuture<byte[]>> entry) {
78 if (entry.wasEvicted()) { 77 if (entry.wasEvicted()) {
79 - entry.getValue().setException(new TimeoutException("Timedout waiting for reply")); 78 + entry.getValue().completeExceptionally(new TimeoutException("Timedout waiting for reply"));
80 } 79 }
81 } 80 }
82 }) 81 })
...@@ -178,11 +177,10 @@ public class NettyMessagingService implements MessagingService { ...@@ -178,11 +177,10 @@ public class NettyMessagingService implements MessagingService {
178 } 177 }
179 178
180 @Override 179 @Override
181 - public ListenableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload) 180 + public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload) {
182 - throws IOException { 181 + CompletableFuture<byte[]> response = new CompletableFuture<>();
183 - SettableFuture<byte[]> futureResponse = SettableFuture.create();
184 Long messageId = messageIdGenerator.incrementAndGet(); 182 Long messageId = messageIdGenerator.incrementAndGet();
185 - responseFutures.put(messageId, futureResponse); 183 + responseFutures.put(messageId, response);
186 InternalMessage message = new InternalMessage.Builder(this) 184 InternalMessage message = new InternalMessage.Builder(this)
187 .withId(messageId) 185 .withId(messageId)
188 .withSender(localEp) 186 .withSender(localEp)
...@@ -193,9 +191,9 @@ public class NettyMessagingService implements MessagingService { ...@@ -193,9 +191,9 @@ public class NettyMessagingService implements MessagingService {
193 sendAsync(ep, message); 191 sendAsync(ep, message);
194 } catch (Exception e) { 192 } catch (Exception e) {
195 responseFutures.invalidate(messageId); 193 responseFutures.invalidate(messageId);
196 - throw e; 194 + response.completeExceptionally(e);
197 } 195 }
198 - return futureResponse; 196 + return response;
199 } 197 }
200 198
201 @Override 199 @Override
...@@ -333,10 +331,10 @@ public class NettyMessagingService implements MessagingService { ...@@ -333,10 +331,10 @@ public class NettyMessagingService implements MessagingService {
333 String type = message.type(); 331 String type = message.type();
334 if (InternalMessage.REPLY_MESSAGE_TYPE.equals(type)) { 332 if (InternalMessage.REPLY_MESSAGE_TYPE.equals(type)) {
335 try { 333 try {
336 - SettableFuture<byte[]> futureResponse = 334 + CompletableFuture<byte[]> futureResponse =
337 NettyMessagingService.this.responseFutures.getIfPresent(message.id()); 335 NettyMessagingService.this.responseFutures.getIfPresent(message.id());
338 if (futureResponse != null) { 336 if (futureResponse != null) {
339 - futureResponse.set(message.payload()); 337 + futureResponse.complete(message.payload());
340 } else { 338 } else {
341 log.warn("Received a reply for message id:[{}]. " 339 log.warn("Received a reply for message id:[{}]. "
342 + " from {}. But was unable to locate the" 340 + " from {}. But was unable to locate the"
......