Madan Jampani
Committed by Gerrit Code Review

When a Copycat client is in SUSPENDED or CLOSED state fail-fast all its operations

Change-Id: I821ca0a488e68d004b4e41b6d8ac28368f09ffcb
(cherry picked from commit d5b200f5)
...@@ -33,6 +33,12 @@ public class StorageException extends RuntimeException { ...@@ -33,6 +33,12 @@ public class StorageException extends RuntimeException {
33 } 33 }
34 34
35 /** 35 /**
36 + * Store is temporarily unavailable.
37 + */
38 + public static class Unavailable extends StorageException {
39 + }
40 +
41 + /**
36 * Store operation timeout. 42 * Store operation timeout.
37 */ 43 */
38 public static class Timeout extends StorageException { 44 public static class Timeout extends StorageException {
......
...@@ -51,6 +51,7 @@ import org.onosproject.store.service.DistributedPrimitive.Status; ...@@ -51,6 +51,7 @@ import org.onosproject.store.service.DistributedPrimitive.Status;
51 import org.onosproject.store.service.DistributedQueue; 51 import org.onosproject.store.service.DistributedQueue;
52 import org.onosproject.store.service.PartitionClientInfo; 52 import org.onosproject.store.service.PartitionClientInfo;
53 import org.onosproject.store.service.Serializer; 53 import org.onosproject.store.service.Serializer;
54 +import org.onosproject.store.service.StorageException;
54 import org.slf4j.Logger; 55 import org.slf4j.Logger;
55 56
56 import com.google.common.base.Supplier; 57 import com.google.common.base.Supplier;
...@@ -119,6 +120,7 @@ public class StoragePartitionClient implements DistributedPrimitiveCreator, Mana ...@@ -119,6 +120,7 @@ public class StoragePartitionClient implements DistributedPrimitiveCreator, Mana
119 120
120 @Override 121 @Override
121 public <K, V> AsyncConsistentMap<K, V> newAsyncConsistentMap(String name, Serializer serializer) { 122 public <K, V> AsyncConsistentMap<K, V> newAsyncConsistentMap(String name, Serializer serializer) {
123 + checkAvailability();
122 AtomixConsistentMap atomixConsistentMap = client.getResource(name, AtomixConsistentMap.class).join(); 124 AtomixConsistentMap atomixConsistentMap = client.getResource(name, AtomixConsistentMap.class).join();
123 Consumer<State> statusListener = state -> { 125 Consumer<State> statusListener = state -> {
124 atomixConsistentMap.statusChangeListeners() 126 atomixConsistentMap.statusChangeListeners()
...@@ -143,11 +145,13 @@ public class StoragePartitionClient implements DistributedPrimitiveCreator, Mana ...@@ -143,11 +145,13 @@ public class StoragePartitionClient implements DistributedPrimitiveCreator, Mana
143 145
144 @Override 146 @Override
145 public <E> AsyncDistributedSet<E> newAsyncDistributedSet(String name, Serializer serializer) { 147 public <E> AsyncDistributedSet<E> newAsyncDistributedSet(String name, Serializer serializer) {
148 + checkAvailability();
146 return DistributedPrimitives.newSetFromMap(this.<E, Boolean>newAsyncConsistentMap(name, serializer)); 149 return DistributedPrimitives.newSetFromMap(this.<E, Boolean>newAsyncConsistentMap(name, serializer));
147 } 150 }
148 151
149 @Override 152 @Override
150 public AsyncAtomicCounter newAsyncCounter(String name) { 153 public AsyncAtomicCounter newAsyncCounter(String name) {
154 + checkAvailability();
151 DistributedLong distributedLong = client.getLong(name).join(); 155 DistributedLong distributedLong = client.getLong(name).join();
152 return new AtomixCounter(name, distributedLong); 156 return new AtomixCounter(name, distributedLong);
153 } 157 }
...@@ -165,6 +169,7 @@ public class StoragePartitionClient implements DistributedPrimitiveCreator, Mana ...@@ -165,6 +169,7 @@ public class StoragePartitionClient implements DistributedPrimitiveCreator, Mana
165 169
166 @Override 170 @Override
167 public AsyncLeaderElector newAsyncLeaderElector(String name) { 171 public AsyncLeaderElector newAsyncLeaderElector(String name) {
172 + checkAvailability();
168 AtomixLeaderElector leaderElector = client.getResource(name, AtomixLeaderElector.class) 173 AtomixLeaderElector leaderElector = client.getResource(name, AtomixLeaderElector.class)
169 .thenCompose(AtomixLeaderElector::setupCache) 174 .thenCompose(AtomixLeaderElector::setupCache)
170 .join(); 175 .join();
...@@ -178,11 +183,13 @@ public class StoragePartitionClient implements DistributedPrimitiveCreator, Mana ...@@ -178,11 +183,13 @@ public class StoragePartitionClient implements DistributedPrimitiveCreator, Mana
178 183
179 @Override 184 @Override
180 public Set<String> getAsyncConsistentMapNames() { 185 public Set<String> getAsyncConsistentMapNames() {
186 + checkAvailability();
181 return client.keys(AtomixConsistentMap.class).join(); 187 return client.keys(AtomixConsistentMap.class).join();
182 } 188 }
183 189
184 @Override 190 @Override
185 public Set<String> getAsyncAtomicCounterNames() { 191 public Set<String> getAsyncAtomicCounterNames() {
192 + checkAvailability();
186 return client.keys(DistributedLong.class).join(); 193 return client.keys(DistributedLong.class).join();
187 } 194 }
188 195
...@@ -227,4 +234,10 @@ public class StoragePartitionClient implements DistributedPrimitiveCreator, Mana ...@@ -227,4 +234,10 @@ public class StoragePartitionClient implements DistributedPrimitiveCreator, Mana
227 } 234 }
228 return new ResourceClient(new QueryRetryingCopycatClient(copycatClient, 2, 100)); 235 return new ResourceClient(new QueryRetryingCopycatClient(copycatClient, 2, 100));
229 } 236 }
237 +
238 + private void checkAvailability() {
239 + if (resourceClient.client().state() == State.SUSPENDED || resourceClient.client().state() == State.CLOSED) {
240 + throw new StorageException.Unavailable();
241 + }
242 + }
230 } 243 }
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
15 */ 15 */
16 package org.onosproject.store.primitives.resources.impl; 16 package org.onosproject.store.primitives.resources.impl;
17 17
18 +import io.atomix.copycat.Operation;
18 import io.atomix.copycat.client.CopycatClient; 19 import io.atomix.copycat.client.CopycatClient;
19 import io.atomix.resource.AbstractResource; 20 import io.atomix.resource.AbstractResource;
20 import io.atomix.resource.ResourceTypeInfo; 21 import io.atomix.resource.ResourceTypeInfo;
...@@ -34,6 +35,7 @@ import java.util.function.Consumer; ...@@ -34,6 +35,7 @@ import java.util.function.Consumer;
34 import java.util.function.Predicate; 35 import java.util.function.Predicate;
35 36
36 import org.onlab.util.Match; 37 import org.onlab.util.Match;
38 +import org.onlab.util.Tools;
37 import org.onosproject.store.primitives.TransactionId; 39 import org.onosproject.store.primitives.TransactionId;
38 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Clear; 40 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Clear;
39 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.ContainsKey; 41 import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.ContainsKey;
...@@ -55,6 +57,7 @@ import org.onosproject.store.service.AsyncConsistentMap; ...@@ -55,6 +57,7 @@ import org.onosproject.store.service.AsyncConsistentMap;
55 import org.onosproject.store.service.MapEvent; 57 import org.onosproject.store.service.MapEvent;
56 import org.onosproject.store.service.MapEventListener; 58 import org.onosproject.store.service.MapEventListener;
57 import org.onosproject.store.service.MapTransaction; 59 import org.onosproject.store.service.MapTransaction;
60 +import org.onosproject.store.service.StorageException;
58 import org.onosproject.store.service.Versioned; 61 import org.onosproject.store.service.Versioned;
59 62
60 import com.google.common.collect.ImmutableSet; 63 import com.google.common.collect.ImmutableSet;
...@@ -97,48 +100,48 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap> ...@@ -97,48 +100,48 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap>
97 100
98 @Override 101 @Override
99 public CompletableFuture<Boolean> isEmpty() { 102 public CompletableFuture<Boolean> isEmpty() {
100 - return client.submit(new IsEmpty()); 103 + return submit(new IsEmpty());
101 } 104 }
102 105
103 @Override 106 @Override
104 public CompletableFuture<Integer> size() { 107 public CompletableFuture<Integer> size() {
105 - return client.submit(new Size()); 108 + return submit(new Size());
106 } 109 }
107 110
108 @Override 111 @Override
109 public CompletableFuture<Boolean> containsKey(String key) { 112 public CompletableFuture<Boolean> containsKey(String key) {
110 - return client.submit(new ContainsKey(key)); 113 + return submit(new ContainsKey(key));
111 } 114 }
112 115
113 @Override 116 @Override
114 public CompletableFuture<Boolean> containsValue(byte[] value) { 117 public CompletableFuture<Boolean> containsValue(byte[] value) {
115 - return client.submit(new ContainsValue(value)); 118 + return submit(new ContainsValue(value));
116 } 119 }
117 120
118 @Override 121 @Override
119 public CompletableFuture<Versioned<byte[]>> get(String key) { 122 public CompletableFuture<Versioned<byte[]>> get(String key) {
120 - return client.submit(new Get(key)); 123 + return submit(new Get(key));
121 } 124 }
122 125
123 @Override 126 @Override
124 public CompletableFuture<Set<String>> keySet() { 127 public CompletableFuture<Set<String>> keySet() {
125 - return client.submit(new KeySet()); 128 + return submit(new KeySet());
126 } 129 }
127 130
128 @Override 131 @Override
129 public CompletableFuture<Collection<Versioned<byte[]>>> values() { 132 public CompletableFuture<Collection<Versioned<byte[]>>> values() {
130 - return client.submit(new Values()); 133 + return submit(new Values());
131 } 134 }
132 135
133 @Override 136 @Override
134 public CompletableFuture<Set<Entry<String, Versioned<byte[]>>>> entrySet() { 137 public CompletableFuture<Set<Entry<String, Versioned<byte[]>>>> entrySet() {
135 - return client.submit(new EntrySet()); 138 + return submit(new EntrySet());
136 } 139 }
137 140
138 @Override 141 @Override
139 @SuppressWarnings("unchecked") 142 @SuppressWarnings("unchecked")
140 public CompletableFuture<Versioned<byte[]>> put(String key, byte[] value) { 143 public CompletableFuture<Versioned<byte[]>> put(String key, byte[] value) {
141 - return client.submit(new UpdateAndGet(key, value, Match.ANY, Match.ANY)) 144 + return submit(new UpdateAndGet(key, value, Match.ANY, Match.ANY))
142 .whenComplete((r, e) -> throwIfLocked(r.status())) 145 .whenComplete((r, e) -> throwIfLocked(r.status()))
143 .thenApply(v -> v.oldValue()); 146 .thenApply(v -> v.oldValue());
144 } 147 }
...@@ -146,7 +149,7 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap> ...@@ -146,7 +149,7 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap>
146 @Override 149 @Override
147 @SuppressWarnings("unchecked") 150 @SuppressWarnings("unchecked")
148 public CompletableFuture<Versioned<byte[]>> putAndGet(String key, byte[] value) { 151 public CompletableFuture<Versioned<byte[]>> putAndGet(String key, byte[] value) {
149 - return client.submit(new UpdateAndGet(key, value, Match.ANY, Match.ANY)) 152 + return submit(new UpdateAndGet(key, value, Match.ANY, Match.ANY))
150 .whenComplete((r, e) -> throwIfLocked(r.status())) 153 .whenComplete((r, e) -> throwIfLocked(r.status()))
151 .thenApply(v -> v.newValue()); 154 .thenApply(v -> v.newValue());
152 } 155 }
...@@ -154,14 +157,14 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap> ...@@ -154,14 +157,14 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap>
154 @Override 157 @Override
155 @SuppressWarnings("unchecked") 158 @SuppressWarnings("unchecked")
156 public CompletableFuture<Versioned<byte[]>> putIfAbsent(String key, byte[] value) { 159 public CompletableFuture<Versioned<byte[]>> putIfAbsent(String key, byte[] value) {
157 - return client.submit(new UpdateAndGet(key, value, Match.NULL, Match.ANY)) 160 + return submit(new UpdateAndGet(key, value, Match.NULL, Match.ANY))
158 .whenComplete((r, e) -> throwIfLocked(r.status())) 161 .whenComplete((r, e) -> throwIfLocked(r.status()))
159 .thenApply(v -> v.oldValue()); 162 .thenApply(v -> v.oldValue());
160 } 163 }
161 @Override 164 @Override
162 @SuppressWarnings("unchecked") 165 @SuppressWarnings("unchecked")
163 public CompletableFuture<Versioned<byte[]>> remove(String key) { 166 public CompletableFuture<Versioned<byte[]>> remove(String key) {
164 - return client.submit(new UpdateAndGet(key, null, Match.ANY, Match.ANY)) 167 + return submit(new UpdateAndGet(key, null, Match.ANY, Match.ANY))
165 .whenComplete((r, e) -> throwIfLocked(r.status())) 168 .whenComplete((r, e) -> throwIfLocked(r.status()))
166 .thenApply(v -> v.oldValue()); 169 .thenApply(v -> v.oldValue());
167 } 170 }
...@@ -169,7 +172,7 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap> ...@@ -169,7 +172,7 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap>
169 @Override 172 @Override
170 @SuppressWarnings("unchecked") 173 @SuppressWarnings("unchecked")
171 public CompletableFuture<Boolean> remove(String key, byte[] value) { 174 public CompletableFuture<Boolean> remove(String key, byte[] value) {
172 - return client.submit(new UpdateAndGet(key, null, Match.ifValue(value), Match.ANY)) 175 + return submit(new UpdateAndGet(key, null, Match.ifValue(value), Match.ANY))
173 .whenComplete((r, e) -> throwIfLocked(r.status())) 176 .whenComplete((r, e) -> throwIfLocked(r.status()))
174 .thenApply(v -> v.updated()); 177 .thenApply(v -> v.updated());
175 } 178 }
...@@ -177,7 +180,7 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap> ...@@ -177,7 +180,7 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap>
177 @Override 180 @Override
178 @SuppressWarnings("unchecked") 181 @SuppressWarnings("unchecked")
179 public CompletableFuture<Boolean> remove(String key, long version) { 182 public CompletableFuture<Boolean> remove(String key, long version) {
180 - return client.submit(new UpdateAndGet(key, null, Match.ANY, Match.ifValue(version))) 183 + return submit(new UpdateAndGet(key, null, Match.ANY, Match.ifValue(version)))
181 .whenComplete((r, e) -> throwIfLocked(r.status())) 184 .whenComplete((r, e) -> throwIfLocked(r.status()))
182 .thenApply(v -> v.updated()); 185 .thenApply(v -> v.updated());
183 } 186 }
...@@ -185,7 +188,7 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap> ...@@ -185,7 +188,7 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap>
185 @Override 188 @Override
186 @SuppressWarnings("unchecked") 189 @SuppressWarnings("unchecked")
187 public CompletableFuture<Versioned<byte[]>> replace(String key, byte[] value) { 190 public CompletableFuture<Versioned<byte[]>> replace(String key, byte[] value) {
188 - return client.submit(new UpdateAndGet(key, value, Match.NOT_NULL, Match.ANY)) 191 + return submit(new UpdateAndGet(key, value, Match.NOT_NULL, Match.ANY))
189 .whenComplete((r, e) -> throwIfLocked(r.status())) 192 .whenComplete((r, e) -> throwIfLocked(r.status()))
190 .thenApply(v -> v.oldValue()); 193 .thenApply(v -> v.oldValue());
191 } 194 }
...@@ -193,7 +196,7 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap> ...@@ -193,7 +196,7 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap>
193 @Override 196 @Override
194 @SuppressWarnings("unchecked") 197 @SuppressWarnings("unchecked")
195 public CompletableFuture<Boolean> replace(String key, byte[] oldValue, byte[] newValue) { 198 public CompletableFuture<Boolean> replace(String key, byte[] oldValue, byte[] newValue) {
196 - return client.submit(new UpdateAndGet(key, newValue, Match.ifValue(oldValue), Match.ANY)) 199 + return submit(new UpdateAndGet(key, newValue, Match.ifValue(oldValue), Match.ANY))
197 .whenComplete((r, e) -> throwIfLocked(r.status())) 200 .whenComplete((r, e) -> throwIfLocked(r.status()))
198 .thenApply(v -> v.updated()); 201 .thenApply(v -> v.updated());
199 } 202 }
...@@ -201,14 +204,14 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap> ...@@ -201,14 +204,14 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap>
201 @Override 204 @Override
202 @SuppressWarnings("unchecked") 205 @SuppressWarnings("unchecked")
203 public CompletableFuture<Boolean> replace(String key, long oldVersion, byte[] newValue) { 206 public CompletableFuture<Boolean> replace(String key, long oldVersion, byte[] newValue) {
204 - return client.submit(new UpdateAndGet(key, newValue, Match.ANY, Match.ifValue(oldVersion))) 207 + return submit(new UpdateAndGet(key, newValue, Match.ANY, Match.ifValue(oldVersion)))
205 .whenComplete((r, e) -> throwIfLocked(r.status())) 208 .whenComplete((r, e) -> throwIfLocked(r.status()))
206 .thenApply(v -> v.updated()); 209 .thenApply(v -> v.updated());
207 } 210 }
208 211
209 @Override 212 @Override
210 public CompletableFuture<Void> clear() { 213 public CompletableFuture<Void> clear() {
211 - return client.submit(new Clear()) 214 + return submit(new Clear())
212 .whenComplete((r, e) -> throwIfLocked(r)) 215 .whenComplete((r, e) -> throwIfLocked(r))
213 .thenApply(v -> null); 216 .thenApply(v -> null);
214 } 217 }
...@@ -239,7 +242,7 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap> ...@@ -239,7 +242,7 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap>
239 } 242 }
240 Match<byte[]> valueMatch = r1 == null ? Match.NULL : Match.ANY; 243 Match<byte[]> valueMatch = r1 == null ? Match.NULL : Match.ANY;
241 Match<Long> versionMatch = r1 == null ? Match.ANY : Match.ifValue(r1.version()); 244 Match<Long> versionMatch = r1 == null ? Match.ANY : Match.ifValue(r1.version());
242 - return client.submit(new UpdateAndGet(key, 245 + return submit(new UpdateAndGet(key,
243 computedValue.get(), 246 computedValue.get(),
244 valueMatch, 247 valueMatch,
245 versionMatch)) 248 versionMatch))
...@@ -252,7 +255,7 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap> ...@@ -252,7 +255,7 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap>
252 public synchronized CompletableFuture<Void> addListener(MapEventListener<String, byte[]> listener, 255 public synchronized CompletableFuture<Void> addListener(MapEventListener<String, byte[]> listener,
253 Executor executor) { 256 Executor executor) {
254 if (mapEventListeners.isEmpty()) { 257 if (mapEventListeners.isEmpty()) {
255 - return client.submit(new Listen()).thenRun(() -> mapEventListeners.putIfAbsent(listener, executor)); 258 + return submit(new Listen()).thenRun(() -> mapEventListeners.putIfAbsent(listener, executor));
256 } else { 259 } else {
257 mapEventListeners.put(listener, executor); 260 mapEventListeners.put(listener, executor);
258 return CompletableFuture.completedFuture(null); 261 return CompletableFuture.completedFuture(null);
...@@ -262,7 +265,7 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap> ...@@ -262,7 +265,7 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap>
262 @Override 265 @Override
263 public synchronized CompletableFuture<Void> removeListener(MapEventListener<String, byte[]> listener) { 266 public synchronized CompletableFuture<Void> removeListener(MapEventListener<String, byte[]> listener) {
264 if (mapEventListeners.remove(listener) != null && mapEventListeners.isEmpty()) { 267 if (mapEventListeners.remove(listener) != null && mapEventListeners.isEmpty()) {
265 - return client.submit(new Unlisten()).thenApply(v -> null); 268 + return submit(new Unlisten()).thenApply(v -> null);
266 } 269 }
267 return CompletableFuture.completedFuture(null); 270 return CompletableFuture.completedFuture(null);
268 } 271 }
...@@ -275,23 +278,23 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap> ...@@ -275,23 +278,23 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap>
275 278
276 @Override 279 @Override
277 public CompletableFuture<Boolean> prepare(MapTransaction<String, byte[]> transaction) { 280 public CompletableFuture<Boolean> prepare(MapTransaction<String, byte[]> transaction) {
278 - return client.submit(new TransactionPrepare(transaction)).thenApply(v -> v == PrepareResult.OK); 281 + return submit(new TransactionPrepare(transaction)).thenApply(v -> v == PrepareResult.OK);
279 } 282 }
280 283
281 @Override 284 @Override
282 public CompletableFuture<Void> commit(TransactionId transactionId) { 285 public CompletableFuture<Void> commit(TransactionId transactionId) {
283 - return client.submit(new TransactionCommit(transactionId)).thenApply(v -> null); 286 + return submit(new TransactionCommit(transactionId)).thenApply(v -> null);
284 } 287 }
285 288
286 @Override 289 @Override
287 public CompletableFuture<Void> rollback(TransactionId transactionId) { 290 public CompletableFuture<Void> rollback(TransactionId transactionId) {
288 - return client.submit(new TransactionRollback(transactionId)) 291 + return submit(new TransactionRollback(transactionId))
289 .thenApply(v -> null); 292 .thenApply(v -> null);
290 } 293 }
291 294
292 @Override 295 @Override
293 public CompletableFuture<Boolean> prepareAndCommit(MapTransaction<String, byte[]> transaction) { 296 public CompletableFuture<Boolean> prepareAndCommit(MapTransaction<String, byte[]> transaction) {
294 - return client.submit(new TransactionPrepareAndCommit(transaction)).thenApply(v -> v == PrepareResult.OK); 297 + return submit(new TransactionPrepareAndCommit(transaction)).thenApply(v -> v == PrepareResult.OK);
295 } 298 }
296 299
297 @Override 300 @Override
...@@ -308,4 +311,11 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap> ...@@ -308,4 +311,11 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap>
308 public Collection<Consumer<Status>> statusChangeListeners() { 311 public Collection<Consumer<Status>> statusChangeListeners() {
309 return ImmutableSet.copyOf(statusChangeListeners); 312 return ImmutableSet.copyOf(statusChangeListeners);
310 } 313 }
314 +
315 + <T> CompletableFuture<T> submit(Operation<T> command) {
316 + if (client.state() == CopycatClient.State.SUSPENDED || client.state() == CopycatClient.State.CLOSED) {
317 + return Tools.exceptionalFuture(new StorageException.Unavailable());
318 + }
319 + return client.submit(command);
320 + }
311 } 321 }
...\ No newline at end of file ...\ No newline at end of file
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
15 */ 15 */
16 package org.onosproject.store.primitives.resources.impl; 16 package org.onosproject.store.primitives.resources.impl;
17 17
18 +import io.atomix.copycat.Operation;
18 import io.atomix.copycat.client.CopycatClient; 19 import io.atomix.copycat.client.CopycatClient;
19 import io.atomix.resource.AbstractResource; 20 import io.atomix.resource.AbstractResource;
20 import io.atomix.resource.ResourceTypeInfo; 21 import io.atomix.resource.ResourceTypeInfo;
...@@ -27,6 +28,7 @@ import java.util.Set; ...@@ -27,6 +28,7 @@ import java.util.Set;
27 import java.util.concurrent.CompletableFuture; 28 import java.util.concurrent.CompletableFuture;
28 import java.util.function.Consumer; 29 import java.util.function.Consumer;
29 30
31 +import org.onlab.util.Tools;
30 import org.onosproject.cluster.Leadership; 32 import org.onosproject.cluster.Leadership;
31 import org.onosproject.cluster.NodeId; 33 import org.onosproject.cluster.NodeId;
32 import org.onosproject.event.Change; 34 import org.onosproject.event.Change;
...@@ -40,6 +42,7 @@ import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorComman ...@@ -40,6 +42,7 @@ import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorComman
40 import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Unlisten; 42 import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Unlisten;
41 import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Withdraw; 43 import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Withdraw;
42 import org.onosproject.store.service.AsyncLeaderElector; 44 import org.onosproject.store.service.AsyncLeaderElector;
45 +import org.onosproject.store.service.StorageException;
43 46
44 import com.google.common.collect.ImmutableSet; 47 import com.google.common.collect.ImmutableSet;
45 import com.google.common.cache.CacheBuilder; 48 import com.google.common.cache.CacheBuilder;
...@@ -67,7 +70,7 @@ public class AtomixLeaderElector extends AbstractResource<AtomixLeaderElector> ...@@ -67,7 +70,7 @@ public class AtomixLeaderElector extends AbstractResource<AtomixLeaderElector>
67 super(client, properties); 70 super(client, properties);
68 cache = CacheBuilder.newBuilder() 71 cache = CacheBuilder.newBuilder()
69 .maximumSize(1000) 72 .maximumSize(1000)
70 - .build(CacheLoader.from(topic -> this.client.submit(new GetLeadership(topic)))); 73 + .build(CacheLoader.from(topic -> submit(new GetLeadership(topic))));
71 74
72 cacheUpdater = change -> { 75 cacheUpdater = change -> {
73 Leadership leadership = change.newValue(); 76 Leadership leadership = change.newValue();
...@@ -110,27 +113,27 @@ public class AtomixLeaderElector extends AbstractResource<AtomixLeaderElector> ...@@ -110,27 +113,27 @@ public class AtomixLeaderElector extends AbstractResource<AtomixLeaderElector>
110 113
111 @Override 114 @Override
112 public CompletableFuture<Leadership> run(String topic, NodeId nodeId) { 115 public CompletableFuture<Leadership> run(String topic, NodeId nodeId) {
113 - return client.submit(new Run(topic, nodeId)).whenComplete((r, e) -> cache.invalidate(topic)); 116 + return submit(new Run(topic, nodeId)).whenComplete((r, e) -> cache.invalidate(topic));
114 } 117 }
115 118
116 @Override 119 @Override
117 public CompletableFuture<Void> withdraw(String topic) { 120 public CompletableFuture<Void> withdraw(String topic) {
118 - return client.submit(new Withdraw(topic)).whenComplete((r, e) -> cache.invalidate(topic)); 121 + return submit(new Withdraw(topic)).whenComplete((r, e) -> cache.invalidate(topic));
119 } 122 }
120 123
121 @Override 124 @Override
122 public CompletableFuture<Boolean> anoint(String topic, NodeId nodeId) { 125 public CompletableFuture<Boolean> anoint(String topic, NodeId nodeId) {
123 - return client.submit(new Anoint(topic, nodeId)).whenComplete((r, e) -> cache.invalidate(topic)); 126 + return submit(new Anoint(topic, nodeId)).whenComplete((r, e) -> cache.invalidate(topic));
124 } 127 }
125 128
126 @Override 129 @Override
127 public CompletableFuture<Boolean> promote(String topic, NodeId nodeId) { 130 public CompletableFuture<Boolean> promote(String topic, NodeId nodeId) {
128 - return client.submit(new Promote(topic, nodeId)).whenComplete((r, e) -> cache.invalidate(topic)); 131 + return submit(new Promote(topic, nodeId)).whenComplete((r, e) -> cache.invalidate(topic));
129 } 132 }
130 133
131 @Override 134 @Override
132 public CompletableFuture<Void> evict(NodeId nodeId) { 135 public CompletableFuture<Void> evict(NodeId nodeId) {
133 - return client.submit(new AtomixLeaderElectorCommands.Evict(nodeId)); 136 + return submit(new AtomixLeaderElectorCommands.Evict(nodeId));
134 } 137 }
135 138
136 @Override 139 @Override
...@@ -145,17 +148,17 @@ public class AtomixLeaderElector extends AbstractResource<AtomixLeaderElector> ...@@ -145,17 +148,17 @@ public class AtomixLeaderElector extends AbstractResource<AtomixLeaderElector>
145 148
146 @Override 149 @Override
147 public CompletableFuture<Map<String, Leadership>> getLeaderships() { 150 public CompletableFuture<Map<String, Leadership>> getLeaderships() {
148 - return client.submit(new GetAllLeaderships()); 151 + return submit(new GetAllLeaderships());
149 } 152 }
150 153
151 public CompletableFuture<Set<String>> getElectedTopics(NodeId nodeId) { 154 public CompletableFuture<Set<String>> getElectedTopics(NodeId nodeId) {
152 - return client.submit(new GetElectedTopics(nodeId)); 155 + return submit(new GetElectedTopics(nodeId));
153 } 156 }
154 157
155 @Override 158 @Override
156 public synchronized CompletableFuture<Void> addChangeListener(Consumer<Change<Leadership>> consumer) { 159 public synchronized CompletableFuture<Void> addChangeListener(Consumer<Change<Leadership>> consumer) {
157 if (leadershipChangeListeners.isEmpty()) { 160 if (leadershipChangeListeners.isEmpty()) {
158 - return client.submit(new Listen()).thenRun(() -> leadershipChangeListeners.add(consumer)); 161 + return submit(new Listen()).thenRun(() -> leadershipChangeListeners.add(consumer));
159 } else { 162 } else {
160 leadershipChangeListeners.add(consumer); 163 leadershipChangeListeners.add(consumer);
161 return CompletableFuture.completedFuture(null); 164 return CompletableFuture.completedFuture(null);
...@@ -165,7 +168,7 @@ public class AtomixLeaderElector extends AbstractResource<AtomixLeaderElector> ...@@ -165,7 +168,7 @@ public class AtomixLeaderElector extends AbstractResource<AtomixLeaderElector>
165 @Override 168 @Override
166 public synchronized CompletableFuture<Void> removeChangeListener(Consumer<Change<Leadership>> consumer) { 169 public synchronized CompletableFuture<Void> removeChangeListener(Consumer<Change<Leadership>> consumer) {
167 if (leadershipChangeListeners.remove(consumer) && leadershipChangeListeners.isEmpty()) { 170 if (leadershipChangeListeners.remove(consumer) && leadershipChangeListeners.isEmpty()) {
168 - return client.submit(new Unlisten()).thenApply(v -> null); 171 + return submit(new Unlisten()).thenApply(v -> null);
169 } 172 }
170 return CompletableFuture.completedFuture(null); 173 return CompletableFuture.completedFuture(null);
171 } 174 }
...@@ -184,4 +187,11 @@ public class AtomixLeaderElector extends AbstractResource<AtomixLeaderElector> ...@@ -184,4 +187,11 @@ public class AtomixLeaderElector extends AbstractResource<AtomixLeaderElector>
184 public Collection<Consumer<Status>> statusChangeListeners() { 187 public Collection<Consumer<Status>> statusChangeListeners() {
185 return ImmutableSet.copyOf(statusChangeListeners); 188 return ImmutableSet.copyOf(statusChangeListeners);
186 } 189 }
190 +
191 + <T> CompletableFuture<T> submit(Operation<T> command) {
192 + if (client.state() == CopycatClient.State.SUSPENDED || client.state() == CopycatClient.State.CLOSED) {
193 + return Tools.exceptionalFuture(new StorageException.Unavailable());
194 + }
195 + return client.submit(command);
196 + }
187 } 197 }
......