Madan Jampani
Committed by Gerrit Code Review

Implementation of IdBlockStore on top of AtomicCounter

Change-Id: I019322a812262edeca20a55813183a63b4525b75
...@@ -30,6 +30,29 @@ public interface AsyncAtomicCounter { ...@@ -30,6 +30,29 @@ public interface AsyncAtomicCounter {
30 CompletableFuture<Long> incrementAndGet(); 30 CompletableFuture<Long> incrementAndGet();
31 31
32 /** 32 /**
33 + * Atomically increment by one the current value.
34 + *
35 + * @return previous value
36 + */
37 + CompletableFuture<Long> getAndIncrement();
38 +
39 + /**
40 + * Atomically adds the given value to the current value.
41 + *
42 + * @param delta the value to add
43 + * @return previous value
44 + */
45 + CompletableFuture<Long> getAndAdd(long delta);
46 +
47 + /**
48 + * Atomically adds the given value to the current value.
49 + *
50 + * @param delta the value to add
51 + * @return updated value
52 + */
53 + CompletableFuture<Long> addAndGet(long delta);
54 +
55 + /**
33 * Returns the current value of the counter without modifying it. 56 * Returns the current value of the counter without modifying it.
34 * 57 *
35 * @return current value 58 * @return current value
......
...@@ -28,6 +28,29 @@ public interface AtomicCounter { ...@@ -28,6 +28,29 @@ public interface AtomicCounter {
28 long incrementAndGet(); 28 long incrementAndGet();
29 29
30 /** 30 /**
31 + * Atomically increment by one the current value.
32 + *
33 + * @return previous value
34 + */
35 + long getAndIncrement();
36 +
37 + /**
38 + * Atomically adds the given value to the current value.
39 + *
40 + * @param delta the value to add
41 + * @return previous value
42 + */
43 + long getAndAdd(long delta);
44 +
45 + /**
46 + * Atomically adds the given value to the current value.
47 + *
48 + * @param delta the value to add
49 + * @return updated value
50 + */
51 + long addAndGet(long delta);
52 +
53 + /**
31 * Returns the current value of the counter without modifying it. 54 * Returns the current value of the counter without modifying it.
32 * 55 *
33 * @return current value 56 * @return current value
......
...@@ -190,21 +190,30 @@ public interface DatabaseProxy<K, V> { ...@@ -190,21 +190,30 @@ public interface DatabaseProxy<K, V> {
190 CompletableFuture<Result<Boolean>> replace(String tableName, K key, long oldVersion, V newValue); 190 CompletableFuture<Result<Boolean>> replace(String tableName, K key, long oldVersion, V newValue);
191 191
192 /** 192 /**
193 - * Returns the next value for the specified atomic counter after 193 + * Atomically add the given value to current value of the specified counter.
194 - * incrementing the current value by one.
195 * 194 *
196 * @param counterName counter name 195 * @param counterName counter name
197 - * @return next value for the specified counter 196 + * @param delta value to add
197 + * @return updated value
198 */ 198 */
199 - CompletableFuture<Long> nextValue(String counterName); 199 + CompletableFuture<Long> counterAddAndGet(String counterName, long delta);
200 200
201 /** 201 /**
202 - * Returns the current value for the specified atomic counter. 202 + * Atomically add the given value to current value of the specified counter.
203 * 203 *
204 * @param counterName counter name 204 * @param counterName counter name
205 - * @return current value for the specified counter 205 + * @param delta value to add
206 + * @return previous value
206 */ 207 */
207 - CompletableFuture<Long> currentValue(String counterName); 208 + CompletableFuture<Long> counterGetAndAdd(String counterName, long delta);
209 +
210 + /**
211 + * Returns the current value of the specified atomic counter.
212 + *
213 + * @param counterName counter name
214 + * @return current value
215 + */
216 + CompletableFuture<Long> counterGet(String counterName);
208 217
209 /** 218 /**
210 * Prepare and commit the specified transaction. 219 * Prepare and commit the specified transaction.
......
...@@ -98,10 +98,16 @@ public interface DatabaseState<K, V> { ...@@ -98,10 +98,16 @@ public interface DatabaseState<K, V> {
98 Result<Boolean> replace(String tableName, K key, long oldVersion, V newValue); 98 Result<Boolean> replace(String tableName, K key, long oldVersion, V newValue);
99 99
100 @Command 100 @Command
101 - Long nextValue(String counterName); 101 + Long counterIncrementAndGet(String counterName);
102 +
103 + @Command
104 + Long counterGetAndIncrement(String counterName);
105 +
106 + @Command
107 + Long counterGetAndAdd(String counterName, long delta);
102 108
103 @Query 109 @Query
104 - Long currentValue(String counterName); 110 + Long counterGet(String counterName);
105 111
106 @Command 112 @Command
107 boolean prepareAndCommit(Transaction transaction); 113 boolean prepareAndCommit(Transaction transaction);
......
...@@ -16,7 +16,9 @@ ...@@ -16,7 +16,9 @@
16 package org.onosproject.store.consistent.impl; 16 package org.onosproject.store.consistent.impl;
17 17
18 import java.util.concurrent.CompletableFuture; 18 import java.util.concurrent.CompletableFuture;
19 +
19 import org.onosproject.store.service.AsyncAtomicCounter; 20 import org.onosproject.store.service.AsyncAtomicCounter;
21 +
20 import static com.google.common.base.Preconditions.*; 22 import static com.google.common.base.Preconditions.*;
21 23
22 /** 24 /**
...@@ -37,11 +39,26 @@ public class DefaultAsyncAtomicCounter implements AsyncAtomicCounter { ...@@ -37,11 +39,26 @@ public class DefaultAsyncAtomicCounter implements AsyncAtomicCounter {
37 39
38 @Override 40 @Override
39 public CompletableFuture<Long> incrementAndGet() { 41 public CompletableFuture<Long> incrementAndGet() {
40 - return database.nextValue(name); 42 + return addAndGet(1L);
41 } 43 }
42 44
43 @Override 45 @Override
44 public CompletableFuture<Long> get() { 46 public CompletableFuture<Long> get() {
45 - return database.currentValue(name); 47 + return database.counterGet(name);
48 + }
49 +
50 + @Override
51 + public CompletableFuture<Long> getAndIncrement() {
52 + return getAndAdd(1L);
53 + }
54 +
55 + @Override
56 + public CompletableFuture<Long> getAndAdd(long delta) {
57 + return database.counterGetAndAdd(name, delta);
58 + }
59 +
60 + @Override
61 + public CompletableFuture<Long> addAndGet(long delta) {
62 + return database.counterAddAndGet(name, delta);
46 } 63 }
47 } 64 }
......
...@@ -46,6 +46,21 @@ public class DefaultAtomicCounter implements AtomicCounter { ...@@ -46,6 +46,21 @@ public class DefaultAtomicCounter implements AtomicCounter {
46 } 46 }
47 47
48 @Override 48 @Override
49 + public long getAndIncrement() {
50 + return complete(asyncCounter.getAndIncrement());
51 + }
52 +
53 + @Override
54 + public long getAndAdd(long delta) {
55 + return complete(asyncCounter.getAndAdd(delta));
56 + }
57 +
58 + @Override
59 + public long addAndGet(long delta) {
60 + return complete(asyncCounter.getAndAdd(delta));
61 + }
62 +
63 + @Override
49 public long get() { 64 public long get() {
50 return complete(asyncCounter.get()); 65 return complete(asyncCounter.get());
51 } 66 }
......
...@@ -150,13 +150,18 @@ public class DefaultDatabase extends AbstractResource<Database> implements Datab ...@@ -150,13 +150,18 @@ public class DefaultDatabase extends AbstractResource<Database> implements Datab
150 } 150 }
151 151
152 @Override 152 @Override
153 - public CompletableFuture<Long> nextValue(String counterName) { 153 + public CompletableFuture<Long> counterGet(String counterName) {
154 - return checkOpen(() -> proxy.nextValue(counterName)); 154 + return checkOpen(() -> proxy.counterGet(counterName));
155 } 155 }
156 156
157 @Override 157 @Override
158 - public CompletableFuture<Long> currentValue(String counterName) { 158 + public CompletableFuture<Long> counterAddAndGet(String counterName, long delta) {
159 - return checkOpen(() -> proxy.currentValue(counterName)); 159 + return checkOpen(() -> proxy.counterAddAndGet(counterName, delta));
160 + }
161 +
162 + @Override
163 + public CompletableFuture<Long> counterGetAndAdd(String counterName, long delta) {
164 + return checkOpen(() -> proxy.counterGetAndAdd(counterName, delta));
160 } 165 }
161 166
162 @Override 167 @Override
......
...@@ -225,12 +225,22 @@ public class DefaultDatabaseState implements DatabaseState<String, byte[]> { ...@@ -225,12 +225,22 @@ public class DefaultDatabaseState implements DatabaseState<String, byte[]> {
225 } 225 }
226 226
227 @Override 227 @Override
228 - public Long nextValue(String counterName) { 228 + public Long counterIncrementAndGet(String counterName) {
229 return getCounter(counterName).incrementAndGet(); 229 return getCounter(counterName).incrementAndGet();
230 } 230 }
231 231
232 @Override 232 @Override
233 - public Long currentValue(String counterName) { 233 + public Long counterGetAndIncrement(String counterName) {
234 + return getCounter(counterName).getAndIncrement();
235 + }
236 +
237 + @Override
238 + public Long counterGetAndAdd(String counterName, long delta) {
239 + return getCounter(counterName).getAndAdd(delta);
240 + }
241 +
242 + @Override
243 + public Long counterGet(String counterName) {
234 return getCounter(counterName).get(); 244 return getCounter(counterName).get();
235 } 245 }
236 246
......
...@@ -235,15 +235,21 @@ public class PartitionedDatabase implements Database { ...@@ -235,15 +235,21 @@ public class PartitionedDatabase implements Database {
235 } 235 }
236 236
237 @Override 237 @Override
238 - public CompletableFuture<Long> nextValue(String counterName) { 238 + public CompletableFuture<Long> counterGet(String counterName) {
239 checkState(isOpen.get(), DB_NOT_OPEN); 239 checkState(isOpen.get(), DB_NOT_OPEN);
240 - return partitioner.getPartition(counterName, counterName).nextValue(counterName); 240 + return partitioner.getPartition(counterName, counterName).counterGet(counterName);
241 } 241 }
242 242
243 @Override 243 @Override
244 - public CompletableFuture<Long> currentValue(String counterName) { 244 + public CompletableFuture<Long> counterAddAndGet(String counterName, long delta) {
245 checkState(isOpen.get(), DB_NOT_OPEN); 245 checkState(isOpen.get(), DB_NOT_OPEN);
246 - return partitioner.getPartition(counterName, counterName).currentValue(counterName); 246 + return partitioner.getPartition(counterName, counterName).counterAddAndGet(counterName, delta);
247 + }
248 +
249 + @Override
250 + public CompletableFuture<Long> counterGetAndAdd(String counterName, long delta) {
251 + checkState(isOpen.get(), DB_NOT_OPEN);
252 + return partitioner.getPartition(counterName, counterName).counterGetAndAdd(counterName, delta);
247 } 253 }
248 254
249 @Override 255 @Override
......
1 +package org.onosproject.store.core.impl;
2 +
3 +import static org.slf4j.LoggerFactory.getLogger;
4 +
5 +import java.util.Map;
6 +
7 +import org.apache.felix.scr.annotations.Activate;
8 +import org.apache.felix.scr.annotations.Component;
9 +import org.apache.felix.scr.annotations.Deactivate;
10 +import org.apache.felix.scr.annotations.Reference;
11 +import org.apache.felix.scr.annotations.ReferenceCardinality;
12 +import org.apache.felix.scr.annotations.Service;
13 +import org.onosproject.core.IdBlock;
14 +import org.onosproject.core.IdBlockStore;
15 +import org.onosproject.store.service.AtomicCounter;
16 +import org.onosproject.store.service.StorageService;
17 +import org.slf4j.Logger;
18 +
19 +import com.google.common.collect.Maps;
20 +
21 +/**
22 + * Implementation of {@code IdBlockStore} using {@code AtomicCounter}.
23 + */
24 +@Component(immediate = true, enabled = true)
25 +@Service
26 +public class ConsistentIdBlockStore implements IdBlockStore {
27 +
28 + private final Logger log = getLogger(getClass());
29 + private final Map<String, AtomicCounter> topicCounters = Maps.newConcurrentMap();
30 +
31 + private static final long DEFAULT_BLOCK_SIZE = 0x100000L;
32 +
33 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
34 + protected StorageService storageService;
35 +
36 + @Activate
37 + public void activate() {
38 + log.info("Started");
39 + }
40 +
41 + @Deactivate
42 + public void deactivate() {
43 + log.info("Stopped");
44 + }
45 +
46 + @Override
47 + public IdBlock getIdBlock(String topic) {
48 + AtomicCounter counter = topicCounters.computeIfAbsent(topic,
49 + name -> storageService.atomicCounterBuilder()
50 + .withName(name)
51 + .build());
52 + Long blockBase = counter.getAndAdd(DEFAULT_BLOCK_SIZE);
53 + return new IdBlock(blockBase, DEFAULT_BLOCK_SIZE);
54 + }
55 +}
...@@ -31,7 +31,7 @@ import java.util.Map; ...@@ -31,7 +31,7 @@ import java.util.Map;
31 /** 31 /**
32 * Distributed implementation of id block store using Hazelcast. 32 * Distributed implementation of id block store using Hazelcast.
33 */ 33 */
34 -@Component(immediate = true) 34 +@Component(immediate = false, enabled = false)
35 @Service 35 @Service
36 public class DistributedIdBlockStore implements IdBlockStore { 36 public class DistributedIdBlockStore implements IdBlockStore {
37 37
......