Jon Hall
Committed by Gerrit Code Review

Improvements to ECMapImpl to increase consistency

- When a new map is created, initiate advertisements with peers
- Increase High Load threshold to an average of 1 op per slot
- Attempts to mitigate [ONOS-4569]

Change-Id: I0412d17b55804e4bc095347256e94a49a344c0cc
...@@ -120,7 +120,7 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -120,7 +120,7 @@ public class EventuallyConsistentMapImpl<K, V>
120 private final boolean tombstonesDisabled; 120 private final boolean tombstonesDisabled;
121 121
122 private static final int WINDOW_SIZE = 5; 122 private static final int WINDOW_SIZE = 5;
123 - private static final int HIGH_LOAD_THRESHOLD = 0; 123 + private static final int HIGH_LOAD_THRESHOLD = 2;
124 private static final int LOAD_WINDOW = 2; 124 private static final int LOAD_WINDOW = 2;
125 private SlidingWindowCounter counter = new SlidingWindowCounter(WINDOW_SIZE); 125 private SlidingWindowCounter counter = new SlidingWindowCounter(WINDOW_SIZE);
126 126
...@@ -262,6 +262,9 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -262,6 +262,9 @@ public class EventuallyConsistentMapImpl<K, V>
262 262
263 this.tombstonesDisabled = tombstonesDisabled; 263 this.tombstonesDisabled = tombstonesDisabled;
264 this.lightweightAntiEntropy = !convergeFaster; 264 this.lightweightAntiEntropy = !convergeFaster;
265 +
266 + // Initiate first round of Gossip
267 + this.bootstrap();
265 } 268 }
266 269
267 private StoreSerializer createSerializer(KryoNamespace ns) { 270 private StoreSerializer createSerializer(KryoNamespace ns) {
...@@ -721,6 +724,35 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -721,6 +724,35 @@ public class EventuallyConsistentMapImpl<K, V>
721 }); 724 });
722 } 725 }
723 726
727 + private void bootstrap() {
728 + /*
729 + * Attempt to get in sync with the cluster when a map is created. This is to help avoid a new node
730 + * writing to an ECM until it has a view of the map. Depending on how lightweight the map instance
731 + * is, this will attempt to advertise to all or some of the peers.
732 + */
733 + int n = 0;
734 + List<NodeId> activePeers = clusterService.getNodes()
735 + .stream()
736 + .map(ControllerNode::id)
737 + .filter(id -> !localNodeId.equals(id))
738 + .filter(id -> clusterService.getState(id).isActive())
739 + .collect(Collectors.toList());
740 +
741 + if (activePeers.isEmpty()) {
742 + return;
743 + }
744 +
745 + if (lightweightAntiEntropy) {
746 + n = activePeers.size() / 2;
747 + } else {
748 + n = activePeers.size();
749 + }
750 +
751 + for (int i = 0; i < n; i++) {
752 + sendAdvertisementToPeer(activePeers.get(i));
753 + }
754 + }
755 +
724 // TODO pull this into the class if this gets pulled out... 756 // TODO pull this into the class if this gets pulled out...
725 private static final int DEFAULT_MAX_EVENTS = 1000; 757 private static final int DEFAULT_MAX_EVENTS = 1000;
726 private static final int DEFAULT_MAX_IDLE_MS = 10; 758 private static final int DEFAULT_MAX_IDLE_MS = 10;
......