Brian O'Connor
Committed by Ray Milkey

adding sender-side accumulator to ecmap

Change-Id: I63de27131c067c07b41ca311b14ef3ac85b6ae3e
...@@ -15,51 +15,55 @@ ...@@ -15,51 +15,55 @@
15 */ 15 */
16 package org.onosproject.store.ecmap; 16 package org.onosproject.store.ecmap;
17 17
18 -import com.google.common.collect.ImmutableList;
19 import org.onosproject.store.Timestamp; 18 import org.onosproject.store.Timestamp;
20 19
21 -import java.util.List;
22 -
23 import static com.google.common.base.Preconditions.checkNotNull; 20 import static com.google.common.base.Preconditions.checkNotNull;
24 21
25 /** 22 /**
26 - * Internal inter-instance event used by EventuallyConsistentMap for PUT events. 23 + * Base class for events in an EventuallyConsistentMap.
27 */ 24 */
28 -final class InternalPutEvent<K, V> { 25 +public abstract class AbstractEntry<K, V> implements Comparable<AbstractEntry<K, V>> {
29 - private final List<PutEntry<K, V>> entries; 26 + private final K key;
27 + private final Timestamp timestamp;
30 28
31 /** 29 /**
32 - * Creates a put event for a single key. 30 + * Creates a new put entry.
33 * 31 *
34 - * @param key key the event concerns 32 + * @param key key of the entry
35 - * @param value value of the key 33 + * @param timestamp timestamp of the put event
36 - * @param timestamp timestamp of the event
37 */ 34 */
38 - public InternalPutEvent(K key, V value, Timestamp timestamp) { 35 + public AbstractEntry(K key, Timestamp timestamp) {
39 - entries = ImmutableList.of(new PutEntry<>(key, value, timestamp)); 36 + this.key = checkNotNull(key);
37 + this.timestamp = checkNotNull(timestamp);
38 + }
39 +
40 + // Needed for serialization.
41 + @SuppressWarnings("unused")
42 + protected AbstractEntry() {
43 + this.key = null;
44 + this.timestamp = null;
40 } 45 }
41 46
42 /** 47 /**
43 - * Creates a put event for multiple keys. 48 + * Returns the key of the entry.
44 * 49 *
45 - * @param entries list of put entries to send an event for 50 + * @return the key
46 */ 51 */
47 - public InternalPutEvent(List<PutEntry<K, V>> entries) { 52 + public K key() {
48 - this.entries = checkNotNull(entries); 53 + return key;
49 - }
50 -
51 - // Needed for serialization.
52 - @SuppressWarnings("unused")
53 - private InternalPutEvent() {
54 - entries = null;
55 } 54 }
56 55
57 /** 56 /**
58 - * Returns the list of put entries this event concerns. 57 + * Returns the timestamp of the event.
59 * 58 *
60 - * @return list of put entries 59 + * @return the timestamp
61 */ 60 */
62 - public List<PutEntry<K, V>> entries() { 61 + public Timestamp timestamp() {
63 - return entries; 62 + return timestamp;
63 + }
64 +
65 + @Override
66 + public int compareTo(AbstractEntry<K, V> o) {
67 + return this.timestamp.compareTo(o.timestamp);
64 } 68 }
65 } 69 }
......
...@@ -15,9 +15,13 @@ ...@@ -15,9 +15,13 @@
15 */ 15 */
16 package org.onosproject.store.ecmap; 16 package org.onosproject.store.ecmap;
17 17
18 +import com.google.common.collect.ImmutableList;
19 +import com.google.common.collect.Lists;
20 +import com.google.common.collect.Maps;
18 import org.apache.commons.lang3.RandomUtils; 21 import org.apache.commons.lang3.RandomUtils;
19 import org.apache.commons.lang3.mutable.MutableBoolean; 22 import org.apache.commons.lang3.mutable.MutableBoolean;
20 import org.apache.commons.lang3.tuple.Pair; 23 import org.apache.commons.lang3.tuple.Pair;
24 +import org.onlab.util.AbstractAccumulator;
21 import org.onlab.util.KryoNamespace; 25 import org.onlab.util.KryoNamespace;
22 import org.onlab.util.SlidingWindowCounter; 26 import org.onlab.util.SlidingWindowCounter;
23 import org.onosproject.cluster.ClusterService; 27 import org.onosproject.cluster.ClusterService;
...@@ -42,6 +46,7 @@ import java.util.LinkedList; ...@@ -42,6 +46,7 @@ import java.util.LinkedList;
42 import java.util.List; 46 import java.util.List;
43 import java.util.Map; 47 import java.util.Map;
44 import java.util.Set; 48 import java.util.Set;
49 +import java.util.Timer;
45 import java.util.concurrent.ConcurrentHashMap; 50 import java.util.concurrent.ConcurrentHashMap;
46 import java.util.concurrent.ConcurrentMap; 51 import java.util.concurrent.ConcurrentMap;
47 import java.util.concurrent.CopyOnWriteArraySet; 52 import java.util.concurrent.CopyOnWriteArraySet;
...@@ -78,7 +83,6 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -78,7 +83,6 @@ public class EventuallyConsistentMapImpl<K, V>
78 private final ClockService<K, V> clockService; 83 private final ClockService<K, V> clockService;
79 84
80 private final MessageSubject updateMessageSubject; 85 private final MessageSubject updateMessageSubject;
81 - private final MessageSubject removeMessageSubject;
82 private final MessageSubject antiEntropyAdvertisementSubject; 86 private final MessageSubject antiEntropyAdvertisementSubject;
83 87
84 private final Set<EventuallyConsistentMapListener<K, V>> listeners 88 private final Set<EventuallyConsistentMapListener<K, V>> listeners
...@@ -87,9 +91,10 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -87,9 +91,10 @@ public class EventuallyConsistentMapImpl<K, V>
87 private final ExecutorService executor; 91 private final ExecutorService executor;
88 92
89 private final ScheduledExecutorService backgroundExecutor; 93 private final ScheduledExecutorService backgroundExecutor;
90 - private final BiFunction<K, V, Iterable<NodeId>> peerUpdateFunction; 94 + private final BiFunction<K, V, Collection<NodeId>> peerUpdateFunction;
91 95
92 - private ExecutorService broadcastMessageExecutor; 96 + private ExecutorService communicationExecutor;
97 + private Map<NodeId, EventAccumulator> senderPending;
93 98
94 private volatile boolean destroyed = false; 99 private volatile boolean destroyed = false;
95 private static final String ERROR_DESTROYED = " map is already destroyed"; 100 private static final String ERROR_DESTROYED = " map is already destroyed";
...@@ -149,7 +154,7 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -149,7 +154,7 @@ public class EventuallyConsistentMapImpl<K, V>
149 ClusterCommunicationService clusterCommunicator, 154 ClusterCommunicationService clusterCommunicator,
150 KryoNamespace.Builder serializerBuilder, 155 KryoNamespace.Builder serializerBuilder,
151 ClockService<K, V> clockService, 156 ClockService<K, V> clockService,
152 - BiFunction<K, V, Iterable<NodeId>> peerUpdateFunction) { 157 + BiFunction<K, V, Collection<NodeId>> peerUpdateFunction) {
153 this.clusterService = checkNotNull(clusterService); 158 this.clusterService = checkNotNull(clusterService);
154 this.clusterCommunicator = checkNotNull(clusterCommunicator); 159 this.clusterCommunicator = checkNotNull(clusterCommunicator);
155 this.peerUpdateFunction = checkNotNull(peerUpdateFunction); 160 this.peerUpdateFunction = checkNotNull(peerUpdateFunction);
...@@ -168,27 +173,23 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -168,27 +173,23 @@ public class EventuallyConsistentMapImpl<K, V>
168 173
169 // sending executor; should be capped 174 // sending executor; should be capped
170 //TODO make # of threads configurable 175 //TODO make # of threads configurable
171 - broadcastMessageExecutor = //newSingleThreadExecutor(groupedThreads("onos/ecm", mapName + "-notify")); 176 + //TODO this probably doesn't need to be bounded anymore
172 - newFixedThreadPool(4, groupedThreads("onos/ecm", mapName + "-notify")); 177 + communicationExecutor =
178 + newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-publish-%d"));
179 + senderPending = Maps.newConcurrentMap();
173 180
174 backgroundExecutor = 181 backgroundExecutor =
175 - //FIXME anti-entropy can take >60 seconds and it blocks fg workers 182 + newSingleThreadScheduledExecutor(groupedThreads("onos/ecm", mapName + "-bg-%d"));
176 - // ... dropping minPriority to try to help until this can be parallel
177 - newSingleThreadScheduledExecutor(//minPriority(
178 - groupedThreads("onos/ecm", mapName + "-bg-%d"))/*)*/;
179 183
180 // start anti-entropy thread 184 // start anti-entropy thread
181 - //TODO disable anti-entropy for now in testing (it is unstable)
182 backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(), 185 backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
183 initialDelaySec, periodSec, 186 initialDelaySec, periodSec,
184 TimeUnit.SECONDS); 187 TimeUnit.SECONDS);
185 188
186 updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update"); 189 updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update");
187 clusterCommunicator.addSubscriber(updateMessageSubject, 190 clusterCommunicator.addSubscriber(updateMessageSubject,
188 - new InternalPutEventListener(), executor); 191 + new InternalEventListener(), executor);
189 - removeMessageSubject = new MessageSubject("ecm-" + mapName + "-remove"); 192 +
190 - clusterCommunicator.addSubscriber(removeMessageSubject,
191 - new InternalRemoveEventListener(), executor);
192 antiEntropyAdvertisementSubject = new MessageSubject("ecm-" + mapName + "-anti-entropy"); 193 antiEntropyAdvertisementSubject = new MessageSubject("ecm-" + mapName + "-anti-entropy");
193 clusterCommunicator.addSubscriber(antiEntropyAdvertisementSubject, 194 clusterCommunicator.addSubscriber(antiEntropyAdvertisementSubject,
194 new InternalAntiEntropyListener(), backgroundExecutor); 195 new InternalAntiEntropyListener(), backgroundExecutor);
...@@ -232,8 +233,6 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -232,8 +233,6 @@ public class EventuallyConsistentMapImpl<K, V>
232 .register(PutEntry.class) 233 .register(PutEntry.class)
233 .register(RemoveEntry.class) 234 .register(RemoveEntry.class)
234 .register(ArrayList.class) 235 .register(ArrayList.class)
235 - .register(InternalPutEvent.class)
236 - .register(InternalRemoveEvent.class)
237 .register(AntiEntropyAdvertisement.class) 236 .register(AntiEntropyAdvertisement.class)
238 .register(HashMap.class) 237 .register(HashMap.class)
239 .build(); 238 .build();
...@@ -250,7 +249,7 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -250,7 +249,7 @@ public class EventuallyConsistentMapImpl<K, V>
250 */ 249 */
251 public EventuallyConsistentMapImpl<K, V> withBroadcastMessageExecutor(ExecutorService executor) { 250 public EventuallyConsistentMapImpl<K, V> withBroadcastMessageExecutor(ExecutorService executor) {
252 checkNotNull(executor, "Null executor"); 251 checkNotNull(executor, "Null executor");
253 - broadcastMessageExecutor = executor; 252 + communicationExecutor = executor;
254 return this; 253 return this;
255 } 254 }
256 255
...@@ -303,7 +302,7 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -303,7 +302,7 @@ public class EventuallyConsistentMapImpl<K, V>
303 Timestamp timestamp = clockService.getTimestamp(key, value); 302 Timestamp timestamp = clockService.getTimestamp(key, value);
304 303
305 if (putInternal(key, value, timestamp)) { 304 if (putInternal(key, value, timestamp)) {
306 - notifyPeers(new InternalPutEvent<>(key, value, timestamp), 305 + notifyPeers(new PutEntry<>(key, value, timestamp),
307 peerUpdateFunction.apply(key, value)); 306 peerUpdateFunction.apply(key, value));
308 notifyListeners(new EventuallyConsistentMapEvent<>( 307 notifyListeners(new EventuallyConsistentMapEvent<>(
309 EventuallyConsistentMapEvent.Type.PUT, key, value)); 308 EventuallyConsistentMapEvent.Type.PUT, key, value));
...@@ -350,7 +349,7 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -350,7 +349,7 @@ public class EventuallyConsistentMapImpl<K, V>
350 Timestamp timestamp = clockService.getTimestamp(key, null); 349 Timestamp timestamp = clockService.getTimestamp(key, null);
351 350
352 if (removeInternal(key, timestamp)) { 351 if (removeInternal(key, timestamp)) {
353 - notifyPeers(new InternalRemoveEvent<>(key, timestamp), 352 + notifyPeers(new RemoveEntry<>(key, timestamp),
354 peerUpdateFunction.apply(key, null)); 353 peerUpdateFunction.apply(key, null));
355 notifyListeners(new EventuallyConsistentMapEvent<>( 354 notifyListeners(new EventuallyConsistentMapEvent<>(
356 EventuallyConsistentMapEvent.Type.REMOVE, key, null)); 355 EventuallyConsistentMapEvent.Type.REMOVE, key, null));
...@@ -395,7 +394,7 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -395,7 +394,7 @@ public class EventuallyConsistentMapImpl<K, V>
395 Timestamp timestamp = clockService.getTimestamp(key, value); 394 Timestamp timestamp = clockService.getTimestamp(key, value);
396 395
397 if (removeInternal(key, timestamp)) { 396 if (removeInternal(key, timestamp)) {
398 - notifyPeers(new InternalRemoveEvent<>(key, timestamp), 397 + notifyPeers(new RemoveEntry<>(key, timestamp),
399 peerUpdateFunction.apply(key, value)); 398 peerUpdateFunction.apply(key, value));
400 notifyListeners(new EventuallyConsistentMapEvent<>( 399 notifyListeners(new EventuallyConsistentMapEvent<>(
401 EventuallyConsistentMapEvent.Type.REMOVE, key, value)); 400 EventuallyConsistentMapEvent.Type.REMOVE, key, value));
...@@ -405,75 +404,24 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -405,75 +404,24 @@ public class EventuallyConsistentMapImpl<K, V>
405 @Override 404 @Override
406 public void putAll(Map<? extends K, ? extends V> m) { 405 public void putAll(Map<? extends K, ? extends V> m) {
407 checkState(!destroyed, destroyedMessage); 406 checkState(!destroyed, destroyedMessage);
408 - 407 + m.forEach(this::put);
409 - List<PutEntry<K, V>> updates = new ArrayList<>(m.size());
410 -
411 - for (Map.Entry<? extends K, ? extends V> entry : m.entrySet()) {
412 - K key = entry.getKey();
413 - V value = entry.getValue();
414 -
415 - checkNotNull(key, ERROR_NULL_KEY);
416 - checkNotNull(value, ERROR_NULL_VALUE);
417 -
418 - Timestamp timestamp = clockService.getTimestamp(key, value);
419 -
420 - if (putInternal(key, value, timestamp)) {
421 - updates.add(new PutEntry<>(key, value, timestamp));
422 - }
423 - }
424 -
425 - if (!updates.isEmpty()) {
426 - broadcastMessage(updateMessageSubject, new InternalPutEvent<>(updates));
427 -
428 - for (PutEntry<K, V> entry : updates) {
429 - EventuallyConsistentMapEvent<K, V> externalEvent =
430 - new EventuallyConsistentMapEvent<>(
431 - EventuallyConsistentMapEvent.Type.PUT, entry.key(),
432 - entry.value());
433 - notifyListeners(externalEvent);
434 - }
435 - }
436 } 408 }
437 409
438 @Override 410 @Override
439 public void clear() { 411 public void clear() {
440 checkState(!destroyed, destroyedMessage); 412 checkState(!destroyed, destroyedMessage);
441 - 413 + items.forEach((key, value) -> remove(key));
442 - List<RemoveEntry<K>> removed = new ArrayList<>(items.size());
443 -
444 - for (K key : items.keySet()) {
445 - // TODO also this is not applicable if value is important for timestamp?
446 - Timestamp timestamp = clockService.getTimestamp(key, null);
447 -
448 - if (removeInternal(key, timestamp)) {
449 - removed.add(new RemoveEntry<>(key, timestamp));
450 - }
451 - }
452 -
453 - if (!removed.isEmpty()) {
454 - broadcastMessage(removeMessageSubject, new InternalRemoveEvent<>(removed));
455 -
456 - for (RemoveEntry<K> entry : removed) {
457 - EventuallyConsistentMapEvent<K, V> externalEvent
458 - = new EventuallyConsistentMapEvent<>(
459 - EventuallyConsistentMapEvent.Type.REMOVE, entry.key(),
460 - null);
461 - notifyListeners(externalEvent);
462 - }
463 - }
464 } 414 }
465 415
466 @Override 416 @Override
467 public Set<K> keySet() { 417 public Set<K> keySet() {
468 checkState(!destroyed, destroyedMessage); 418 checkState(!destroyed, destroyedMessage);
469 -
470 return items.keySet(); 419 return items.keySet();
471 } 420 }
472 421
473 @Override 422 @Override
474 public Collection<V> values() { 423 public Collection<V> values() {
475 checkState(!destroyed, destroyedMessage); 424 checkState(!destroyed, destroyedMessage);
476 -
477 return items.values().stream() 425 return items.values().stream()
478 .map(Timestamped::value) 426 .map(Timestamped::value)
479 .collect(Collectors.toList()); 427 .collect(Collectors.toList());
...@@ -508,12 +456,11 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -508,12 +456,11 @@ public class EventuallyConsistentMapImpl<K, V>
508 456
509 executor.shutdown(); 457 executor.shutdown();
510 backgroundExecutor.shutdown(); 458 backgroundExecutor.shutdown();
511 - broadcastMessageExecutor.shutdown(); 459 + communicationExecutor.shutdown();
512 460
513 listeners.clear(); 461 listeners.clear();
514 462
515 clusterCommunicator.removeSubscriber(updateMessageSubject); 463 clusterCommunicator.removeSubscriber(updateMessageSubject);
516 - clusterCommunicator.removeSubscriber(removeMessageSubject);
517 clusterCommunicator.removeSubscriber(antiEntropyAdvertisementSubject); 464 clusterCommunicator.removeSubscriber(antiEntropyAdvertisementSubject);
518 } 465 }
519 466
...@@ -523,45 +470,32 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -523,45 +470,32 @@ public class EventuallyConsistentMapImpl<K, V>
523 } 470 }
524 } 471 }
525 472
526 - private void notifyPeers(InternalPutEvent event, Iterable<NodeId> peers) { 473 + private void notifyPeers(PutEntry<K, V> event, Collection<NodeId> peers) {
527 - // FIXME extremely memory expensive when we are overrun 474 + queueUpdate(event, peers);
528 -// broadcastMessageExecutor.execute(() -> broadcastMessage(updateMessageSubject, event));
529 - multicastMessage(updateMessageSubject, event, peers);
530 } 475 }
531 476
532 - private void notifyPeers(InternalRemoveEvent event, Iterable<NodeId> peers) { 477 + private void notifyPeers(RemoveEntry<K, V> event, Collection<NodeId> peers) {
533 - // FIXME extremely memory expensive when we are overrun 478 + queueUpdate(event, peers);
534 -// broadcastMessageExecutor.execute(() -> broadcastMessage(removeMessageSubject, event));
535 - multicastMessage(removeMessageSubject, event, peers);
536 } 479 }
537 480
538 - private void multicastMessage(MessageSubject subject, Object event, Iterable<NodeId> peers) { 481 + private void queueUpdate(AbstractEntry<K, V> event, Collection<NodeId> peers) {
539 - // FIXME can we parallelize the serialization... use the caller??? 482 + if (peers == null) {
540 - ClusterMessage message = new ClusterMessage( 483 + // we have no friends :(
541 - clusterService.getLocalNode().id(), 484 + return;
542 - subject,
543 - serializer.encode(event));
544 - broadcastMessageExecutor.execute(() -> clusterCommunicator.multicast(message, peers));
545 -// clusterCommunicator.broadcast(message);
546 } 485 }
547 - 486 + peers.forEach(node ->
548 - private void broadcastMessage(MessageSubject subject, Object event) { 487 + senderPending.computeIfAbsent(node, unusedKey -> new EventAccumulator(node)).add(event)
549 - // FIXME can we parallelize the serialization... use the caller??? 488 + );
550 - ClusterMessage message = new ClusterMessage(
551 - clusterService.getLocalNode().id(),
552 - subject,
553 - serializer.encode(event));
554 - broadcastMessageExecutor.execute(() -> clusterCommunicator.broadcast(message));
555 -// clusterCommunicator.broadcast(message);
556 } 489 }
557 490
558 - private void unicastMessage(NodeId peer, MessageSubject subject, Object event) { 491 + private boolean unicastMessage(NodeId peer, MessageSubject subject, Object event) {
559 ClusterMessage message = new ClusterMessage( 492 ClusterMessage message = new ClusterMessage(
560 clusterService.getLocalNode().id(), 493 clusterService.getLocalNode().id(),
561 subject, 494 subject,
562 serializer.encode(event)); 495 serializer.encode(event));
563 -// clusterCommunicator.unicast(message, peer); 496 + return clusterCommunicator.unicast(message, peer);
564 - broadcastMessageExecutor.execute(() -> clusterCommunicator.unicast(message, peer)); 497 + // Note: we had this flipped before...
498 +// communicationExecutor.execute(() -> clusterCommunicator.unicast(message, peer));
565 } 499 }
566 500
567 private boolean underHighLoad() { 501 private boolean underHighLoad() {
...@@ -606,9 +540,9 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -606,9 +540,9 @@ public class EventuallyConsistentMapImpl<K, V>
606 540
607 AntiEntropyAdvertisement<K> ad = createAdvertisement(); 541 AntiEntropyAdvertisement<K> ad = createAdvertisement();
608 542
609 - // TODO check the return value? 543 + if (!unicastMessage(peer, antiEntropyAdvertisementSubject, ad)) {
610 - unicastMessage(peer, antiEntropyAdvertisementSubject, ad); 544 + log.debug("Failed to send anti-entropy advertisement to {}", peer);
611 - // error log: log.debug("Failed to send anti-entropy advertisement to {}", peer); 545 + }
612 } catch (Exception e) { 546 } catch (Exception e) {
613 // Catch all exceptions to avoid scheduled task being suppressed. 547 // Catch all exceptions to avoid scheduled task being suppressed.
614 log.error("Exception thrown while sending advertisement", e); 548 log.error("Exception thrown while sending advertisement", e);
...@@ -644,9 +578,9 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -644,9 +578,9 @@ public class EventuallyConsistentMapImpl<K, V>
644 // Send the advertisement back if this peer is out-of-sync 578 // Send the advertisement back if this peer is out-of-sync
645 final NodeId sender = ad.sender(); 579 final NodeId sender = ad.sender();
646 AntiEntropyAdvertisement<K> myAd = createAdvertisement(); 580 AntiEntropyAdvertisement<K> myAd = createAdvertisement();
647 - // TODO check the return value? 581 + if (!unicastMessage(sender, antiEntropyAdvertisementSubject, myAd)) {
648 - unicastMessage(sender, antiEntropyAdvertisementSubject, myAd); 582 + log.debug("Failed to send reactive anti-entropy advertisement to {}", sender);
649 - // error log: log.debug("Failed to send reactive anti-entropy advertisement to {}", sender); 583 + }
650 break; 584 break;
651 } 585 }
652 } 586 }
...@@ -670,8 +604,6 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -670,8 +604,6 @@ public class EventuallyConsistentMapImpl<K, V>
670 = new LinkedList<>(); 604 = new LinkedList<>();
671 final NodeId sender = ad.sender(); 605 final NodeId sender = ad.sender();
672 606
673 - final List<PutEntry<K, V>> updatesToSend = new ArrayList<>();
674 -
675 for (Map.Entry<K, Timestamped<V>> item : items.entrySet()) { 607 for (Map.Entry<K, Timestamped<V>> item : items.entrySet()) {
676 K key = item.getKey(); 608 K key = item.getKey();
677 Timestamped<V> localValue = item.getValue(); 609 Timestamped<V> localValue = item.getValue();
...@@ -683,9 +615,8 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -683,9 +615,8 @@ public class EventuallyConsistentMapImpl<K, V>
683 if (remoteTimestamp == null || localValue 615 if (remoteTimestamp == null || localValue
684 .isNewerThan(remoteTimestamp)) { 616 .isNewerThan(remoteTimestamp)) {
685 // local value is more recent, push to sender 617 // local value is more recent, push to sender
686 - updatesToSend 618 + queueUpdate(new PutEntry<>(key, localValue.value(),
687 - .add(new PutEntry<>(key, localValue.value(), 619 + localValue.timestamp()), ImmutableList.of(sender));
688 - localValue.timestamp()));
689 } 620 }
690 621
691 Timestamp remoteDeadTimestamp = ad.tombstones().get(key); 622 Timestamp remoteDeadTimestamp = ad.tombstones().get(key);
...@@ -699,14 +630,6 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -699,14 +630,6 @@ public class EventuallyConsistentMapImpl<K, V>
699 } 630 }
700 } 631 }
701 632
702 - // Send all updates to the peer at once
703 - if (!updatesToSend.isEmpty()) {
704 - // TODO check the return value?
705 - unicastMessage(sender, updateMessageSubject,
706 - new InternalPutEvent<>(updatesToSend));
707 - //error log: log.warn("Failed to send advertisement response", e);
708 - }
709 -
710 return externalEvents; 633 return externalEvents;
711 } 634 }
712 635
...@@ -720,8 +643,6 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -720,8 +643,6 @@ public class EventuallyConsistentMapImpl<K, V>
720 private void antiEntropyCheckLocalRemoved(AntiEntropyAdvertisement<K> ad) { 643 private void antiEntropyCheckLocalRemoved(AntiEntropyAdvertisement<K> ad) {
721 final NodeId sender = ad.sender(); 644 final NodeId sender = ad.sender();
722 645
723 - final List<RemoveEntry<K>> removesToSend = new ArrayList<>();
724 -
725 for (Map.Entry<K, Timestamp> dead : removedItems.entrySet()) { 646 for (Map.Entry<K, Timestamp> dead : removedItems.entrySet()) {
726 K key = dead.getKey(); 647 K key = dead.getKey();
727 Timestamp localDeadTimestamp = dead.getValue(); 648 Timestamp localDeadTimestamp = dead.getValue();
...@@ -730,17 +651,8 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -730,17 +651,8 @@ public class EventuallyConsistentMapImpl<K, V>
730 if (remoteLiveTimestamp != null 651 if (remoteLiveTimestamp != null
731 && localDeadTimestamp.isNewerThan(remoteLiveTimestamp)) { 652 && localDeadTimestamp.isNewerThan(remoteLiveTimestamp)) {
732 // sender has zombie, push remove 653 // sender has zombie, push remove
733 - removesToSend 654 + queueUpdate(new RemoveEntry<>(key, localDeadTimestamp), ImmutableList.of(sender));
734 - .add(new RemoveEntry<>(key, localDeadTimestamp));
735 - }
736 } 655 }
737 -
738 - // Send all removes to the peer at once
739 - if (!removesToSend.isEmpty()) {
740 - // TODO check the return value
741 - unicastMessage(sender, removeMessageSubject,
742 - new InternalRemoveEvent<>(removesToSend));
743 - // error log: log.warn("Failed to send advertisement response", e);
744 } 656 }
745 } 657 }
746 658
...@@ -800,25 +712,44 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -800,25 +712,44 @@ public class EventuallyConsistentMapImpl<K, V>
800 } 712 }
801 } 713 }
802 714
803 - private final class InternalPutEventListener implements 715 + private final class InternalEventListener implements
804 ClusterMessageHandler { 716 ClusterMessageHandler {
805 @Override 717 @Override
806 public void handle(ClusterMessage message) { 718 public void handle(ClusterMessage message) {
807 - log.debug("Received put event from peer: {}", message.sender()); 719 + log.debug("Received update event from peer: {}", message.sender());
808 - InternalPutEvent<K, V> event = serializer.decode(message.payload()); 720 + Collection<AbstractEntry<K, V>> events = serializer.decode(message.payload());
809 721
810 try { 722 try {
811 - for (PutEntry<K, V> entry : event.entries()) { 723 + // TODO clean this for loop up
812 - K key = entry.key(); 724 + for (AbstractEntry<K, V> entry : events) {
813 - V value = entry.value(); 725 + final K key = entry.key();
814 - Timestamp timestamp = entry.timestamp(); 726 + final V value;
727 + final Timestamp timestamp = entry.timestamp();
728 + final EventuallyConsistentMapEvent.Type type;
729 + if (entry instanceof PutEntry) {
730 + PutEntry<K, V> putEntry = (PutEntry<K, V>) entry;
731 + value = putEntry.value();
732 + type = EventuallyConsistentMapEvent.Type.PUT;
733 + } else if (entry instanceof RemoveEntry) {
734 + type = EventuallyConsistentMapEvent.Type.REMOVE;
735 + value = null;
736 + } else {
737 + throw new IllegalStateException("Unknown entry type " + entry.getClass());
738 + }
815 739
816 - if (putInternal(key, value, timestamp)) { 740 + boolean success;
817 - EventuallyConsistentMapEvent<K, V> externalEvent = 741 + switch (type) {
818 - new EventuallyConsistentMapEvent<>( 742 + case PUT:
819 - EventuallyConsistentMapEvent.Type.PUT, key, 743 + success = putInternal(key, value, timestamp);
820 - value); 744 + break;
821 - notifyListeners(externalEvent); 745 + case REMOVE:
746 + success = removeInternal(key, timestamp);
747 + break;
748 + default:
749 + success = false;
750 + }
751 + if (success) {
752 + notifyListeners(new EventuallyConsistentMapEvent<>(type, key, value));
822 } 753 }
823 } 754 }
824 } catch (Exception e) { 755 } catch (Exception e) {
...@@ -827,29 +758,35 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -827,29 +758,35 @@ public class EventuallyConsistentMapImpl<K, V>
827 } 758 }
828 } 759 }
829 760
830 - private final class InternalRemoveEventListener implements 761 + // TODO pull this into the class if this gets pulled out...
831 - ClusterMessageHandler { 762 + private static final int DEFAULT_MAX_EVENTS = 1000;
832 - @Override 763 + private static final int DEFAULT_MAX_IDLE_MS = 10;
833 - public void handle(ClusterMessage message) { 764 + private static final int DEFAULT_MAX_BATCH_MS = 50;
834 - log.debug("Received remove event from peer: {}", message.sender()); 765 + private static final Timer TIMER = new Timer("onos-ecm-sender-events");
835 - InternalRemoveEvent<K> event = serializer.decode(message.payload());
836 - try {
837 - for (RemoveEntry<K> entry : event.entries()) {
838 - K key = entry.key();
839 - Timestamp timestamp = entry.timestamp();
840 766
841 - if (removeInternal(key, timestamp)) { 767 + private final class EventAccumulator extends AbstractAccumulator<AbstractEntry<K, V>> {
842 - EventuallyConsistentMapEvent<K, V> externalEvent 768 +
843 - = new EventuallyConsistentMapEvent<>( 769 + private final NodeId peer;
844 - EventuallyConsistentMapEvent.Type.REMOVE, 770 +
845 - key, null); 771 + private EventAccumulator(NodeId peer) {
846 - notifyListeners(externalEvent); 772 + super(TIMER, DEFAULT_MAX_EVENTS, DEFAULT_MAX_BATCH_MS, DEFAULT_MAX_IDLE_MS);
847 - } 773 + this.peer = peer;
848 } 774 }
775 +
776 + @Override
777 + public void processItems(List<AbstractEntry<K, V>> items) {
778 + Map<K, AbstractEntry<K, V>> map = Maps.newHashMap();
779 + items.forEach(item -> map.compute(item.key(), (key, oldValue) ->
780 + oldValue == null || item.compareTo(oldValue) > 0 ? item : oldValue
781 + )
782 + );
783 + communicationExecutor.submit(() -> {
784 + try {
785 + unicastMessage(peer, updateMessageSubject, Lists.newArrayList(map.values()));
849 } catch (Exception e) { 786 } catch (Exception e) {
850 - log.warn("Exception thrown handling remove", e); 787 + log.warn("broadcast error", e);
851 } 788 }
789 + });
852 } 790 }
853 } 791 }
854 -
855 } 792 }
......
1 -/*
2 - * Copyright 2015 Open Networking Laboratory
3 - *
4 - * Licensed under the Apache License, Version 2.0 (the "License");
5 - * you may not use this file except in compliance with the License.
6 - * You may obtain a copy of the License at
7 - *
8 - * http://www.apache.org/licenses/LICENSE-2.0
9 - *
10 - * Unless required by applicable law or agreed to in writing, software
11 - * distributed under the License is distributed on an "AS IS" BASIS,
12 - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 - * See the License for the specific language governing permissions and
14 - * limitations under the License.
15 - */
16 -package org.onosproject.store.ecmap;
17 -
18 -import com.google.common.collect.ImmutableList;
19 -import org.onosproject.store.Timestamp;
20 -
21 -import java.util.List;
22 -
23 -import static com.google.common.base.Preconditions.checkNotNull;
24 -
25 -/**
26 - * Internal inter-instance event used by EventuallyConsistentMap for REMOVE
27 - * events.
28 - */
29 -final class InternalRemoveEvent<K> {
30 - private final List<RemoveEntry<K>> entries;
31 -
32 - /**
33 - * Creates a remove event for a single key.
34 - *
35 - * @param key key the event concerns
36 - * @param timestamp timestamp of the event
37 - */
38 - public InternalRemoveEvent(K key, Timestamp timestamp) {
39 - entries = ImmutableList.of(new RemoveEntry<>(key, timestamp));
40 - }
41 -
42 - /**
43 - * Creates a remove event for multiple keys.
44 - *
45 - * @param entries list of remove entries to send an event for
46 - */
47 - public InternalRemoveEvent(List<RemoveEntry<K>> entries) {
48 - this.entries = checkNotNull(entries);
49 - }
50 -
51 - // Needed for serialization.
52 - @SuppressWarnings("unused")
53 - private InternalRemoveEvent() {
54 - entries = null;
55 - }
56 -
57 - /**
58 - * Returns the list of remove entries this event concerns.
59 - *
60 - * @return list of remove entries
61 - */
62 - public List<RemoveEntry<K>> entries() {
63 - return entries;
64 - }
65 -}
...@@ -23,10 +23,8 @@ import static com.google.common.base.Preconditions.checkNotNull; ...@@ -23,10 +23,8 @@ import static com.google.common.base.Preconditions.checkNotNull;
23 /** 23 /**
24 * Describes a single put event in an EventuallyConsistentMap. 24 * Describes a single put event in an EventuallyConsistentMap.
25 */ 25 */
26 -final class PutEntry<K, V> { 26 +final class PutEntry<K, V> extends AbstractEntry<K, V> {
27 - private final K key;
28 private final V value; 27 private final V value;
29 - private final Timestamp timestamp;
30 28
31 /** 29 /**
32 * Creates a new put entry. 30 * Creates a new put entry.
...@@ -36,26 +34,15 @@ final class PutEntry<K, V> { ...@@ -36,26 +34,15 @@ final class PutEntry<K, V> {
36 * @param timestamp timestamp of the put event 34 * @param timestamp timestamp of the put event
37 */ 35 */
38 public PutEntry(K key, V value, Timestamp timestamp) { 36 public PutEntry(K key, V value, Timestamp timestamp) {
39 - this.key = checkNotNull(key); 37 + super(key, timestamp);
40 this.value = checkNotNull(value); 38 this.value = checkNotNull(value);
41 - this.timestamp = checkNotNull(timestamp);
42 } 39 }
43 40
44 // Needed for serialization. 41 // Needed for serialization.
45 @SuppressWarnings("unused") 42 @SuppressWarnings("unused")
46 private PutEntry() { 43 private PutEntry() {
47 - this.key = null; 44 + super();
48 this.value = null; 45 this.value = null;
49 - this.timestamp = null;
50 - }
51 -
52 - /**
53 - * Returns the key of the entry.
54 - *
55 - * @return the key
56 - */
57 - public K key() {
58 - return key;
59 } 46 }
60 47
61 /** 48 /**
...@@ -67,21 +54,12 @@ final class PutEntry<K, V> { ...@@ -67,21 +54,12 @@ final class PutEntry<K, V> {
67 return value; 54 return value;
68 } 55 }
69 56
70 - /**
71 - * Returns the timestamp of the event.
72 - *
73 - * @return the timestamp
74 - */
75 - public Timestamp timestamp() {
76 - return timestamp;
77 - }
78 -
79 @Override 57 @Override
80 public String toString() { 58 public String toString() {
81 return MoreObjects.toStringHelper(getClass()) 59 return MoreObjects.toStringHelper(getClass())
82 - .add("key", key) 60 + .add("key", key())
83 .add("value", value) 61 .add("value", value)
84 - .add("timestamp", timestamp) 62 + .add("timestamp", timestamp())
85 .toString(); 63 .toString();
86 } 64 }
87 } 65 }
......
...@@ -18,15 +18,10 @@ package org.onosproject.store.ecmap; ...@@ -18,15 +18,10 @@ package org.onosproject.store.ecmap;
18 import com.google.common.base.MoreObjects; 18 import com.google.common.base.MoreObjects;
19 import org.onosproject.store.Timestamp; 19 import org.onosproject.store.Timestamp;
20 20
21 -import static com.google.common.base.Preconditions.checkNotNull;
22 -
23 /** 21 /**
24 * Describes a single remove event in an EventuallyConsistentMap. 22 * Describes a single remove event in an EventuallyConsistentMap.
25 */ 23 */
26 -final class RemoveEntry<K> { 24 +final class RemoveEntry<K, V> extends AbstractEntry<K, V> {
27 - private final K key;
28 - private final Timestamp timestamp;
29 -
30 /** 25 /**
31 * Creates a new remove entry. 26 * Creates a new remove entry.
32 * 27 *
...@@ -34,40 +29,20 @@ final class RemoveEntry<K> { ...@@ -34,40 +29,20 @@ final class RemoveEntry<K> {
34 * @param timestamp timestamp of the remove event 29 * @param timestamp timestamp of the remove event
35 */ 30 */
36 public RemoveEntry(K key, Timestamp timestamp) { 31 public RemoveEntry(K key, Timestamp timestamp) {
37 - this.key = checkNotNull(key); 32 + super(key, timestamp);
38 - this.timestamp = checkNotNull(timestamp);
39 } 33 }
40 34
41 // Needed for serialization. 35 // Needed for serialization.
42 @SuppressWarnings("unused") 36 @SuppressWarnings("unused")
43 private RemoveEntry() { 37 private RemoveEntry() {
44 - this.key = null; 38 + super();
45 - this.timestamp = null;
46 - }
47 -
48 - /**
49 - * Returns the key of the entry.
50 - *
51 - * @return the key
52 - */
53 - public K key() {
54 - return key;
55 - }
56 -
57 - /**
58 - * Returns the timestamp of the event.
59 - *
60 - * @return the timestamp
61 - */
62 - public Timestamp timestamp() {
63 - return timestamp;
64 } 39 }
65 40
66 @Override 41 @Override
67 public String toString() { 42 public String toString() {
68 return MoreObjects.toStringHelper(getClass()) 43 return MoreObjects.toStringHelper(getClass())
69 - .add("key", key) 44 + .add("key", key())
70 - .add("timestamp", timestamp) 45 + .add("timestamp", timestamp())
71 .toString(); 46 .toString();
72 } 47 }
73 } 48 }
......
...@@ -17,6 +17,7 @@ package org.onosproject.store.ecmap; ...@@ -17,6 +17,7 @@ package org.onosproject.store.ecmap;
17 17
18 import com.google.common.collect.ComparisonChain; 18 import com.google.common.collect.ComparisonChain;
19 import com.google.common.collect.ImmutableSet; 19 import com.google.common.collect.ImmutableSet;
20 +import com.google.common.collect.Lists;
20 import com.google.common.util.concurrent.ListenableFuture; 21 import com.google.common.util.concurrent.ListenableFuture;
21 import com.google.common.util.concurrent.MoreExecutors; 22 import com.google.common.util.concurrent.MoreExecutors;
22 import org.junit.After; 23 import org.junit.After;
...@@ -67,10 +68,8 @@ public class EventuallyConsistentMapImplTest { ...@@ -67,10 +68,8 @@ public class EventuallyConsistentMapImplTest {
67 private SequentialClockService<String, String> clockService; 68 private SequentialClockService<String, String> clockService;
68 69
69 private static final String MAP_NAME = "test"; 70 private static final String MAP_NAME = "test";
70 - private static final MessageSubject PUT_MESSAGE_SUBJECT 71 + private static final MessageSubject UPDATE_MESSAGE_SUBJECT
71 = new MessageSubject("ecm-" + MAP_NAME + "-update"); 72 = new MessageSubject("ecm-" + MAP_NAME + "-update");
72 - private static final MessageSubject REMOVE_MESSAGE_SUBJECT
73 - = new MessageSubject("ecm-" + MAP_NAME + "-remove");
74 private static final MessageSubject ANTI_ENTROPY_MESSAGE_SUBJECT 73 private static final MessageSubject ANTI_ENTROPY_MESSAGE_SUBJECT
75 = new MessageSubject("ecm-" + MAP_NAME + "-anti-entropy"); 74 = new MessageSubject("ecm-" + MAP_NAME + "-anti-entropy");
76 75
...@@ -82,8 +81,7 @@ public class EventuallyConsistentMapImplTest { ...@@ -82,8 +81,7 @@ public class EventuallyConsistentMapImplTest {
82 private final ControllerNode self = 81 private final ControllerNode self =
83 new DefaultControllerNode(new NodeId("local"), IpAddress.valueOf(1)); 82 new DefaultControllerNode(new NodeId("local"), IpAddress.valueOf(1));
84 83
85 - private ClusterMessageHandler putHandler; 84 + private ClusterMessageHandler updateHandler;
86 - private ClusterMessageHandler removeHandler;
87 private ClusterMessageHandler antiEntropyHandler; 85 private ClusterMessageHandler antiEntropyHandler;
88 86
89 /* 87 /*
...@@ -105,8 +103,6 @@ public class EventuallyConsistentMapImplTest { ...@@ -105,8 +103,6 @@ public class EventuallyConsistentMapImplTest {
105 .register(PutEntry.class) 103 .register(PutEntry.class)
106 .register(RemoveEntry.class) 104 .register(RemoveEntry.class)
107 .register(ArrayList.class) 105 .register(ArrayList.class)
108 - .register(InternalPutEvent.class)
109 - .register(InternalRemoveEvent.class)
110 .register(AntiEntropyAdvertisement.class) 106 .register(AntiEntropyAdvertisement.class)
111 .register(HashMap.class) 107 .register(HashMap.class)
112 .build(); 108 .build();
...@@ -237,7 +233,7 @@ public class EventuallyConsistentMapImplTest { ...@@ -237,7 +233,7 @@ public class EventuallyConsistentMapImplTest {
237 ecMap.addListener(new TestListener(latch)); 233 ecMap.addListener(new TestListener(latch));
238 234
239 assertNull(ecMap.get(KEY2)); 235 assertNull(ecMap.get(KEY2));
240 - putHandler.handle(message); 236 + updateHandler.handle(message);
241 assertTrue("External listener never got notified of internal event", 237 assertTrue("External listener never got notified of internal event",
242 latch.await(100, TimeUnit.MILLISECONDS)); 238 latch.await(100, TimeUnit.MILLISECONDS));
243 assertEquals(VALUE2, ecMap.get(KEY2)); 239 assertEquals(VALUE2, ecMap.get(KEY2));
...@@ -254,7 +250,7 @@ public class EventuallyConsistentMapImplTest { ...@@ -254,7 +250,7 @@ public class EventuallyConsistentMapImplTest {
254 latch = new CountDownLatch(1); 250 latch = new CountDownLatch(1);
255 ecMap.addListener(new TestListener(latch)); 251 ecMap.addListener(new TestListener(latch));
256 252
257 - removeHandler.handle(removeMessage); 253 + updateHandler.handle(removeMessage);
258 assertTrue("External listener never got notified of internal event", 254 assertTrue("External listener never got notified of internal event",
259 latch.await(100, TimeUnit.MILLISECONDS)); 255 latch.await(100, TimeUnit.MILLISECONDS));
260 assertNull(ecMap.get(KEY1)); 256 assertNull(ecMap.get(KEY1));
...@@ -568,8 +564,7 @@ public class EventuallyConsistentMapImplTest { ...@@ -568,8 +564,7 @@ public class EventuallyConsistentMapImplTest {
568 564
569 @Test 565 @Test
570 public void testDestroy() throws Exception { 566 public void testDestroy() throws Exception {
571 - clusterCommunicator.removeSubscriber(PUT_MESSAGE_SUBJECT); 567 + clusterCommunicator.removeSubscriber(UPDATE_MESSAGE_SUBJECT);
572 - clusterCommunicator.removeSubscriber(REMOVE_MESSAGE_SUBJECT);
573 clusterCommunicator.removeSubscriber(ANTI_ENTROPY_MESSAGE_SUBJECT); 568 clusterCommunicator.removeSubscriber(ANTI_ENTROPY_MESSAGE_SUBJECT);
574 569
575 replay(clusterCommunicator); 570 replay(clusterCommunicator);
...@@ -594,12 +589,11 @@ public class EventuallyConsistentMapImplTest { ...@@ -594,12 +589,11 @@ public class EventuallyConsistentMapImplTest {
594 } 589 }
595 590
596 private ClusterMessage generatePutMessage(String key, String value, Timestamp timestamp) { 591 private ClusterMessage generatePutMessage(String key, String value, Timestamp timestamp) {
597 - InternalPutEvent<String, String> event = 592 + PutEntry<String, String> event = new PutEntry<>(key, value, timestamp);
598 - new InternalPutEvent<>(key, value, timestamp);
599 593
600 return new ClusterMessage( 594 return new ClusterMessage(
601 - clusterService.getLocalNode().id(), PUT_MESSAGE_SUBJECT, 595 + clusterService.getLocalNode().id(), UPDATE_MESSAGE_SUBJECT,
602 - SERIALIZER.encode(event)); 596 + SERIALIZER.encode(Lists.newArrayList(event)));
603 } 597 }
604 598
605 private ClusterMessage generatePutMessage(String key1, String value1, String key2, String value2) { 599 private ClusterMessage generatePutMessage(String key1, String value1, String key2, String value2) {
...@@ -614,38 +608,35 @@ public class EventuallyConsistentMapImplTest { ...@@ -614,38 +608,35 @@ public class EventuallyConsistentMapImplTest {
614 list.add(pe1); 608 list.add(pe1);
615 list.add(pe2); 609 list.add(pe2);
616 610
617 - InternalPutEvent<String, String> event = new InternalPutEvent<>(list);
618 611
619 return new ClusterMessage( 612 return new ClusterMessage(
620 - clusterService.getLocalNode().id(), PUT_MESSAGE_SUBJECT, 613 + clusterService.getLocalNode().id(), UPDATE_MESSAGE_SUBJECT,
621 - SERIALIZER.encode(event)); 614 + SERIALIZER.encode(list));
622 } 615 }
623 616
624 private ClusterMessage generateRemoveMessage(String key, Timestamp timestamp) { 617 private ClusterMessage generateRemoveMessage(String key, Timestamp timestamp) {
625 - InternalRemoveEvent<String> event = new InternalRemoveEvent<>(key, timestamp); 618 + RemoveEntry<String, String> event = new RemoveEntry<>(key, timestamp);
626 619
627 return new ClusterMessage( 620 return new ClusterMessage(
628 - clusterService.getLocalNode().id(), REMOVE_MESSAGE_SUBJECT, 621 + clusterService.getLocalNode().id(), UPDATE_MESSAGE_SUBJECT,
629 - SERIALIZER.encode(event)); 622 + SERIALIZER.encode(Lists.newArrayList(event)));
630 } 623 }
631 624
632 private ClusterMessage generateRemoveMessage(String key1, String key2) { 625 private ClusterMessage generateRemoveMessage(String key1, String key2) {
633 - ArrayList<RemoveEntry<String>> list = new ArrayList<>(); 626 + ArrayList<RemoveEntry<String, String>> list = new ArrayList<>();
634 627
635 Timestamp timestamp1 = clockService.peek(1); 628 Timestamp timestamp1 = clockService.peek(1);
636 Timestamp timestamp2 = clockService.peek(2); 629 Timestamp timestamp2 = clockService.peek(2);
637 630
638 - RemoveEntry<String> re1 = new RemoveEntry<>(key1, timestamp1); 631 + RemoveEntry<String, String> re1 = new RemoveEntry<>(key1, timestamp1);
639 - RemoveEntry<String> re2 = new RemoveEntry<>(key2, timestamp2); 632 + RemoveEntry<String, String> re2 = new RemoveEntry<>(key2, timestamp2);
640 633
641 list.add(re1); 634 list.add(re1);
642 list.add(re2); 635 list.add(re2);
643 636
644 - InternalRemoveEvent<String> event = new InternalRemoveEvent<>(list);
645 -
646 return new ClusterMessage( 637 return new ClusterMessage(
647 - clusterService.getLocalNode().id(), REMOVE_MESSAGE_SUBJECT, 638 + clusterService.getLocalNode().id(), UPDATE_MESSAGE_SUBJECT,
648 - SERIALIZER.encode(event)); 639 + SERIALIZER.encode(list));
649 } 640 }
650 641
651 /** 642 /**
...@@ -655,10 +646,14 @@ public class EventuallyConsistentMapImplTest { ...@@ -655,10 +646,14 @@ public class EventuallyConsistentMapImplTest {
655 * @param m message we expect to be sent 646 * @param m message we expect to be sent
656 * @param clusterCommunicator a mock ClusterCommunicationService to set up 647 * @param clusterCommunicator a mock ClusterCommunicationService to set up
657 */ 648 */
649 + //FIXME rename
658 private static void expectSpecificBroadcastMessage(ClusterMessage m, 650 private static void expectSpecificBroadcastMessage(ClusterMessage m,
659 ClusterCommunicationService clusterCommunicator) { 651 ClusterCommunicationService clusterCommunicator) {
660 reset(clusterCommunicator); 652 reset(clusterCommunicator);
661 - expect(clusterCommunicator.broadcast(m)).andReturn(true); 653 +// expect(clusterCommunicator.broadcast(m)).andReturn(true);
654 + expect(clusterCommunicator.unicast(eq(m), anyObject(NodeId.class)))
655 + .andReturn(true)
656 + .anyTimes();
662 replay(clusterCommunicator); 657 replay(clusterCommunicator);
663 } 658 }
664 659
...@@ -669,10 +664,14 @@ public class EventuallyConsistentMapImplTest { ...@@ -669,10 +664,14 @@ public class EventuallyConsistentMapImplTest {
669 * @param m message we expect to be sent 664 * @param m message we expect to be sent
670 * @param clusterCommunicator a mock ClusterCommunicationService to set up 665 * @param clusterCommunicator a mock ClusterCommunicationService to set up
671 */ 666 */
667 + //FIXME rename
672 private static void expectSpecificMulticastMessage(ClusterMessage m, 668 private static void expectSpecificMulticastMessage(ClusterMessage m,
673 ClusterCommunicationService clusterCommunicator) { 669 ClusterCommunicationService clusterCommunicator) {
674 reset(clusterCommunicator); 670 reset(clusterCommunicator);
675 - expect(clusterCommunicator.multicast(eq(m), anyObject(Set.class))).andReturn(true); 671 +// expect(clusterCommunicator.multicast(eq(m), anyObject(Set.class))).andReturn(true);
672 + expect(clusterCommunicator.unicast(eq(m), anyObject(NodeId.class)))
673 + .andReturn(true)
674 + .anyTimes();
676 replay(clusterCommunicator); 675 replay(clusterCommunicator);
677 } 676 }
678 677
...@@ -684,10 +683,13 @@ public class EventuallyConsistentMapImplTest { ...@@ -684,10 +683,13 @@ public class EventuallyConsistentMapImplTest {
684 * 683 *
685 * @param clusterCommunicator a mock ClusterCommunicationService to set up 684 * @param clusterCommunicator a mock ClusterCommunicationService to set up
686 */ 685 */
686 + //FIXME rename
687 private void expectPeerMessage(ClusterCommunicationService clusterCommunicator) { 687 private void expectPeerMessage(ClusterCommunicationService clusterCommunicator) {
688 reset(clusterCommunicator); 688 reset(clusterCommunicator);
689 - expect(clusterCommunicator.multicast(anyObject(ClusterMessage.class), 689 +// expect(clusterCommunicator.multicast(anyObject(ClusterMessage.class),
690 - anyObject(Iterable.class))) 690 +// anyObject(Iterable.class)))
691 + expect(clusterCommunicator.unicast(anyObject(ClusterMessage.class),
692 + anyObject(NodeId.class)))
691 .andReturn(true) 693 .andReturn(true)
692 .anyTimes(); 694 .anyTimes();
693 replay(clusterCommunicator); 695 replay(clusterCommunicator);
...@@ -700,9 +702,13 @@ public class EventuallyConsistentMapImplTest { ...@@ -700,9 +702,13 @@ public class EventuallyConsistentMapImplTest {
700 * 702 *
701 * @param clusterCommunicator a mock ClusterCommunicationService to set up 703 * @param clusterCommunicator a mock ClusterCommunicationService to set up
702 */ 704 */
705 + //FIXME rename
703 private void expectBroadcastMessage(ClusterCommunicationService clusterCommunicator) { 706 private void expectBroadcastMessage(ClusterCommunicationService clusterCommunicator) {
704 reset(clusterCommunicator); 707 reset(clusterCommunicator);
705 - expect(clusterCommunicator.broadcast(anyObject(ClusterMessage.class))) 708 +// expect(clusterCommunicator.broadcast(anyObject(ClusterMessage.class)))
709 +// .andReturn(true)
710 +// .anyTimes();
711 + expect(clusterCommunicator.unicast(anyObject(ClusterMessage.class), anyObject(NodeId.class)))
706 .andReturn(true) 712 .andReturn(true)
707 .anyTimes(); 713 .anyTimes();
708 replay(clusterCommunicator); 714 replay(clusterCommunicator);
...@@ -747,10 +753,8 @@ public class EventuallyConsistentMapImplTest { ...@@ -747,10 +753,8 @@ public class EventuallyConsistentMapImplTest {
747 @Override 753 @Override
748 public void addSubscriber(MessageSubject subject, 754 public void addSubscriber(MessageSubject subject,
749 ClusterMessageHandler subscriber) { 755 ClusterMessageHandler subscriber) {
750 - if (subject.equals(PUT_MESSAGE_SUBJECT)) { 756 + if (subject.equals(UPDATE_MESSAGE_SUBJECT)) {
751 - putHandler = subscriber; 757 + updateHandler = subscriber;
752 - } else if (subject.equals(REMOVE_MESSAGE_SUBJECT)) {
753 - removeHandler = subscriber;
754 } else if (subject.equals(ANTI_ENTROPY_MESSAGE_SUBJECT)) { 758 } else if (subject.equals(ANTI_ENTROPY_MESSAGE_SUBJECT)) {
755 antiEntropyHandler = subscriber; 759 antiEntropyHandler = subscriber;
756 } else { 760 } else {
...@@ -762,10 +766,8 @@ public class EventuallyConsistentMapImplTest { ...@@ -762,10 +766,8 @@ public class EventuallyConsistentMapImplTest {
762 public void addSubscriber(MessageSubject subject, 766 public void addSubscriber(MessageSubject subject,
763 ClusterMessageHandler subscriber, 767 ClusterMessageHandler subscriber,
764 ExecutorService executor) { 768 ExecutorService executor) {
765 - if (subject.equals(PUT_MESSAGE_SUBJECT)) { 769 + if (subject.equals(UPDATE_MESSAGE_SUBJECT)) {
766 - putHandler = subscriber; 770 + updateHandler = subscriber;
767 - } else if (subject.equals(REMOVE_MESSAGE_SUBJECT)) {
768 - removeHandler = subscriber;
769 } else if (subject.equals(ANTI_ENTROPY_MESSAGE_SUBJECT)) { 771 } else if (subject.equals(ANTI_ENTROPY_MESSAGE_SUBJECT)) {
770 antiEntropyHandler = subscriber; 772 antiEntropyHandler = subscriber;
771 } else { 773 } else {
......