Madan Jampani
Committed by Gerrit Code Review

Support for retries in AtomicCounter. Enabled counter operation retries in Consi…

…stentApplicationIdStore

Change-Id: I705c51b2efd7ecd928c64c7f8a16d1965198253c
1 package org.onosproject.store.service; 1 package org.onosproject.store.service;
2 2
3 +import java.util.concurrent.ScheduledExecutorService;
4 +
3 /** 5 /**
4 * Builder for AtomicCounter. 6 * Builder for AtomicCounter.
5 */ 7 */
...@@ -33,6 +35,24 @@ public interface AtomicCounterBuilder { ...@@ -33,6 +35,24 @@ public interface AtomicCounterBuilder {
33 AtomicCounterBuilder withPartitionsDisabled(); 35 AtomicCounterBuilder withPartitionsDisabled();
34 36
35 /** 37 /**
38 + * Enables retries when counter operations fail.
39 + * <p>
40 + * Note: Use with caution. By default retries are disabled.
41 + * </p>
42 + * @return this AtomicCounterBuilder
43 + */
44 + AtomicCounterBuilder withRetryOnFailure();
45 +
46 + /**
47 + * Sets the executor service to use for retrying failed operations.
48 + * <p>
49 + * Note: Must be set when retries are enabled
50 + * </p>
51 + * @return this AtomicCounterBuilder
52 + */
53 + AtomicCounterBuilder withRetryExecutor(ScheduledExecutorService executor);
54 +
55 + /**
36 * Builds a AtomicCounter based on the configuration options 56 * Builds a AtomicCounter based on the configuration options
37 * supplied to this builder. 57 * supplied to this builder.
38 * 58 *
......
...@@ -16,10 +16,20 @@ ...@@ -16,10 +16,20 @@
16 package org.onosproject.store.consistent.impl; 16 package org.onosproject.store.consistent.impl;
17 17
18 import java.util.concurrent.CompletableFuture; 18 import java.util.concurrent.CompletableFuture;
19 +import java.util.concurrent.ScheduledExecutorService;
20 +import java.util.concurrent.TimeUnit;
21 +import java.util.function.BiFunction;
19 22
20 import org.onosproject.store.service.AsyncAtomicCounter; 23 import org.onosproject.store.service.AsyncAtomicCounter;
21 24
25 +
26 +
27 +
28 +
29 +import org.slf4j.Logger;
30 +
22 import static com.google.common.base.Preconditions.*; 31 import static com.google.common.base.Preconditions.*;
32 +import static org.slf4j.LoggerFactory.getLogger;
23 33
24 /** 34 /**
25 * Default implementation for a distributed AsyncAtomicCounter backed by 35 * Default implementation for a distributed AsyncAtomicCounter backed by
...@@ -31,10 +41,20 @@ public class DefaultAsyncAtomicCounter implements AsyncAtomicCounter { ...@@ -31,10 +41,20 @@ public class DefaultAsyncAtomicCounter implements AsyncAtomicCounter {
31 41
32 private final String name; 42 private final String name;
33 private final Database database; 43 private final Database database;
44 + private final boolean retryOnFailure;
45 + private final ScheduledExecutorService retryExecutor;
46 + // TODO: configure delay via builder
47 + private static final int DELAY_BETWEEN_RETRY_SEC = 1;
48 + private final Logger log = getLogger(getClass());
34 49
35 - public DefaultAsyncAtomicCounter(String name, Database database) { 50 + public DefaultAsyncAtomicCounter(String name,
51 + Database database,
52 + boolean retryOnException,
53 + ScheduledExecutorService retryExecutor) {
36 this.name = checkNotNull(name); 54 this.name = checkNotNull(name);
37 this.database = checkNotNull(database); 55 this.database = checkNotNull(database);
56 + this.retryOnFailure = retryOnException;
57 + this.retryExecutor = retryExecutor;
38 } 58 }
39 59
40 @Override 60 @Override
...@@ -54,11 +74,70 @@ public class DefaultAsyncAtomicCounter implements AsyncAtomicCounter { ...@@ -54,11 +74,70 @@ public class DefaultAsyncAtomicCounter implements AsyncAtomicCounter {
54 74
55 @Override 75 @Override
56 public CompletableFuture<Long> getAndAdd(long delta) { 76 public CompletableFuture<Long> getAndAdd(long delta) {
57 - return database.counterGetAndAdd(name, delta); 77 + CompletableFuture<Long> result = database.counterGetAndAdd(name, delta);
78 + if (!retryOnFailure) {
79 + return result;
80 + }
81 +
82 + CompletableFuture<Long> future = new CompletableFuture<>();
83 + return result.whenComplete((r, e) -> {
84 + if (e != null) {
85 + log.warn("getAndAdd failed due to {}. Will retry", e.getMessage());
86 + retryExecutor.schedule(new RetryTask(database::counterGetAndAdd, delta, future),
87 + DELAY_BETWEEN_RETRY_SEC,
88 + TimeUnit.SECONDS);
89 + } else {
90 + future.complete(r);
91 + }
92 + }).thenCompose(v -> future);
58 } 93 }
59 94
60 @Override 95 @Override
61 public CompletableFuture<Long> addAndGet(long delta) { 96 public CompletableFuture<Long> addAndGet(long delta) {
62 - return database.counterAddAndGet(name, delta); 97 + CompletableFuture<Long> result = database.counterAddAndGet(name, delta);
98 + if (!retryOnFailure) {
99 + return result;
100 + }
101 +
102 + CompletableFuture<Long> future = new CompletableFuture<>();
103 + return result.whenComplete((r, e) -> {
104 + if (e != null) {
105 + log.warn("addAndGet failed due to {}. Will retry", e.getMessage());
106 + retryExecutor.schedule(new RetryTask(database::counterAddAndGet, delta, future),
107 + DELAY_BETWEEN_RETRY_SEC,
108 + TimeUnit.SECONDS);
109 + } else {
110 + future.complete(r);
111 + }
112 + }).thenCompose(v -> future);
113 + }
114 +
115 + private class RetryTask implements Runnable {
116 +
117 + private final BiFunction<String, Long, CompletableFuture<Long>> function;
118 + private final Long delta;
119 + private final CompletableFuture<Long> result;
120 +
121 + public RetryTask(BiFunction<String, Long, CompletableFuture<Long>> function,
122 + Long delta,
123 + CompletableFuture<Long> result) {
124 + this.function = function;
125 + this.delta = delta;
126 + this.result = result;
127 + }
128 +
129 + @Override
130 + public void run() {
131 + function.apply(name, delta).whenComplete((r, e) -> {
132 + if (e == null) {
133 + result.complete(r);
134 + } else {
135 + log.warn("{} retry failed due to {}. Will try again...", function, e.getMessage());
136 + // TODO: Exponential backoff
137 + // TODO: limit retries
138 + retryExecutor.schedule(this, DELAY_BETWEEN_RETRY_SEC, TimeUnit.SECONDS);
139 + }
140 + });
141 + }
63 } 142 }
64 } 143 }
...\ No newline at end of file ...\ No newline at end of file
......
...@@ -17,6 +17,7 @@ package org.onosproject.store.consistent.impl; ...@@ -17,6 +17,7 @@ package org.onosproject.store.consistent.impl;
17 17
18 import java.util.concurrent.CompletableFuture; 18 import java.util.concurrent.CompletableFuture;
19 import java.util.concurrent.ExecutionException; 19 import java.util.concurrent.ExecutionException;
20 +import java.util.concurrent.ScheduledExecutorService;
20 import java.util.concurrent.TimeUnit; 21 import java.util.concurrent.TimeUnit;
21 import java.util.concurrent.TimeoutException; 22 import java.util.concurrent.TimeoutException;
22 23
...@@ -36,8 +37,11 @@ public class DefaultAtomicCounter implements AtomicCounter { ...@@ -36,8 +37,11 @@ public class DefaultAtomicCounter implements AtomicCounter {
36 37
37 private final AsyncAtomicCounter asyncCounter; 38 private final AsyncAtomicCounter asyncCounter;
38 39
39 - public DefaultAtomicCounter(String name, Database database) { 40 + public DefaultAtomicCounter(String name,
40 - asyncCounter = new DefaultAsyncAtomicCounter(name, database); 41 + Database database,
42 + boolean retryOnException,
43 + ScheduledExecutorService retryExecutor) {
44 + asyncCounter = new DefaultAsyncAtomicCounter(name, database, retryOnException, retryExecutor);
41 } 45 }
42 46
43 @Override 47 @Override
......
1 package org.onosproject.store.consistent.impl; 1 package org.onosproject.store.consistent.impl;
2 2
3 +import java.util.concurrent.ScheduledExecutorService;
4 +
3 import org.onosproject.store.service.AsyncAtomicCounter; 5 import org.onosproject.store.service.AsyncAtomicCounter;
4 import org.onosproject.store.service.AtomicCounter; 6 import org.onosproject.store.service.AtomicCounter;
5 import org.onosproject.store.service.AtomicCounterBuilder; 7 import org.onosproject.store.service.AtomicCounterBuilder;
...@@ -15,6 +17,8 @@ public class DefaultAtomicCounterBuilder implements AtomicCounterBuilder { ...@@ -15,6 +17,8 @@ public class DefaultAtomicCounterBuilder implements AtomicCounterBuilder {
15 private boolean partitionsEnabled = true; 17 private boolean partitionsEnabled = true;
16 private final Database partitionedDatabase; 18 private final Database partitionedDatabase;
17 private final Database inMemoryDatabase; 19 private final Database inMemoryDatabase;
20 + private boolean retryOnFailure = false;
21 + private ScheduledExecutorService retryExecutor = null;
18 22
19 public DefaultAtomicCounterBuilder(Database inMemoryDatabase, Database partitionedDatabase) { 23 public DefaultAtomicCounterBuilder(Database inMemoryDatabase, Database partitionedDatabase) {
20 this.inMemoryDatabase = inMemoryDatabase; 24 this.inMemoryDatabase = inMemoryDatabase;
...@@ -36,13 +40,35 @@ public class DefaultAtomicCounterBuilder implements AtomicCounterBuilder { ...@@ -36,13 +40,35 @@ public class DefaultAtomicCounterBuilder implements AtomicCounterBuilder {
36 40
37 @Override 41 @Override
38 public AtomicCounter build() { 42 public AtomicCounter build() {
43 + validateInputs();
39 Database database = partitionsEnabled ? partitionedDatabase : inMemoryDatabase; 44 Database database = partitionsEnabled ? partitionedDatabase : inMemoryDatabase;
40 - return new DefaultAtomicCounter(name, database); 45 + return new DefaultAtomicCounter(name, database, retryOnFailure, retryExecutor);
41 } 46 }
42 47
43 @Override 48 @Override
44 public AsyncAtomicCounter buildAsyncCounter() { 49 public AsyncAtomicCounter buildAsyncCounter() {
50 + validateInputs();
45 Database database = partitionsEnabled ? partitionedDatabase : inMemoryDatabase; 51 Database database = partitionsEnabled ? partitionedDatabase : inMemoryDatabase;
46 - return new DefaultAsyncAtomicCounter(name, database); 52 + return new DefaultAsyncAtomicCounter(name, database, retryOnFailure, retryExecutor);
53 + }
54 +
55 + @Override
56 + public AtomicCounterBuilder withRetryOnFailure() {
57 + retryOnFailure = true;
58 + return this;
59 + }
60 +
61 + @Override
62 + public AtomicCounterBuilder withRetryExecutor(ScheduledExecutorService executor) {
63 + this.retryExecutor = executor;
64 + return this;
65 + }
66 +
67 + private void validateInputs() {
68 + if (retryOnFailure) {
69 + if (retryExecutor == null) {
70 + throw new IllegalArgumentException("RetryExecutor must be specified when retries are enabled");
71 + }
72 + }
47 } 73 }
48 } 74 }
......
...@@ -15,10 +15,14 @@ ...@@ -15,10 +15,14 @@
15 */ 15 */
16 package org.onosproject.store.core.impl; 16 package org.onosproject.store.core.impl;
17 17
18 +import static org.onlab.util.Tools.groupedThreads;
18 import static org.slf4j.LoggerFactory.getLogger; 19 import static org.slf4j.LoggerFactory.getLogger;
19 20
20 import java.util.Map; 21 import java.util.Map;
21 import java.util.Set; 22 import java.util.Set;
23 +import java.util.concurrent.Executors;
24 +import java.util.concurrent.ScheduledExecutorService;
25 +
22 import org.apache.felix.scr.annotations.Activate; 26 import org.apache.felix.scr.annotations.Activate;
23 import org.apache.felix.scr.annotations.Component; 27 import org.apache.felix.scr.annotations.Component;
24 import org.apache.felix.scr.annotations.Deactivate; 28 import org.apache.felix.scr.annotations.Deactivate;
...@@ -30,7 +34,7 @@ import org.onosproject.core.ApplicationId; ...@@ -30,7 +34,7 @@ import org.onosproject.core.ApplicationId;
30 import org.onosproject.core.ApplicationIdStore; 34 import org.onosproject.core.ApplicationIdStore;
31 import org.onosproject.core.DefaultApplicationId; 35 import org.onosproject.core.DefaultApplicationId;
32 import org.onosproject.store.serializers.KryoNamespaces; 36 import org.onosproject.store.serializers.KryoNamespaces;
33 -import org.onosproject.store.service.AtomicCounter; 37 +import org.onosproject.store.service.AsyncAtomicCounter;
34 import org.onosproject.store.service.ConsistentMap; 38 import org.onosproject.store.service.ConsistentMap;
35 import org.onosproject.store.service.Serializer; 39 import org.onosproject.store.service.Serializer;
36 import org.onosproject.store.service.StorageService; 40 import org.onosproject.store.service.StorageService;
...@@ -39,6 +43,7 @@ import org.slf4j.Logger; ...@@ -39,6 +43,7 @@ import org.slf4j.Logger;
39 43
40 import com.google.common.collect.ImmutableSet; 44 import com.google.common.collect.ImmutableSet;
41 import com.google.common.collect.Maps; 45 import com.google.common.collect.Maps;
46 +import com.google.common.util.concurrent.Futures;
42 47
43 /** 48 /**
44 * ApplicationIdStore implementation on top of {@code AtomicCounter} 49 * ApplicationIdStore implementation on top of {@code AtomicCounter}
...@@ -53,10 +58,11 @@ public class ConsistentApplicationIdStore implements ApplicationIdStore { ...@@ -53,10 +58,11 @@ public class ConsistentApplicationIdStore implements ApplicationIdStore {
53 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) 58 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
54 protected StorageService storageService; 59 protected StorageService storageService;
55 60
56 - private AtomicCounter appIdCounter; 61 + private AsyncAtomicCounter appIdCounter;
57 private ConsistentMap<String, ApplicationId> registeredIds; 62 private ConsistentMap<String, ApplicationId> registeredIds;
58 private Map<String, ApplicationId> nameToAppIdCache = Maps.newConcurrentMap(); 63 private Map<String, ApplicationId> nameToAppIdCache = Maps.newConcurrentMap();
59 private Map<Short, ApplicationId> idToAppIdCache = Maps.newConcurrentMap(); 64 private Map<Short, ApplicationId> idToAppIdCache = Maps.newConcurrentMap();
65 + private ScheduledExecutorService executor;
60 66
61 private static final Serializer SERIALIZER = Serializer.using(new KryoNamespace.Builder() 67 private static final Serializer SERIALIZER = Serializer.using(new KryoNamespace.Builder()
62 .register(KryoNamespaces.API) 68 .register(KryoNamespaces.API)
...@@ -65,10 +71,13 @@ public class ConsistentApplicationIdStore implements ApplicationIdStore { ...@@ -65,10 +71,13 @@ public class ConsistentApplicationIdStore implements ApplicationIdStore {
65 71
66 @Activate 72 @Activate
67 public void activate() { 73 public void activate() {
74 + executor = Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/store/appId", "retry-handler"));
68 appIdCounter = storageService.atomicCounterBuilder() 75 appIdCounter = storageService.atomicCounterBuilder()
69 .withName("onos-app-id-counter") 76 .withName("onos-app-id-counter")
70 .withPartitionsDisabled() 77 .withPartitionsDisabled()
71 - .build(); 78 + .withRetryOnFailure()
79 + .withRetryExecutor(executor)
80 + .buildAsyncCounter();
72 81
73 registeredIds = storageService.<String, ApplicationId>consistentMapBuilder() 82 registeredIds = storageService.<String, ApplicationId>consistentMapBuilder()
74 .withName("onos-app-ids") 83 .withName("onos-app-ids")
...@@ -83,6 +92,7 @@ public class ConsistentApplicationIdStore implements ApplicationIdStore { ...@@ -83,6 +92,7 @@ public class ConsistentApplicationIdStore implements ApplicationIdStore {
83 92
84 @Deactivate 93 @Deactivate
85 public void deactivate() { 94 public void deactivate() {
95 + executor.shutdown();
86 log.info("Stopped"); 96 log.info("Stopped");
87 } 97 }
88 98
...@@ -118,7 +128,7 @@ public class ConsistentApplicationIdStore implements ApplicationIdStore { ...@@ -118,7 +128,7 @@ public class ConsistentApplicationIdStore implements ApplicationIdStore {
118 ApplicationId appId = nameToAppIdCache.computeIfAbsent(name, key -> { 128 ApplicationId appId = nameToAppIdCache.computeIfAbsent(name, key -> {
119 Versioned<ApplicationId> existingAppId = registeredIds.get(name); 129 Versioned<ApplicationId> existingAppId = registeredIds.get(name);
120 if (existingAppId == null) { 130 if (existingAppId == null) {
121 - int id = (int) appIdCounter.incrementAndGet(); 131 + int id = Futures.getUnchecked(appIdCounter.incrementAndGet()).intValue();
122 DefaultApplicationId newAppId = new DefaultApplicationId(id, name); 132 DefaultApplicationId newAppId = new DefaultApplicationId(id, name);
123 existingAppId = registeredIds.putIfAbsent(name, newAppId); 133 existingAppId = registeredIds.putIfAbsent(name, newAppId);
124 if (existingAppId != null) { 134 if (existingAppId != null) {
......