Madan Jampani
Committed by Gerrit Code Review

Fixes some issues that were preventing intent tests from running

 - LeadershipStore to support serving getAllLeaderships from cache.
 - Removed a changed to KryoNamespaces that was causing serialization issues. Instead moved that type registration to McastStore.

Change-Id: I06acf1a397b6a982c0dfd0ebc0830b2161cf23a4
...@@ -71,6 +71,8 @@ public class DistributedLeadershipStore ...@@ -71,6 +71,8 @@ public class DistributedLeadershipStore
71 71
72 protected NodeId localNodeId; 72 protected NodeId localNodeId;
73 protected ConsistentMap<String, InternalLeadership> leadershipMap; 73 protected ConsistentMap<String, InternalLeadership> leadershipMap;
74 + protected Map<String, Versioned<InternalLeadership>> leadershipCache = Maps.newConcurrentMap();
75 +
74 private final MapEventListener<String, InternalLeadership> leadershipChangeListener = 76 private final MapEventListener<String, InternalLeadership> leadershipChangeListener =
75 event -> { 77 event -> {
76 Leadership oldValue = InternalLeadership.toLeadership(Versioned.valueOrNull(event.oldValue())); 78 Leadership oldValue = InternalLeadership.toLeadership(Versioned.valueOrNull(event.oldValue()));
...@@ -91,6 +93,12 @@ public class DistributedLeadershipStore ...@@ -91,6 +93,12 @@ public class DistributedLeadershipStore
91 if (!leaderChanged && candidatesChanged) { 93 if (!leaderChanged && candidatesChanged) {
92 eventType = LeadershipEvent.Type.CANDIDATES_CHANGED; 94 eventType = LeadershipEvent.Type.CANDIDATES_CHANGED;
93 } 95 }
96 + leadershipCache.compute(event.key(), (k, v) -> {
97 + if (v == null || v.version() < event.newValue().version()) {
98 + return event.newValue();
99 + }
100 + return v;
101 + });
94 notifyDelegate(new LeadershipEvent(eventType, newValue)); 102 notifyDelegate(new LeadershipEvent(eventType, newValue));
95 }; 103 };
96 104
...@@ -103,6 +111,7 @@ public class DistributedLeadershipStore ...@@ -103,6 +111,7 @@ public class DistributedLeadershipStore
103 .withRelaxedReadConsistency() 111 .withRelaxedReadConsistency()
104 .withSerializer(Serializer.using(KryoNamespaces.API, InternalLeadership.class)) 112 .withSerializer(Serializer.using(KryoNamespaces.API, InternalLeadership.class))
105 .build(); 113 .build();
114 + leadershipMap.entrySet().forEach(e -> leadershipCache.put(e.getKey(), e.getValue()));
106 leadershipMap.addListener(leadershipChangeListener); 115 leadershipMap.addListener(leadershipChangeListener);
107 log.info("Started"); 116 log.info("Started");
108 } 117 }
...@@ -210,16 +219,13 @@ public class DistributedLeadershipStore ...@@ -210,16 +219,13 @@ public class DistributedLeadershipStore
210 219
211 @Override 220 @Override
212 public Leadership getLeadership(String topic) { 221 public Leadership getLeadership(String topic) {
213 - return InternalLeadership.toLeadership(Versioned.valueOrNull(leadershipMap.get(topic))); 222 + InternalLeadership internalLeadership = Versioned.valueOrNull(leadershipCache.get(topic));
223 + return internalLeadership == null ? null : internalLeadership.asLeadership();
214 } 224 }
215 225
216 @Override 226 @Override
217 public Map<String, Leadership> getLeaderships() { 227 public Map<String, Leadership> getLeaderships() {
218 - Map<String, Leadership> leaderships = Maps.newHashMap(); 228 + return ImmutableMap.copyOf(Maps.transformValues(leadershipCache, v -> v.value().asLeadership()));
219 - leadershipMap.entrySet().forEach(e -> {
220 - leaderships.put(e.getKey(), e.getValue().value().asLeadership());
221 - });
222 - return ImmutableMap.copyOf(leaderships);
223 } 229 }
224 230
225 private static class InternalLeadership { 231 private static class InternalLeadership {
......
...@@ -225,7 +225,6 @@ import java.util.concurrent.CopyOnWriteArraySet; ...@@ -225,7 +225,6 @@ import java.util.concurrent.CopyOnWriteArraySet;
225 import java.util.concurrent.atomic.AtomicBoolean; 225 import java.util.concurrent.atomic.AtomicBoolean;
226 import java.util.concurrent.atomic.AtomicInteger; 226 import java.util.concurrent.atomic.AtomicInteger;
227 import java.util.concurrent.atomic.AtomicLong; 227 import java.util.concurrent.atomic.AtomicLong;
228 -import java.util.concurrent.atomic.AtomicReference;
229 228
230 public final class KryoNamespaces { 229 public final class KryoNamespaces {
231 230
...@@ -235,7 +234,6 @@ public final class KryoNamespaces { ...@@ -235,7 +234,6 @@ public final class KryoNamespaces {
235 .register(AtomicBoolean.class) 234 .register(AtomicBoolean.class)
236 .register(AtomicInteger.class) 235 .register(AtomicInteger.class)
237 .register(AtomicLong.class) 236 .register(AtomicLong.class)
238 - .register(AtomicReference.class)
239 .register(new ImmutableListSerializer(), 237 .register(new ImmutableListSerializer(),
240 ImmutableList.class, 238 ImmutableList.class,
241 ImmutableList.of(1).getClass(), 239 ImmutableList.of(1).getClass(),
......
...@@ -22,6 +22,7 @@ import org.slf4j.Logger; ...@@ -22,6 +22,7 @@ import org.slf4j.Logger;
22 22
23 import java.util.Map; 23 import java.util.Map;
24 import java.util.Set; 24 import java.util.Set;
25 +import java.util.concurrent.atomic.AtomicReference;
25 26
26 import static org.slf4j.LoggerFactory.getLogger; 27 import static org.slf4j.LoggerFactory.getLogger;
27 28
...@@ -56,6 +57,7 @@ public class DistributedMcastStore extends AbstractStore<McastEvent, McastStoreD ...@@ -56,6 +57,7 @@ public class DistributedMcastStore extends AbstractStore<McastEvent, McastStoreD
56 .withSerializer(Serializer.using(KryoNamespace.newBuilder() 57 .withSerializer(Serializer.using(KryoNamespace.newBuilder()
57 .register(KryoNamespaces.API) 58 .register(KryoNamespaces.API)
58 .register( 59 .register(
60 + AtomicReference.class,
59 MulticastData.class, 61 MulticastData.class,
60 McastRoute.class, 62 McastRoute.class,
61 McastRoute.Type.class 63 McastRoute.Type.class
......
...@@ -57,6 +57,7 @@ public final class MulticastData { ...@@ -57,6 +57,7 @@ public final class MulticastData {
57 } 57 }
58 58
59 public void setSource(ConnectPoint source) { 59 public void setSource(ConnectPoint source) {
60 + // FIXME: violates immutability
60 isEmpty.set(false); 61 isEmpty.set(false);
61 this.source.set(source); 62 this.source.set(source);
62 } 63 }
......