Madan Jampani
Committed by Gerrit Code Review

Fix StoragePartition to return a furture for opening partition client + Fixes i…

…n AtomixLeaderElector

Change-Id: I6adf91e84cc17aec8acc895884dc8fbe75037978
...@@ -50,6 +50,7 @@ public class StoragePartition extends DefaultPartition implements Managed<Storag ...@@ -50,6 +50,7 @@ public class StoragePartition extends DefaultPartition implements Managed<Storag
50 private final MessagingService messagingService; 50 private final MessagingService messagingService;
51 private final ClusterService clusterService; 51 private final ClusterService clusterService;
52 private final File logFolder; 52 private final File logFolder;
53 + private CompletableFuture<StoragePartitionServer> serverOpenFuture;
53 private static final Collection<ResourceType> RESOURCE_TYPES = ImmutableSet.of( 54 private static final Collection<ResourceType> RESOURCE_TYPES = ImmutableSet.of(
54 new ResourceType(DistributedLong.class), 55 new ResourceType(DistributedLong.class),
55 new ResourceType(AtomixLeaderElector.class), 56 new ResourceType(AtomixLeaderElector.class),
...@@ -90,9 +91,9 @@ public class StoragePartition extends DefaultPartition implements Managed<Storag ...@@ -90,9 +91,9 @@ public class StoragePartition extends DefaultPartition implements Managed<Storag
90 91
91 @Override 92 @Override
92 public CompletableFuture<Void> open() { 93 public CompletableFuture<Void> open() {
93 - return openServer().thenAccept(s -> server = Optional.ofNullable(s)) 94 + serverOpenFuture = openServer();
94 - .thenCompose(v-> openClient()) 95 + serverOpenFuture.thenAccept(s -> server = Optional.ofNullable(s));
95 - .thenAccept(v -> isOpened.set(true)) 96 + return openClient().thenAccept(v -> isOpened.set(true))
96 .thenApply(v -> null); 97 .thenApply(v -> null);
97 } 98 }
98 99
......
...@@ -44,7 +44,7 @@ public class AtomixLeaderElector extends Resource<AtomixLeaderElector> ...@@ -44,7 +44,7 @@ public class AtomixLeaderElector extends Resource<AtomixLeaderElector>
44 private final Set<Consumer<Change<Leadership>>> leadershipChangeListeners = 44 private final Set<Consumer<Change<Leadership>>> leadershipChangeListeners =
45 Sets.newConcurrentHashSet(); 45 Sets.newConcurrentHashSet();
46 46
47 - public static final String CHANGE_SUBJECT = "changeEvents"; 47 + public static final String CHANGE_SUBJECT = "leadershipChangeEvents";
48 private Listener<Change<Leadership>> listener; 48 private Listener<Change<Leadership>> listener;
49 49
50 public AtomixLeaderElector(CopycatClient client, Resource.Options options) { 50 public AtomixLeaderElector(CopycatClient client, Resource.Options options) {
......
...@@ -298,6 +298,11 @@ public final class AtomixLeaderElectorCommands { ...@@ -298,6 +298,11 @@ public final class AtomixLeaderElectorCommands {
298 private String topic; 298 private String topic;
299 private NodeId nodeId; 299 private NodeId nodeId;
300 300
301 + ElectionChangeCommand() {
302 + topic = null;
303 + nodeId = null;
304 + }
305 +
301 public ElectionChangeCommand(String topic, NodeId nodeId) { 306 public ElectionChangeCommand(String topic, NodeId nodeId) {
302 this.topic = topic; 307 this.topic = topic;
303 this.nodeId = nodeId; 308 this.nodeId = nodeId;
...@@ -347,6 +352,10 @@ public final class AtomixLeaderElectorCommands { ...@@ -347,6 +352,10 @@ public final class AtomixLeaderElectorCommands {
347 */ 352 */
348 @SuppressWarnings("serial") 353 @SuppressWarnings("serial")
349 public static class Anoint extends ElectionChangeCommand<Boolean> { 354 public static class Anoint extends ElectionChangeCommand<Boolean> {
355 +
356 + private Anoint() {
357 + }
358 +
350 public Anoint(String topic, NodeId nodeId) { 359 public Anoint(String topic, NodeId nodeId) {
351 super(topic, nodeId); 360 super(topic, nodeId);
352 } 361 }
...@@ -357,6 +366,10 @@ public final class AtomixLeaderElectorCommands { ...@@ -357,6 +366,10 @@ public final class AtomixLeaderElectorCommands {
357 */ 366 */
358 @SuppressWarnings("serial") 367 @SuppressWarnings("serial")
359 public static class Promote extends ElectionChangeCommand<Boolean> { 368 public static class Promote extends ElectionChangeCommand<Boolean> {
369 +
370 + private Promote() {
371 + }
372 +
360 public Promote(String topic, NodeId nodeId) { 373 public Promote(String topic, NodeId nodeId) {
361 super(topic, nodeId); 374 super(topic, nodeId);
362 } 375 }
......
...@@ -202,9 +202,10 @@ public class AtomixLeaderElectorState extends ResourceStateMachine ...@@ -202,9 +202,10 @@ public class AtomixLeaderElectorState extends ResourceStateMachine
202 public boolean anoint(Commit<? extends Anoint> commit) { 202 public boolean anoint(Commit<? extends Anoint> commit) {
203 try { 203 try {
204 String topic = commit.operation().topic(); 204 String topic = commit.operation().topic();
205 + NodeId nodeId = commit.operation().nodeId();
205 Leadership oldLeadership = leadership(topic); 206 Leadership oldLeadership = leadership(topic);
206 ElectionState electionState = elections.computeIfPresent(topic, 207 ElectionState electionState = elections.computeIfPresent(topic,
207 - (k, v) -> new ElectionState(v).transferLeadership(commit.operation().nodeId(), termCounter(topic))); 208 + (k, v) -> v.transferLeadership(nodeId, termCounter(topic)));
208 Leadership newLeadership = leadership(topic); 209 Leadership newLeadership = leadership(topic);
209 if (!Objects.equal(oldLeadership, newLeadership)) { 210 if (!Objects.equal(oldLeadership, newLeadership)) {
210 notifyLeadershipChange(oldLeadership, newLeadership); 211 notifyLeadershipChange(oldLeadership, newLeadership);
...@@ -230,7 +231,7 @@ public class AtomixLeaderElectorState extends ResourceStateMachine ...@@ -230,7 +231,7 @@ public class AtomixLeaderElectorState extends ResourceStateMachine
230 if (oldLeadership == null || !oldLeadership.candidates().contains(nodeId)) { 231 if (oldLeadership == null || !oldLeadership.candidates().contains(nodeId)) {
231 return false; 232 return false;
232 } 233 }
233 - elections.computeIfPresent(topic, (k, v) -> new ElectionState(v).promote(commit.operation().nodeId())); 234 + elections.computeIfPresent(topic, (k, v) -> v.promote(nodeId));
234 Leadership newLeadership = leadership(topic); 235 Leadership newLeadership = leadership(topic);
235 if (!Objects.equal(oldLeadership, newLeadership)) { 236 if (!Objects.equal(oldLeadership, newLeadership)) {
236 notifyLeadershipChange(oldLeadership, newLeadership); 237 notifyLeadershipChange(oldLeadership, newLeadership);
...@@ -498,7 +499,7 @@ public class AtomixLeaderElectorState extends ResourceStateMachine ...@@ -498,7 +499,7 @@ public class AtomixLeaderElectorState extends ResourceStateMachine
498 .filter(r -> r.nodeId().equals(nodeId)) 499 .filter(r -> r.nodeId().equals(nodeId))
499 .findFirst() 500 .findFirst()
500 .orElse(null); 501 .orElse(null);
501 - List<Registration> updatedRegistrations = Lists.newLinkedList(); 502 + List<Registration> updatedRegistrations = Lists.newArrayList();
502 updatedRegistrations.add(registration); 503 updatedRegistrations.add(registration);
503 registrations.stream() 504 registrations.stream()
504 .filter(r -> !r.nodeId().equals(nodeId)) 505 .filter(r -> !r.nodeId().equals(nodeId))
......