andrea
Committed by Gerrit Code Review

minor name changes and javadoc

Change-Id: I43a05d75392efad9ac004867027a31adcc18c6f5
...@@ -58,4 +58,12 @@ public interface AsyncAtomicCounter { ...@@ -58,4 +58,12 @@ public interface AsyncAtomicCounter {
58 * @return current value 58 * @return current value
59 */ 59 */
60 CompletableFuture<Long> get(); 60 CompletableFuture<Long> get();
61 +
62 +
63 + /**
64 + * Atomically sets the given value to the current value.
65 + *
66 + * @return future void
67 + */
68 + CompletableFuture<Void> set(long value);
61 } 69 }
......
...@@ -51,6 +51,14 @@ public interface AtomicCounter { ...@@ -51,6 +51,14 @@ public interface AtomicCounter {
51 long addAndGet(long delta); 51 long addAndGet(long delta);
52 52
53 /** 53 /**
54 + * Atomically sets the given value to the current value.
55 + *
56 + * @param value the value to set
57 + */
58 + void set(long value);
59 +
60 +
61 + /**
54 * Returns the current value of the counter without modifying it. 62 * Returns the current value of the counter without modifying it.
55 * 63 *
56 * @return current value 64 * @return current value
......
...@@ -48,6 +48,11 @@ public final class TestAtomicCounter implements AtomicCounter { ...@@ -48,6 +48,11 @@ public final class TestAtomicCounter implements AtomicCounter {
48 } 48 }
49 49
50 @Override 50 @Override
51 + public void set(long value) {
52 + this.value.set(value);
53 + }
54 +
55 + @Override
51 public long get() { 56 public long get() {
52 return value.get(); 57 return value.get();
53 } 58 }
......
...@@ -45,6 +45,7 @@ public interface DatabaseProxy<K, V> { ...@@ -45,6 +45,7 @@ public interface DatabaseProxy<K, V> {
45 45
46 /** 46 /**
47 * Returns the number of entries in map. 47 * Returns the number of entries in map.
48 + *
48 * @param mapName map name 49 * @param mapName map name
49 * @return A completable future to be completed with the result once complete. 50 * @return A completable future to be completed with the result once complete.
50 */ 51 */
...@@ -62,7 +63,7 @@ public interface DatabaseProxy<K, V> { ...@@ -62,7 +63,7 @@ public interface DatabaseProxy<K, V> {
62 * Checks whether the map contains a key. 63 * Checks whether the map contains a key.
63 * 64 *
64 * @param mapName map name 65 * @param mapName map name
65 - * @param key key to check. 66 + * @param key key to check.
66 * @return A completable future to be completed with the result once complete. 67 * @return A completable future to be completed with the result once complete.
67 */ 68 */
68 CompletableFuture<Boolean> mapContainsKey(String mapName, K key); 69 CompletableFuture<Boolean> mapContainsKey(String mapName, K key);
...@@ -71,7 +72,7 @@ public interface DatabaseProxy<K, V> { ...@@ -71,7 +72,7 @@ public interface DatabaseProxy<K, V> {
71 * Checks whether the map contains a value. 72 * Checks whether the map contains a value.
72 * 73 *
73 * @param mapName map name 74 * @param mapName map name
74 - * @param value The value to check. 75 + * @param value The value to check.
75 * @return A completable future to be completed with the result once complete. 76 * @return A completable future to be completed with the result once complete.
76 */ 77 */
77 CompletableFuture<Boolean> mapContainsValue(String mapName, V value); 78 CompletableFuture<Boolean> mapContainsValue(String mapName, V value);
...@@ -80,7 +81,7 @@ public interface DatabaseProxy<K, V> { ...@@ -80,7 +81,7 @@ public interface DatabaseProxy<K, V> {
80 * Gets a value from the map. 81 * Gets a value from the map.
81 * 82 *
82 * @param mapName map name 83 * @param mapName map name
83 - * @param key The key to get. 84 + * @param key The key to get.
84 * @return A completable future to be completed with the result once complete. 85 * @return A completable future to be completed with the result once complete.
85 */ 86 */
86 CompletableFuture<Versioned<V>> mapGet(String mapName, K key); 87 CompletableFuture<Versioned<V>> mapGet(String mapName, K key);
...@@ -88,11 +89,11 @@ public interface DatabaseProxy<K, V> { ...@@ -88,11 +89,11 @@ public interface DatabaseProxy<K, V> {
88 /** 89 /**
89 * Updates the map. 90 * Updates the map.
90 * 91 *
91 - * @param mapName map name 92 + * @param mapName map name
92 - * @param key The key to set 93 + * @param key The key to set
93 - * @param valueMatch match for checking existing value 94 + * @param valueMatch match for checking existing value
94 - * @param versionMatch match for checking existing version 95 + * @param versionMatch match for checking existing version
95 - * @param value new value 96 + * @param value new value
96 * @return A completable future to be completed with the result once complete 97 * @return A completable future to be completed with the result once complete
97 */ 98 */
98 CompletableFuture<Result<UpdateResult<K, V>>> mapUpdate( 99 CompletableFuture<Result<UpdateResult<K, V>>> mapUpdate(
...@@ -130,11 +131,11 @@ public interface DatabaseProxy<K, V> { ...@@ -130,11 +131,11 @@ public interface DatabaseProxy<K, V> {
130 */ 131 */
131 CompletableFuture<Set<Map.Entry<K, Versioned<V>>>> mapEntrySet(String mapName); 132 CompletableFuture<Set<Map.Entry<K, Versioned<V>>>> mapEntrySet(String mapName);
132 133
133 - /** 134 + /**
134 * Atomically add the given value to current value of the specified counter. 135 * Atomically add the given value to current value of the specified counter.
135 * 136 *
136 * @param counterName counter name 137 * @param counterName counter name
137 - * @param delta value to add 138 + * @param delta value to add
138 * @return updated value 139 * @return updated value
139 */ 140 */
140 CompletableFuture<Long> counterAddAndGet(String counterName, long delta); 141 CompletableFuture<Long> counterAddAndGet(String counterName, long delta);
...@@ -143,11 +144,21 @@ public interface DatabaseProxy<K, V> { ...@@ -143,11 +144,21 @@ public interface DatabaseProxy<K, V> {
143 * Atomically add the given value to current value of the specified counter. 144 * Atomically add the given value to current value of the specified counter.
144 * 145 *
145 * @param counterName counter name 146 * @param counterName counter name
146 - * @param delta value to add 147 + * @param delta value to add
147 * @return previous value 148 * @return previous value
148 */ 149 */
149 CompletableFuture<Long> counterGetAndAdd(String counterName, long delta); 150 CompletableFuture<Long> counterGetAndAdd(String counterName, long delta);
150 151
152 +
153 + /**
154 + * Atomically sets the given value to current value of the specified counter.
155 + *
156 + * @param counterName counter name
157 + * @param value value to set
158 + * @return void future
159 + */
160 + CompletableFuture<Void> counterSet(String counterName, long value);
161 +
151 /** 162 /**
152 * Returns the current value of the specified atomic counter. 163 * Returns the current value of the specified atomic counter.
153 * 164 *
...@@ -158,6 +169,7 @@ public interface DatabaseProxy<K, V> { ...@@ -158,6 +169,7 @@ public interface DatabaseProxy<K, V> {
158 169
159 /** 170 /**
160 * Returns the size of queue. 171 * Returns the size of queue.
172 + *
161 * @param queueName queue name 173 * @param queueName queue name
162 * @return queue size 174 * @return queue size
163 */ 175 */
...@@ -165,14 +177,16 @@ public interface DatabaseProxy<K, V> { ...@@ -165,14 +177,16 @@ public interface DatabaseProxy<K, V> {
165 177
166 /** 178 /**
167 * Inserts an entry into the queue. 179 * Inserts an entry into the queue.
180 + *
168 * @param queueName queue name 181 * @param queueName queue name
169 - * @param entry queue entry 182 + * @param entry queue entry
170 * @return void future 183 * @return void future
171 */ 184 */
172 CompletableFuture<Void> queuePush(String queueName, byte[] entry); 185 CompletableFuture<Void> queuePush(String queueName, byte[] entry);
173 186
174 /** 187 /**
175 * Removes an entry from the queue if the queue is non-empty. 188 * Removes an entry from the queue if the queue is non-empty.
189 + *
176 * @param queueName queue name 190 * @param queueName queue name
177 * @return entry future. Can be completed with null if queue is empty 191 * @return entry future. Can be completed with null if queue is empty
178 */ 192 */
...@@ -180,6 +194,7 @@ public interface DatabaseProxy<K, V> { ...@@ -180,6 +194,7 @@ public interface DatabaseProxy<K, V> {
180 194
181 /** 195 /**
182 * Returns but does not remove an entry from the queue. 196 * Returns but does not remove an entry from the queue.
197 + *
183 * @param queueName queue name 198 * @param queueName queue name
184 * @return entry. Can be null if queue is empty 199 * @return entry. Can be null if queue is empty
185 */ 200 */
......
...@@ -18,6 +18,7 @@ package org.onosproject.store.consistent.impl; ...@@ -18,6 +18,7 @@ package org.onosproject.store.consistent.impl;
18 import org.onosproject.store.service.AsyncAtomicCounter; 18 import org.onosproject.store.service.AsyncAtomicCounter;
19 19
20 import java.util.concurrent.CompletableFuture; 20 import java.util.concurrent.CompletableFuture;
21 +
21 import static com.google.common.base.Preconditions.checkNotNull; 22 import static com.google.common.base.Preconditions.checkNotNull;
22 23
23 /** 24 /**
...@@ -38,6 +39,7 @@ public class DefaultAsyncAtomicCounter implements AsyncAtomicCounter { ...@@ -38,6 +39,7 @@ public class DefaultAsyncAtomicCounter implements AsyncAtomicCounter {
38 private static final String GET_AND_ADD = "getAndAdd"; 39 private static final String GET_AND_ADD = "getAndAdd";
39 private static final String ADD_AND_GET = "addAndGet"; 40 private static final String ADD_AND_GET = "addAndGet";
40 private static final String GET = "get"; 41 private static final String GET = "get";
42 + private static final String SET = "set";
41 43
42 public DefaultAsyncAtomicCounter(String name, 44 public DefaultAsyncAtomicCounter(String name,
43 Database database, 45 Database database,
...@@ -72,13 +74,20 @@ public class DefaultAsyncAtomicCounter implements AsyncAtomicCounter { ...@@ -72,13 +74,20 @@ public class DefaultAsyncAtomicCounter implements AsyncAtomicCounter {
72 public CompletableFuture<Long> getAndAdd(long delta) { 74 public CompletableFuture<Long> getAndAdd(long delta) {
73 final MeteringAgent.Context timer = monitor.startTimer(GET_AND_ADD); 75 final MeteringAgent.Context timer = monitor.startTimer(GET_AND_ADD);
74 return database.counterGetAndAdd(name, delta) 76 return database.counterGetAndAdd(name, delta)
75 - .whenComplete((r, e) -> timer.stop(e)); 77 + .whenComplete((r, e) -> timer.stop(e));
76 } 78 }
77 79
78 @Override 80 @Override
79 public CompletableFuture<Long> addAndGet(long delta) { 81 public CompletableFuture<Long> addAndGet(long delta) {
80 final MeteringAgent.Context timer = monitor.startTimer(ADD_AND_GET); 82 final MeteringAgent.Context timer = monitor.startTimer(ADD_AND_GET);
81 return database.counterAddAndGet(name, delta) 83 return database.counterAddAndGet(name, delta)
82 - .whenComplete((r, e) -> timer.stop(e)); 84 + .whenComplete((r, e) -> timer.stop(e));
85 + }
86 +
87 + @Override
88 + public CompletableFuture<Void> set(long value) {
89 + final MeteringAgent.Context timer = monitor.startTimer(SET);
90 + return database.counterSet(name, value)
91 + .whenComplete((r, e) -> timer.stop(e));
83 } 92 }
84 } 93 }
......
...@@ -63,6 +63,11 @@ public class DefaultAtomicCounter implements AtomicCounter { ...@@ -63,6 +63,11 @@ public class DefaultAtomicCounter implements AtomicCounter {
63 } 63 }
64 64
65 @Override 65 @Override
66 + public void set(long value) {
67 + complete(asyncCounter.set(value));
68 + }
69 +
70 + @Override
66 public long get() { 71 public long get() {
67 return complete(asyncCounter.get()); 72 return complete(asyncCounter.get());
68 } 73 }
......
...@@ -44,13 +44,13 @@ public class DefaultDatabase extends AbstractResource<Database> implements Datab ...@@ -44,13 +44,13 @@ public class DefaultDatabase extends AbstractResource<Database> implements Datab
44 private final Set<Consumer<StateMachineUpdate>> consumers = Sets.newCopyOnWriteArraySet(); 44 private final Set<Consumer<StateMachineUpdate>> consumers = Sets.newCopyOnWriteArraySet();
45 private final TriConsumer<String, Object, Object> watcher = new InternalStateMachineWatcher(); 45 private final TriConsumer<String, Object, Object> watcher = new InternalStateMachineWatcher();
46 46
47 - @SuppressWarnings({ "unchecked", "rawtypes" }) 47 + @SuppressWarnings({"unchecked", "rawtypes"})
48 public DefaultDatabase(ResourceManager context) { 48 public DefaultDatabase(ResourceManager context) {
49 super(context); 49 super(context);
50 this.stateMachine = new DefaultStateMachine(context, 50 this.stateMachine = new DefaultStateMachine(context,
51 - DatabaseState.class, 51 + DatabaseState.class,
52 - DefaultDatabaseState.class, 52 + DefaultDatabaseState.class,
53 - DefaultDatabase.class.getClassLoader()); 53 + DefaultDatabase.class.getClassLoader());
54 this.stateMachine.addStartupTask(() -> { 54 this.stateMachine.addStartupTask(() -> {
55 stateMachine.registerWatcher(watcher); 55 stateMachine.registerWatcher(watcher);
56 return CompletableFuture.completedFuture(null); 56 return CompletableFuture.completedFuture(null);
...@@ -66,7 +66,7 @@ public class DefaultDatabase extends AbstractResource<Database> implements Datab ...@@ -66,7 +66,7 @@ public class DefaultDatabase extends AbstractResource<Database> implements Datab
66 * return the completed future result. 66 * return the completed future result.
67 * 67 *
68 * @param supplier The supplier to call if the database is open. 68 * @param supplier The supplier to call if the database is open.
69 - * @param <T> The future result type. 69 + * @param <T> The future result type.
70 * @return A completable future that if this database is closed is immediately failed. 70 * @return A completable future that if this database is closed is immediately failed.
71 */ 71 */
72 protected <T> CompletableFuture<T> checkOpen(Supplier<CompletableFuture<T>> supplier) { 72 protected <T> CompletableFuture<T> checkOpen(Supplier<CompletableFuture<T>> supplier) {
...@@ -153,6 +153,11 @@ public class DefaultDatabase extends AbstractResource<Database> implements Datab ...@@ -153,6 +153,11 @@ public class DefaultDatabase extends AbstractResource<Database> implements Datab
153 } 153 }
154 154
155 @Override 155 @Override
156 + public CompletableFuture<Void> counterSet(String counterName, long value) {
157 + return checkOpen(() -> proxy.counterSet(counterName, value));
158 + }
159 +
160 + @Override
156 public CompletableFuture<Long> queueSize(String queueName) { 161 public CompletableFuture<Long> queueSize(String queueName) {
157 return checkOpen(() -> proxy.queueSize(queueName)); 162 return checkOpen(() -> proxy.queueSize(queueName));
158 } 163 }
......
...@@ -100,10 +100,10 @@ public class PartitionedDatabase implements Database { ...@@ -100,10 +100,10 @@ public class PartitionedDatabase implements Database {
100 return CompletableFuture.allOf(partitions 100 return CompletableFuture.allOf(partitions
101 .stream() 101 .stream()
102 .map(db -> db.counters() 102 .map(db -> db.counters()
103 - .thenApply(m -> { 103 + .thenApply(m -> {
104 - counters.putAll(m); 104 + counters.putAll(m);
105 - return null; 105 + return null;
106 - })) 106 + }))
107 .toArray(CompletableFuture[]::new)) 107 .toArray(CompletableFuture[]::new))
108 .thenApply(v -> counters); 108 .thenApply(v -> counters);
109 } 109 }
...@@ -113,9 +113,9 @@ public class PartitionedDatabase implements Database { ...@@ -113,9 +113,9 @@ public class PartitionedDatabase implements Database {
113 checkState(isOpen.get(), DB_NOT_OPEN); 113 checkState(isOpen.get(), DB_NOT_OPEN);
114 AtomicInteger totalSize = new AtomicInteger(0); 114 AtomicInteger totalSize = new AtomicInteger(0);
115 return CompletableFuture.allOf(partitions 115 return CompletableFuture.allOf(partitions
116 - .stream() 116 + .stream()
117 - .map(p -> p.mapSize(mapName).thenApply(totalSize::addAndGet)) 117 + .map(p -> p.mapSize(mapName).thenApply(totalSize::addAndGet))
118 - .toArray(CompletableFuture[]::new)) 118 + .toArray(CompletableFuture[]::new))
119 .thenApply(v -> totalSize.get()); 119 .thenApply(v -> totalSize.get());
120 } 120 }
121 121
...@@ -136,10 +136,10 @@ public class PartitionedDatabase implements Database { ...@@ -136,10 +136,10 @@ public class PartitionedDatabase implements Database {
136 checkState(isOpen.get(), DB_NOT_OPEN); 136 checkState(isOpen.get(), DB_NOT_OPEN);
137 AtomicBoolean containsValue = new AtomicBoolean(false); 137 AtomicBoolean containsValue = new AtomicBoolean(false);
138 return CompletableFuture.allOf(partitions 138 return CompletableFuture.allOf(partitions
139 - .stream() 139 + .stream()
140 - .map(p -> p.mapContainsValue(mapName, value) 140 + .map(p -> p.mapContainsValue(mapName, value)
141 - .thenApply(v -> containsValue.compareAndSet(false, v))) 141 + .thenApply(v -> containsValue.compareAndSet(false, v)))
142 - .toArray(CompletableFuture[]::new)) 142 + .toArray(CompletableFuture[]::new))
143 .thenApply(v -> containsValue.get()); 143 .thenApply(v -> containsValue.get());
144 } 144 }
145 145
...@@ -196,9 +196,9 @@ public class PartitionedDatabase implements Database { ...@@ -196,9 +196,9 @@ public class PartitionedDatabase implements Database {
196 checkState(isOpen.get(), DB_NOT_OPEN); 196 checkState(isOpen.get(), DB_NOT_OPEN);
197 Set<Entry<String, Versioned<byte[]>>> entrySet = Sets.newConcurrentHashSet(); 197 Set<Entry<String, Versioned<byte[]>>> entrySet = Sets.newConcurrentHashSet();
198 return CompletableFuture.allOf(partitions 198 return CompletableFuture.allOf(partitions
199 - .stream() 199 + .stream()
200 - .map(p -> p.mapEntrySet(mapName).thenApply(entrySet::addAll)) 200 + .map(p -> p.mapEntrySet(mapName).thenApply(entrySet::addAll))
201 - .toArray(CompletableFuture[]::new)) 201 + .toArray(CompletableFuture[]::new))
202 .thenApply(v -> entrySet); 202 .thenApply(v -> entrySet);
203 } 203 }
204 204
...@@ -220,6 +220,11 @@ public class PartitionedDatabase implements Database { ...@@ -220,6 +220,11 @@ public class PartitionedDatabase implements Database {
220 return partitioner.getPartition(counterName, counterName).counterGetAndAdd(counterName, delta); 220 return partitioner.getPartition(counterName, counterName).counterGetAndAdd(counterName, delta);
221 } 221 }
222 222
223 + @Override
224 + public CompletableFuture<Void> counterSet(String counterName, long value) {
225 + checkState(isOpen.get(), DB_NOT_OPEN);
226 + return partitioner.getPartition(counterName, counterName).counterSet(counterName, value);
227 + }
223 228
224 @Override 229 @Override
225 public CompletableFuture<Long> queueSize(String queueName) { 230 public CompletableFuture<Long> queueSize(String queueName) {
...@@ -268,8 +273,8 @@ public class PartitionedDatabase implements Database { ...@@ -268,8 +273,8 @@ public class PartitionedDatabase implements Database {
268 AtomicBoolean status = new AtomicBoolean(true); 273 AtomicBoolean status = new AtomicBoolean(true);
269 return CompletableFuture.allOf(subTransactions.entrySet() 274 return CompletableFuture.allOf(subTransactions.entrySet()
270 .stream() 275 .stream()
271 - .map(entry -> entry 276 + .map(entry -> entry
272 - .getKey() 277 + .getKey()
273 .prepare(entry.getValue()) 278 .prepare(entry.getValue())
274 .thenApply(v -> status.compareAndSet(true, v))) 279 .thenApply(v -> status.compareAndSet(true, v)))
275 .toArray(CompletableFuture[]::new)) 280 .toArray(CompletableFuture[]::new))
...@@ -282,15 +287,15 @@ public class PartitionedDatabase implements Database { ...@@ -282,15 +287,15 @@ public class PartitionedDatabase implements Database {
282 AtomicBoolean success = new AtomicBoolean(true); 287 AtomicBoolean success = new AtomicBoolean(true);
283 List<UpdateResult<String, byte[]>> allUpdates = Lists.newArrayList(); 288 List<UpdateResult<String, byte[]>> allUpdates = Lists.newArrayList();
284 return CompletableFuture.allOf(subTransactions.entrySet() 289 return CompletableFuture.allOf(subTransactions.entrySet()
285 - .stream() 290 + .stream()
286 - .map(entry -> entry.getKey().commit(entry.getValue()) 291 + .map(entry -> entry.getKey().commit(entry.getValue())
287 - .thenAccept(response -> { 292 + .thenAccept(response -> {
288 - success.set(success.get() && response.success()); 293 + success.set(success.get() && response.success());
289 - if (success.get()) { 294 + if (success.get()) {
290 - allUpdates.addAll(response.updates()); 295 + allUpdates.addAll(response.updates());
291 - } 296 + }
292 - })) 297 + }))
293 - .toArray(CompletableFuture[]::new)) 298 + .toArray(CompletableFuture[]::new))
294 .thenApply(v -> success.get() ? 299 .thenApply(v -> success.get() ?
295 CommitResponse.success(allUpdates) : CommitResponse.failure()); 300 CommitResponse.success(allUpdates) : CommitResponse.failure());
296 } 301 }
...@@ -301,7 +306,7 @@ public class PartitionedDatabase implements Database { ...@@ -301,7 +306,7 @@ public class PartitionedDatabase implements Database {
301 return CompletableFuture.allOf(subTransactions.entrySet() 306 return CompletableFuture.allOf(subTransactions.entrySet()
302 .stream() 307 .stream()
303 .map(entry -> entry.getKey().rollback(entry.getValue())) 308 .map(entry -> entry.getKey().rollback(entry.getValue()))
304 - .toArray(CompletableFuture[]::new)) 309 + .toArray(CompletableFuture[]::new))
305 .thenApply(v -> true); 310 .thenApply(v -> true);
306 } 311 }
307 312
...@@ -384,3 +389,4 @@ public class PartitionedDatabase implements Database { ...@@ -384,3 +389,4 @@ public class PartitionedDatabase implements Database {
384 partitions.forEach(p -> p.unregisterConsumer(consumer)); 389 partitions.forEach(p -> p.unregisterConsumer(consumer));
385 } 390 }
386 } 391 }
392 +
......