Madan Jampani

Re-register listeners after a Copycat client recovers from a network partition

Change-Id: I1b2669011e1f229f8b6edc836eb89c39ea371a97
......@@ -56,7 +56,6 @@ import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.MapTransaction;
import org.onosproject.store.service.Versioned;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
......@@ -85,6 +84,11 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap>
@Override
public CompletableFuture<AtomixConsistentMap> open() {
return super.open().thenApply(result -> {
client.onStateChange(state -> {
if (state == CopycatClient.State.CONNECTED && isListening()) {
client.submit(new Listen());
}
});
client.onEvent(CHANGE_SUBJECT, this::handleEvent);
return result;
});
......@@ -308,4 +312,8 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap>
public Collection<Consumer<Status>> statusChangeListeners() {
return ImmutableSet.copyOf(statusChangeListeners);
}
private boolean isListening() {
return !mapEventListeners.isEmpty();
}
}
\ No newline at end of file
......
......@@ -94,6 +94,11 @@ public class AtomixLeaderElector extends AbstractResource<AtomixLeaderElector>
@Override
public CompletableFuture<AtomixLeaderElector> open() {
return super.open().thenApply(result -> {
client.onStateChange(state -> {
if (state == CopycatClient.State.CONNECTED && isListening()) {
client.submit(new Listen());
}
});
client.onEvent(CHANGE_SUBJECT, this::handleEvent);
return result;
});
......@@ -183,4 +188,8 @@ public class AtomixLeaderElector extends AbstractResource<AtomixLeaderElector>
public Collection<Consumer<Status>> statusChangeListeners() {
return ImmutableSet.copyOf(statusChangeListeners);
}
private boolean isListening() {
return !leadershipChangeListeners.isEmpty();
}
}
......