Madan Jampani
Committed by Gerrit Code Review

Code clean up in ConsistentMap and LeaderElector resources

Change-Id: I1834188393f19e37394c32047538e6027522a13d
...@@ -15,7 +15,6 @@ ...@@ -15,7 +15,6 @@
15 */ 15 */
16 package org.onosproject.store.primitives.resources.impl; 16 package org.onosproject.store.primitives.resources.impl;
17 17
18 -import io.atomix.catalyst.util.Listener;
19 import io.atomix.copycat.client.CopycatClient; 18 import io.atomix.copycat.client.CopycatClient;
20 import io.atomix.resource.Resource; 19 import io.atomix.resource.Resource;
21 import io.atomix.resource.ResourceTypeInfo; 20 import io.atomix.resource.ResourceTypeInfo;
...@@ -32,6 +31,21 @@ import java.util.function.Predicate; ...@@ -32,6 +31,21 @@ import java.util.function.Predicate;
32 31
33 import org.onlab.util.Match; 32 import org.onlab.util.Match;
34 import org.onosproject.store.primitives.TransactionId; 33 import org.onosproject.store.primitives.TransactionId;
34 +import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Clear;
35 +import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.ContainsKey;
36 +import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.ContainsValue;
37 +import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.EntrySet;
38 +import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Get;
39 +import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.IsEmpty;
40 +import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.KeySet;
41 +import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Listen;
42 +import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Size;
43 +import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionCommit;
44 +import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionPrepare;
45 +import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionRollback;
46 +import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Unlisten;
47 +import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.UpdateAndGet;
48 +import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Values;
35 import org.onosproject.store.service.AsyncConsistentMap; 49 import org.onosproject.store.service.AsyncConsistentMap;
36 import org.onosproject.store.service.MapEvent; 50 import org.onosproject.store.service.MapEvent;
37 import org.onosproject.store.service.MapEventListener; 51 import org.onosproject.store.service.MapEventListener;
...@@ -76,48 +90,48 @@ public class AtomixConsistentMap extends Resource<AtomixConsistentMap> ...@@ -76,48 +90,48 @@ public class AtomixConsistentMap extends Resource<AtomixConsistentMap>
76 90
77 @Override 91 @Override
78 public CompletableFuture<Boolean> isEmpty() { 92 public CompletableFuture<Boolean> isEmpty() {
79 - return submit(new AtomixConsistentMapCommands.IsEmpty()); 93 + return submit(new IsEmpty());
80 } 94 }
81 95
82 @Override 96 @Override
83 public CompletableFuture<Integer> size() { 97 public CompletableFuture<Integer> size() {
84 - return submit(new AtomixConsistentMapCommands.Size()); 98 + return submit(new Size());
85 } 99 }
86 100
87 @Override 101 @Override
88 public CompletableFuture<Boolean> containsKey(String key) { 102 public CompletableFuture<Boolean> containsKey(String key) {
89 - return submit(new AtomixConsistentMapCommands.ContainsKey(key)); 103 + return submit(new ContainsKey(key));
90 } 104 }
91 105
92 @Override 106 @Override
93 public CompletableFuture<Boolean> containsValue(byte[] value) { 107 public CompletableFuture<Boolean> containsValue(byte[] value) {
94 - return submit(new AtomixConsistentMapCommands.ContainsValue(value)); 108 + return submit(new ContainsValue(value));
95 } 109 }
96 110
97 @Override 111 @Override
98 public CompletableFuture<Versioned<byte[]>> get(String key) { 112 public CompletableFuture<Versioned<byte[]>> get(String key) {
99 - return submit(new AtomixConsistentMapCommands.Get(key)); 113 + return submit(new Get(key));
100 } 114 }
101 115
102 @Override 116 @Override
103 public CompletableFuture<Set<String>> keySet() { 117 public CompletableFuture<Set<String>> keySet() {
104 - return submit(new AtomixConsistentMapCommands.KeySet()); 118 + return submit(new KeySet());
105 } 119 }
106 120
107 @Override 121 @Override
108 public CompletableFuture<Collection<Versioned<byte[]>>> values() { 122 public CompletableFuture<Collection<Versioned<byte[]>>> values() {
109 - return submit(new AtomixConsistentMapCommands.Values()); 123 + return submit(new Values());
110 } 124 }
111 125
112 @Override 126 @Override
113 public CompletableFuture<Set<Entry<String, Versioned<byte[]>>>> entrySet() { 127 public CompletableFuture<Set<Entry<String, Versioned<byte[]>>>> entrySet() {
114 - return submit(new AtomixConsistentMapCommands.EntrySet()); 128 + return submit(new EntrySet());
115 } 129 }
116 130
117 @Override 131 @Override
118 @SuppressWarnings("unchecked") 132 @SuppressWarnings("unchecked")
119 public CompletableFuture<Versioned<byte[]>> put(String key, byte[] value) { 133 public CompletableFuture<Versioned<byte[]>> put(String key, byte[] value) {
120 - return submit(new AtomixConsistentMapCommands.UpdateAndGet(key, value, Match.ANY, Match.ANY)) 134 + return submit(new UpdateAndGet(key, value, Match.ANY, Match.ANY))
121 .whenComplete((r, e) -> throwIfLocked(r.status())) 135 .whenComplete((r, e) -> throwIfLocked(r.status()))
122 .thenApply(v -> v.oldValue()); 136 .thenApply(v -> v.oldValue());
123 } 137 }
...@@ -125,7 +139,7 @@ public class AtomixConsistentMap extends Resource<AtomixConsistentMap> ...@@ -125,7 +139,7 @@ public class AtomixConsistentMap extends Resource<AtomixConsistentMap>
125 @Override 139 @Override
126 @SuppressWarnings("unchecked") 140 @SuppressWarnings("unchecked")
127 public CompletableFuture<Versioned<byte[]>> putAndGet(String key, byte[] value) { 141 public CompletableFuture<Versioned<byte[]>> putAndGet(String key, byte[] value) {
128 - return submit(new AtomixConsistentMapCommands.UpdateAndGet(key, value, Match.ANY, Match.ANY)) 142 + return submit(new UpdateAndGet(key, value, Match.ANY, Match.ANY))
129 .whenComplete((r, e) -> throwIfLocked(r.status())) 143 .whenComplete((r, e) -> throwIfLocked(r.status()))
130 .thenApply(v -> v.newValue()); 144 .thenApply(v -> v.newValue());
131 } 145 }
...@@ -133,14 +147,14 @@ public class AtomixConsistentMap extends Resource<AtomixConsistentMap> ...@@ -133,14 +147,14 @@ public class AtomixConsistentMap extends Resource<AtomixConsistentMap>
133 @Override 147 @Override
134 @SuppressWarnings("unchecked") 148 @SuppressWarnings("unchecked")
135 public CompletableFuture<Versioned<byte[]>> putIfAbsent(String key, byte[] value) { 149 public CompletableFuture<Versioned<byte[]>> putIfAbsent(String key, byte[] value) {
136 - return submit(new AtomixConsistentMapCommands.UpdateAndGet(key, value, Match.NULL, Match.ANY)) 150 + return submit(new UpdateAndGet(key, value, Match.NULL, Match.ANY))
137 .whenComplete((r, e) -> throwIfLocked(r.status())) 151 .whenComplete((r, e) -> throwIfLocked(r.status()))
138 .thenApply(v -> v.oldValue()); 152 .thenApply(v -> v.oldValue());
139 } 153 }
140 @Override 154 @Override
141 @SuppressWarnings("unchecked") 155 @SuppressWarnings("unchecked")
142 public CompletableFuture<Versioned<byte[]>> remove(String key) { 156 public CompletableFuture<Versioned<byte[]>> remove(String key) {
143 - return submit(new AtomixConsistentMapCommands.UpdateAndGet(key, null, Match.ANY, Match.ANY)) 157 + return submit(new UpdateAndGet(key, null, Match.ANY, Match.ANY))
144 .whenComplete((r, e) -> throwIfLocked(r.status())) 158 .whenComplete((r, e) -> throwIfLocked(r.status()))
145 .thenApply(v -> v.oldValue()); 159 .thenApply(v -> v.oldValue());
146 } 160 }
...@@ -148,7 +162,7 @@ public class AtomixConsistentMap extends Resource<AtomixConsistentMap> ...@@ -148,7 +162,7 @@ public class AtomixConsistentMap extends Resource<AtomixConsistentMap>
148 @Override 162 @Override
149 @SuppressWarnings("unchecked") 163 @SuppressWarnings("unchecked")
150 public CompletableFuture<Boolean> remove(String key, byte[] value) { 164 public CompletableFuture<Boolean> remove(String key, byte[] value) {
151 - return submit(new AtomixConsistentMapCommands.UpdateAndGet(key, null, Match.ifValue(value), Match.ANY)) 165 + return submit(new UpdateAndGet(key, null, Match.ifValue(value), Match.ANY))
152 .whenComplete((r, e) -> throwIfLocked(r.status())) 166 .whenComplete((r, e) -> throwIfLocked(r.status()))
153 .thenApply(v -> v.updated()); 167 .thenApply(v -> v.updated());
154 } 168 }
...@@ -156,7 +170,7 @@ public class AtomixConsistentMap extends Resource<AtomixConsistentMap> ...@@ -156,7 +170,7 @@ public class AtomixConsistentMap extends Resource<AtomixConsistentMap>
156 @Override 170 @Override
157 @SuppressWarnings("unchecked") 171 @SuppressWarnings("unchecked")
158 public CompletableFuture<Boolean> remove(String key, long version) { 172 public CompletableFuture<Boolean> remove(String key, long version) {
159 - return submit(new AtomixConsistentMapCommands.UpdateAndGet(key, null, Match.ANY, Match.ifValue(version))) 173 + return submit(new UpdateAndGet(key, null, Match.ANY, Match.ifValue(version)))
160 .whenComplete((r, e) -> throwIfLocked(r.status())) 174 .whenComplete((r, e) -> throwIfLocked(r.status()))
161 .thenApply(v -> v.updated()); 175 .thenApply(v -> v.updated());
162 } 176 }
...@@ -164,7 +178,7 @@ public class AtomixConsistentMap extends Resource<AtomixConsistentMap> ...@@ -164,7 +178,7 @@ public class AtomixConsistentMap extends Resource<AtomixConsistentMap>
164 @Override 178 @Override
165 @SuppressWarnings("unchecked") 179 @SuppressWarnings("unchecked")
166 public CompletableFuture<Versioned<byte[]>> replace(String key, byte[] value) { 180 public CompletableFuture<Versioned<byte[]>> replace(String key, byte[] value) {
167 - return submit(new AtomixConsistentMapCommands.UpdateAndGet(key, value, Match.NOT_NULL, Match.ANY)) 181 + return submit(new UpdateAndGet(key, value, Match.NOT_NULL, Match.ANY))
168 .whenComplete((r, e) -> throwIfLocked(r.status())) 182 .whenComplete((r, e) -> throwIfLocked(r.status()))
169 .thenApply(v -> v.oldValue()); 183 .thenApply(v -> v.oldValue());
170 } 184 }
...@@ -172,7 +186,7 @@ public class AtomixConsistentMap extends Resource<AtomixConsistentMap> ...@@ -172,7 +186,7 @@ public class AtomixConsistentMap extends Resource<AtomixConsistentMap>
172 @Override 186 @Override
173 @SuppressWarnings("unchecked") 187 @SuppressWarnings("unchecked")
174 public CompletableFuture<Boolean> replace(String key, byte[] oldValue, byte[] newValue) { 188 public CompletableFuture<Boolean> replace(String key, byte[] oldValue, byte[] newValue) {
175 - return submit(new AtomixConsistentMapCommands.UpdateAndGet(key, 189 + return submit(new UpdateAndGet(key,
176 newValue, 190 newValue,
177 Match.ifValue(oldValue), 191 Match.ifValue(oldValue),
178 Match.ANY)) 192 Match.ANY))
...@@ -183,7 +197,7 @@ public class AtomixConsistentMap extends Resource<AtomixConsistentMap> ...@@ -183,7 +197,7 @@ public class AtomixConsistentMap extends Resource<AtomixConsistentMap>
183 @Override 197 @Override
184 @SuppressWarnings("unchecked") 198 @SuppressWarnings("unchecked")
185 public CompletableFuture<Boolean> replace(String key, long oldVersion, byte[] newValue) { 199 public CompletableFuture<Boolean> replace(String key, long oldVersion, byte[] newValue) {
186 - return submit(new AtomixConsistentMapCommands.UpdateAndGet(key, 200 + return submit(new UpdateAndGet(key,
187 newValue, 201 newValue,
188 Match.ANY, 202 Match.ANY,
189 Match.ifValue(oldVersion))) 203 Match.ifValue(oldVersion)))
...@@ -193,7 +207,7 @@ public class AtomixConsistentMap extends Resource<AtomixConsistentMap> ...@@ -193,7 +207,7 @@ public class AtomixConsistentMap extends Resource<AtomixConsistentMap>
193 207
194 @Override 208 @Override
195 public CompletableFuture<Void> clear() { 209 public CompletableFuture<Void> clear() {
196 - return submit(new AtomixConsistentMapCommands.Clear()) 210 + return submit(new Clear())
197 .whenComplete((r, e) -> throwIfLocked(r)) 211 .whenComplete((r, e) -> throwIfLocked(r))
198 .thenApply(v -> null); 212 .thenApply(v -> null);
199 } 213 }
...@@ -224,7 +238,7 @@ public class AtomixConsistentMap extends Resource<AtomixConsistentMap> ...@@ -224,7 +238,7 @@ public class AtomixConsistentMap extends Resource<AtomixConsistentMap>
224 } 238 }
225 Match<byte[]> valueMatch = r1 == null ? Match.NULL : Match.ANY; 239 Match<byte[]> valueMatch = r1 == null ? Match.NULL : Match.ANY;
226 Match<Long> versionMatch = r1 == null ? Match.ANY : Match.ifValue(r1.version()); 240 Match<Long> versionMatch = r1 == null ? Match.ANY : Match.ifValue(r1.version());
227 - return submit(new AtomixConsistentMapCommands.UpdateAndGet(key, 241 + return submit(new UpdateAndGet(key,
228 computedValue.get(), 242 computedValue.get(),
229 valueMatch, 243 valueMatch,
230 versionMatch)) 244 versionMatch))
...@@ -235,18 +249,18 @@ public class AtomixConsistentMap extends Resource<AtomixConsistentMap> ...@@ -235,18 +249,18 @@ public class AtomixConsistentMap extends Resource<AtomixConsistentMap>
235 249
236 @Override 250 @Override
237 public synchronized CompletableFuture<Void> addListener(MapEventListener<String, byte[]> listener) { 251 public synchronized CompletableFuture<Void> addListener(MapEventListener<String, byte[]> listener) {
238 - if (!mapEventListeners.isEmpty()) { 252 + if (mapEventListeners.isEmpty()) {
253 + return submit(new Listen()).thenRun(() -> mapEventListeners.add(listener));
254 + } else {
239 mapEventListeners.add(listener); 255 mapEventListeners.add(listener);
240 return CompletableFuture.completedFuture(null); 256 return CompletableFuture.completedFuture(null);
241 } 257 }
242 - mapEventListeners.add(listener);
243 - return submit(new AtomixConsistentMapCommands.Listen()).thenApply(v -> null);
244 } 258 }
245 259
246 @Override 260 @Override
247 public synchronized CompletableFuture<Void> removeListener(MapEventListener<String, byte[]> listener) { 261 public synchronized CompletableFuture<Void> removeListener(MapEventListener<String, byte[]> listener) {
248 if (mapEventListeners.remove(listener) && mapEventListeners.isEmpty()) { 262 if (mapEventListeners.remove(listener) && mapEventListeners.isEmpty()) {
249 - return submit(new AtomixConsistentMapCommands.Unlisten()).thenApply(v -> null); 263 + return submit(new Unlisten()).thenApply(v -> null);
250 } 264 }
251 return CompletableFuture.completedFuture(null); 265 return CompletableFuture.completedFuture(null);
252 } 266 }
...@@ -259,45 +273,17 @@ public class AtomixConsistentMap extends Resource<AtomixConsistentMap> ...@@ -259,45 +273,17 @@ public class AtomixConsistentMap extends Resource<AtomixConsistentMap>
259 273
260 @Override 274 @Override
261 public CompletableFuture<Boolean> prepare(MapTransaction<String, byte[]> transaction) { 275 public CompletableFuture<Boolean> prepare(MapTransaction<String, byte[]> transaction) {
262 - return submit(new AtomixConsistentMapCommands.TransactionPrepare(transaction)) 276 + return submit(new TransactionPrepare(transaction)).thenApply(v -> v == PrepareResult.OK);
263 - .thenApply(v -> v == PrepareResult.OK);
264 } 277 }
265 278
266 @Override 279 @Override
267 public CompletableFuture<Void> commit(TransactionId transactionId) { 280 public CompletableFuture<Void> commit(TransactionId transactionId) {
268 - return submit(new AtomixConsistentMapCommands.TransactionCommit(transactionId)) 281 + return submit(new TransactionCommit(transactionId)).thenApply(v -> null);
269 - .thenApply(v -> null);
270 } 282 }
271 283
272 @Override 284 @Override
273 public CompletableFuture<Void> rollback(TransactionId transactionId) { 285 public CompletableFuture<Void> rollback(TransactionId transactionId) {
274 - return submit(new AtomixConsistentMapCommands.TransactionRollback(transactionId)) 286 + return submit(new TransactionRollback(transactionId))
275 .thenApply(v -> null); 287 .thenApply(v -> null);
276 } 288 }
277 -
278 - /**
279 - * Change listener context.
280 - */
281 - private final class ChangeListener implements Listener<MapEvent<String, byte[]>> {
282 - private final MapEventListener<String, byte[]> listener;
283 -
284 - private ChangeListener(MapEventListener<String, byte[]> listener) {
285 - this.listener = listener;
286 - }
287 -
288 - @Override
289 - public void accept(MapEvent<String, byte[]> event) {
290 - listener.event(event);
291 - }
292 -
293 - @Override
294 - public void close() {
295 - synchronized (AtomixConsistentMap.this) {
296 - mapEventListeners.remove(listener);
297 - if (mapEventListeners.isEmpty()) {
298 - submit(new AtomixConsistentMapCommands.Unlisten());
299 - }
300 - }
301 - }
302 - }
303 } 289 }
...\ No newline at end of file ...\ No newline at end of file
......
...@@ -29,6 +29,15 @@ import java.util.function.Consumer; ...@@ -29,6 +29,15 @@ import java.util.function.Consumer;
29 import org.onosproject.cluster.Leadership; 29 import org.onosproject.cluster.Leadership;
30 import org.onosproject.cluster.NodeId; 30 import org.onosproject.cluster.NodeId;
31 import org.onosproject.event.Change; 31 import org.onosproject.event.Change;
32 +import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Anoint;
33 +import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.GetAllLeaderships;
34 +import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.GetElectedTopics;
35 +import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.GetLeadership;
36 +import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Listen;
37 +import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Promote;
38 +import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Run;
39 +import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Unlisten;
40 +import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Withdraw;
32 import org.onosproject.store.service.AsyncLeaderElector; 41 import org.onosproject.store.service.AsyncLeaderElector;
33 42
34 import com.google.common.collect.Sets; 43 import com.google.common.collect.Sets;
...@@ -70,22 +79,22 @@ public class AtomixLeaderElector extends Resource<AtomixLeaderElector> ...@@ -70,22 +79,22 @@ public class AtomixLeaderElector extends Resource<AtomixLeaderElector>
70 79
71 @Override 80 @Override
72 public CompletableFuture<Leadership> run(String topic, NodeId nodeId) { 81 public CompletableFuture<Leadership> run(String topic, NodeId nodeId) {
73 - return submit(new AtomixLeaderElectorCommands.Run(topic, nodeId)); 82 + return submit(new Run(topic, nodeId));
74 } 83 }
75 84
76 @Override 85 @Override
77 public CompletableFuture<Void> withdraw(String topic) { 86 public CompletableFuture<Void> withdraw(String topic) {
78 - return submit(new AtomixLeaderElectorCommands.Withdraw(topic)); 87 + return submit(new Withdraw(topic));
79 } 88 }
80 89
81 @Override 90 @Override
82 public CompletableFuture<Boolean> anoint(String topic, NodeId nodeId) { 91 public CompletableFuture<Boolean> anoint(String topic, NodeId nodeId) {
83 - return submit(new AtomixLeaderElectorCommands.Anoint(topic, nodeId)); 92 + return submit(new Anoint(topic, nodeId));
84 } 93 }
85 94
86 @Override 95 @Override
87 public CompletableFuture<Boolean> promote(String topic, NodeId nodeId) { 96 public CompletableFuture<Boolean> promote(String topic, NodeId nodeId) {
88 - return submit(new AtomixLeaderElectorCommands.Promote(topic, nodeId)); 97 + return submit(new Promote(topic, nodeId));
89 } 98 }
90 99
91 @Override 100 @Override
...@@ -95,69 +104,32 @@ public class AtomixLeaderElector extends Resource<AtomixLeaderElector> ...@@ -95,69 +104,32 @@ public class AtomixLeaderElector extends Resource<AtomixLeaderElector>
95 104
96 @Override 105 @Override
97 public CompletableFuture<Leadership> getLeadership(String topic) { 106 public CompletableFuture<Leadership> getLeadership(String topic) {
98 - return submit(new AtomixLeaderElectorCommands.GetLeadership(topic)); 107 + return submit(new GetLeadership(topic));
99 } 108 }
100 109
101 @Override 110 @Override
102 public CompletableFuture<Map<String, Leadership>> getLeaderships() { 111 public CompletableFuture<Map<String, Leadership>> getLeaderships() {
103 - return submit(new AtomixLeaderElectorCommands.GetAllLeaderships()); 112 + return submit(new GetAllLeaderships());
104 } 113 }
105 114
106 public CompletableFuture<Set<String>> getElectedTopics(NodeId nodeId) { 115 public CompletableFuture<Set<String>> getElectedTopics(NodeId nodeId) {
107 - return submit(new AtomixLeaderElectorCommands.GetElectedTopics(nodeId)); 116 + return submit(new GetElectedTopics(nodeId));
108 - }
109 -
110 - /**
111 - * Leadership change listener context.
112 - */
113 - private final class LeadershipChangeListener implements Listener<Change<Leadership>> {
114 - private final Consumer<Change<Leadership>> listener;
115 -
116 - private LeadershipChangeListener(Consumer<Change<Leadership>> listener) {
117 - this.listener = listener;
118 - }
119 -
120 - @Override
121 - public void accept(Change<Leadership> change) {
122 - listener.accept(change);
123 - }
124 -
125 - @Override
126 - public void close() {
127 - synchronized (AtomixLeaderElector.this) {
128 - submit(new AtomixLeaderElectorCommands.Unlisten());
129 - }
130 - }
131 } 117 }
132 118
133 @Override 119 @Override
134 - public CompletableFuture<Void> addChangeListener(Consumer<Change<Leadership>> consumer) { 120 + public synchronized CompletableFuture<Void> addChangeListener(Consumer<Change<Leadership>> consumer) {
121 + if (leadershipChangeListeners.isEmpty()) {
122 + return submit(new Listen()).thenRun(() -> leadershipChangeListeners.add(consumer));
123 + } else {
135 leadershipChangeListeners.add(consumer); 124 leadershipChangeListeners.add(consumer);
136 - return setupListener();
137 - }
138 -
139 - @Override
140 - public CompletableFuture<Void> removeChangeListener(Consumer<Change<Leadership>> consumer) {
141 - leadershipChangeListeners.remove(consumer);
142 - return teardownListener();
143 - }
144 -
145 - private CompletableFuture<Void> setupListener() {
146 - if (listener == null && !leadershipChangeListeners.isEmpty()) {
147 - Consumer<Change<Leadership>> changeConsumer = change -> {
148 - leadershipChangeListeners.forEach(consumer -> consumer.accept(change));
149 - };
150 - return submit(new AtomixLeaderElectorCommands.Listen())
151 - .thenAccept(v -> listener = new LeadershipChangeListener(changeConsumer));
152 - }
153 return CompletableFuture.completedFuture(null); 125 return CompletableFuture.completedFuture(null);
154 } 126 }
127 + }
155 128
156 - private CompletableFuture<Void> teardownListener() { 129 + @Override
157 - if (listener != null && leadershipChangeListeners.isEmpty()) { 130 + public synchronized CompletableFuture<Void> removeChangeListener(Consumer<Change<Leadership>> consumer) {
158 - listener.close(); 131 + if (leadershipChangeListeners.remove(listener) && leadershipChangeListeners.isEmpty()) {
159 - listener = null; 132 + return submit(new Unlisten()).thenApply(v -> null);
160 - return submit(new AtomixLeaderElectorCommands.Unlisten());
161 } 133 }
162 return CompletableFuture.completedFuture(null); 134 return CompletableFuture.completedFuture(null);
163 } 135 }
......