Madan Jampani
Committed by Gerrit Code Review

Re-register listeners after a Copycat client recovers from a network partition

Change-Id: I1b2669011e1f229f8b6edc836eb89c39ea371a97
(cherry picked from commit fb786381)
...@@ -56,7 +56,6 @@ import org.onosproject.store.service.MapEvent; ...@@ -56,7 +56,6 @@ import org.onosproject.store.service.MapEvent;
56 import org.onosproject.store.service.MapEventListener; 56 import org.onosproject.store.service.MapEventListener;
57 import org.onosproject.store.service.MapTransaction; 57 import org.onosproject.store.service.MapTransaction;
58 import org.onosproject.store.service.Versioned; 58 import org.onosproject.store.service.Versioned;
59 -
60 import com.google.common.collect.ImmutableSet; 59 import com.google.common.collect.ImmutableSet;
61 import com.google.common.collect.Maps; 60 import com.google.common.collect.Maps;
62 import com.google.common.collect.Sets; 61 import com.google.common.collect.Sets;
...@@ -85,6 +84,11 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap> ...@@ -85,6 +84,11 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap>
85 @Override 84 @Override
86 public CompletableFuture<AtomixConsistentMap> open() { 85 public CompletableFuture<AtomixConsistentMap> open() {
87 return super.open().thenApply(result -> { 86 return super.open().thenApply(result -> {
87 + client.onStateChange(state -> {
88 + if (state == CopycatClient.State.CONNECTED && isListening()) {
89 + client.submit(new Listen());
90 + }
91 + });
88 client.onEvent(CHANGE_SUBJECT, this::handleEvent); 92 client.onEvent(CHANGE_SUBJECT, this::handleEvent);
89 return result; 93 return result;
90 }); 94 });
...@@ -308,4 +312,8 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap> ...@@ -308,4 +312,8 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap>
308 public Collection<Consumer<Status>> statusChangeListeners() { 312 public Collection<Consumer<Status>> statusChangeListeners() {
309 return ImmutableSet.copyOf(statusChangeListeners); 313 return ImmutableSet.copyOf(statusChangeListeners);
310 } 314 }
315 +
316 + private boolean isListening() {
317 + return !mapEventListeners.isEmpty();
318 + }
311 } 319 }
...\ No newline at end of file ...\ No newline at end of file
......
...@@ -94,6 +94,11 @@ public class AtomixLeaderElector extends AbstractResource<AtomixLeaderElector> ...@@ -94,6 +94,11 @@ public class AtomixLeaderElector extends AbstractResource<AtomixLeaderElector>
94 @Override 94 @Override
95 public CompletableFuture<AtomixLeaderElector> open() { 95 public CompletableFuture<AtomixLeaderElector> open() {
96 return super.open().thenApply(result -> { 96 return super.open().thenApply(result -> {
97 + client.onStateChange(state -> {
98 + if (state == CopycatClient.State.CONNECTED && isListening()) {
99 + client.submit(new Listen());
100 + }
101 + });
97 client.onEvent(CHANGE_SUBJECT, this::handleEvent); 102 client.onEvent(CHANGE_SUBJECT, this::handleEvent);
98 return result; 103 return result;
99 }); 104 });
...@@ -183,4 +188,8 @@ public class AtomixLeaderElector extends AbstractResource<AtomixLeaderElector> ...@@ -183,4 +188,8 @@ public class AtomixLeaderElector extends AbstractResource<AtomixLeaderElector>
183 public Collection<Consumer<Status>> statusChangeListeners() { 188 public Collection<Consumer<Status>> statusChangeListeners() {
184 return ImmutableSet.copyOf(statusChangeListeners); 189 return ImmutableSet.copyOf(statusChangeListeners);
185 } 190 }
191 +
192 + private boolean isListening() {
193 + return !leadershipChangeListeners.isEmpty();
194 + }
186 } 195 }
......