Madan Jampani
Committed by Gerrit Code Review

New and direct StorageService method for creating an AtomicCounter

Change-Id: I8c189584dde590842075bea7e03c4c8ecf8d72c2
......@@ -39,10 +39,6 @@ public class CounterTestIncrementCommand extends AbstractShellCommand {
private final Logger log = getLogger(getClass());
@Option(name = "-i", aliases = "--inMemory", description = "use in memory map?",
required = false, multiValued = false)
private boolean inMemory = false;
@Option(name = "-g", aliases = "--getFirst", description = "get the counter's value before adding",
required = false, multiValued = false)
private boolean getFirst = false;
......@@ -63,16 +59,7 @@ public class CounterTestIncrementCommand extends AbstractShellCommand {
@Override
protected void execute() {
StorageService storageService = get(StorageService.class);
if (inMemory) {
atomicCounter = storageService.atomicCounterBuilder()
.withName(counter)
.withPartitionsDisabled()
.build();
} else {
atomicCounter = storageService.atomicCounterBuilder()
.withName(counter)
.build();
}
atomicCounter = storageService.getAsyncAtomicCounter(counter);
CompletableFuture<Long> result;
if (delta != null) {
if (getFirst) {
......
......@@ -95,9 +95,8 @@ public class DistributedConsensusLoadTest {
appId = coreService.registerApplication("org.onosproject.loadtest");
log.info("Started with {}", appId);
for (int i = 0; i < TOTAL_COUNTERS; ++i) {
AsyncAtomicCounter counter = storageService.atomicCounterBuilder()
.withName(String.format("onos-app-loadtest-counter-%d", i))
.build();
AsyncAtomicCounter counter =
storageService.getAsyncAtomicCounter(String.format("onos-app-loadtest-counter-%d", i));
counters.add(counter);
}
reporter.scheduleWithFixedDelay(() -> {
......
......@@ -87,4 +87,24 @@ public interface StorageService {
* @return a builder for a transaction context.
*/
TransactionContextBuilder transactionContextBuilder();
/**
* Returns an instance of {@code AsyncAtomicCounter} with specified name.
* @param name counter name
*
* @return AsyncAtomicCounter instance
*/
default AsyncAtomicCounter getAsyncAtomicCounter(String name) {
return atomicCounterBuilder().withName(name).build();
}
/**
* Returns an instance of {@code AtomicCounter} with specified name.
* @param name counter name
*
* @return AtomicCounter instance
*/
default AtomicCounter getAtomicCounter(String name) {
return getAsyncAtomicCounter(name).asAtomicCounter();
}
}
......
......@@ -67,10 +67,7 @@ public class ConsistentApplicationIdStore implements ApplicationIdStore {
@Activate
public void activate() {
appIdCounter = storageService.atomicCounterBuilder()
.withName("onos-app-id-counter")
.build()
.asAtomicCounter();
appIdCounter = storageService.getAtomicCounter("onos-app-id-counter");
registeredIds = storageService.<String, ApplicationId>consistentMapBuilder()
.withName("onos-app-ids")
......
......@@ -23,11 +23,9 @@ import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.Tools;
import org.onosproject.core.IdBlock;
import org.onosproject.core.IdBlockStore;
import org.onosproject.store.service.AtomicCounter;
import org.onosproject.store.service.StorageException;
import org.onosproject.store.service.StorageService;
import org.slf4j.Logger;
......@@ -42,9 +40,6 @@ import static org.slf4j.LoggerFactory.getLogger;
@Service
public class ConsistentIdBlockStore implements IdBlockStore {
private static final int MAX_TRIES = 5;
private static final int RETRY_DELAY_MS = 2_000;
private final Logger log = getLogger(getClass());
private final Map<String, AtomicCounter> topicCounters = Maps.newConcurrentMap();
......@@ -65,16 +60,7 @@ public class ConsistentIdBlockStore implements IdBlockStore {
@Override
public IdBlock getIdBlock(String topic) {
AtomicCounter counter = topicCounters
.computeIfAbsent(topic,
name -> storageService.atomicCounterBuilder()
.withName(name)
.build()
.asAtomicCounter());
Long blockBase = Tools.retryable(counter::getAndAdd,
StorageException.class,
MAX_TRIES,
RETRY_DELAY_MS).apply(DEFAULT_BLOCK_SIZE);
return new IdBlock(blockBase, DEFAULT_BLOCK_SIZE);
AtomicCounter counter = topicCounters.computeIfAbsent(topic, storageService::getAtomicCounter);
return new IdBlock(counter.getAndAdd(DEFAULT_BLOCK_SIZE), DEFAULT_BLOCK_SIZE);
}
}
......
......@@ -50,11 +50,7 @@ public class LogicalClockManager implements LogicalClockService {
@Activate
public void activate() {
atomicCounter = storageService.atomicCounterBuilder()
.withName(SYSTEM_LOGICAL_CLOCK_COUNTER_NAME)
.withPartitionsDisabled()
.build()
.asAtomicCounter();
atomicCounter = storageService.getAtomicCounter(SYSTEM_LOGICAL_CLOCK_COUNTER_NAME);
log.info("Started");
}
......
......@@ -69,11 +69,7 @@ public class DistributedFlowObjectiveStore
.build()))
.build();
nextIds = storageService.atomicCounterBuilder()
.withName("next-objective-counter")
.build()
.asAtomicCounter();
nextIds = storageService.getAtomicCounter("next-objective-counter");
log.info("Started");
}
......
......@@ -132,10 +132,7 @@ public class Ofdpa2GroupHandler {
this.serviceDirectory = context.directory();
this.groupService = serviceDirectory.get(GroupService.class);
this.storageService = serviceDirectory.get(StorageService.class);
this.nextIndex = storageService.atomicCounterBuilder()
.withName("group-id-index-counter")
.build()
.asAtomicCounter();
this.nextIndex = storageService.getAtomicCounter("group-id-index-counter");
pendingNextObjectives = CacheBuilder.newBuilder()
.expireAfterWrite(20, TimeUnit.SECONDS)
......
......@@ -182,10 +182,7 @@ public class MeterManager extends AbstractListenerProviderRegistry<MeterEvent, M
}
private AtomicCounter allocateCounter(DeviceId deviceId) {
return storageService.atomicCounterBuilder()
.withName(String.format(METERCOUNTERIDENTIFIER, deviceId))
.build()
.asAtomicCounter();
return storageService.getAtomicCounter(String.format(METERCOUNTERIDENTIFIER, deviceId));
}
private class InternalMeterProviderService
......