Flavio Castro
Committed by Gerrit Code Review

ONOS-2456 Added usage metrics to Atomic Counter and Distributed Queue plus refactored the code a bit

Refactored code plus instrumented AtomicValue and DistributedSet

Change-Id: I9c5f7c9f23d530131f15d3c98250ea33238dd2ec
Showing 18 changed files with 525 additions and 221 deletions
......@@ -59,6 +59,14 @@ public interface AtomicCounterBuilder {
AtomicCounterBuilder withRetryOnFailure();
* Instantiates Metering service to gather usage and performance metrics.
* By default, usage data will be stored.
* @return this AtomicCounterBuilder
AtomicCounterBuilder withMeteringDisabled();
* Sets the executor service to use for retrying failed operations.
* <p>
* Note: Must be set when retries are enabled
......@@ -60,6 +60,14 @@ public interface AtomicValueBuilder<V> {
AtomicValueBuilder<V> withPartitionsDisabled();
* Instantiates Metering service to gather usage and performance metrics.
* By default, usage data will be stored.
* @return this AtomicValueBuilder for method chaining
AtomicValueBuilder<V> withMeteringDisabled();
* Builds a AtomicValue based on the configuration options
* supplied to this builder.
......@@ -105,6 +105,14 @@ public interface ConsistentMapBuilder<K, V> {
ConsistentMapBuilder<K, V> withPurgeOnUninstall();
* Instantiates Metering service to gather usage and performance metrics.
* By default, usage data will be stored.
* @return this ConsistentMapBuilder
ConsistentMapBuilder<K, V> withMeteringDisabled();
* Builds an consistent map based on the configuration options
* supplied to this builder.
......@@ -51,6 +51,14 @@ public interface DistributedQueueBuilder<E> {
DistributedQueueBuilder<E> withSerializer(Serializer serializer);
* @return this DistributedQueueBuilder for method chaining
DistributedQueueBuilder<E> withMeteringDisabled();
* Disables persistence of queues entries.
* <p>
* When persistence is disabled, a full cluster restart will wipe out all
......@@ -92,6 +92,13 @@ public interface DistributedSetBuilder<E> {
DistributedSetBuilder<E> withPartitionsDisabled();
* Instantiate Metrics service to gather usage and performance metrics.
* By default usage information is enabled
* @return this DistributedSetBuilder
DistributedSetBuilder<E> withMeteringDisabled();
* Purges set contents when the application owning the set is uninstalled.
* <p>
* When this option is enabled, the caller must provide a applicationId via
......@@ -247,6 +247,11 @@ public final class TestConsistentMap<K, V> extends ConsistentMapAdapter<K, V> {
public ConsistentMapBuilder<K, V> withMeteringDisabled() {
return this;
public ConsistentMap<K, V> build() {
return new TestConsistentMap<>(mapName);
......@@ -15,20 +15,15 @@
package org.onosproject.store.consistent.impl;
import org.onosproject.store.service.AsyncAtomicCounter;
import org.slf4j.Logger;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import org.onosproject.store.service.AsyncAtomicCounter;
import org.slf4j.Logger;
import static com.google.common.base.Preconditions.*;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.slf4j.LoggerFactory.getLogger;
......@@ -46,66 +41,90 @@ public class DefaultAsyncAtomicCounter implements AsyncAtomicCounter {
// TODO: configure delay via builder
private static final int DELAY_BETWEEN_RETRY_SEC = 1;
private final Logger log = getLogger(getClass());
private final MeteringAgent monitor;
private static final String PRIMITIVE_NAME = "atomicCounter";
private static final String INCREMENT_AND_GET = "incrementAndGet";
private static final String GET_AND_INCREMENT = "getAndIncrement";
private static final String GET_AND_ADD = "getAndAdd";
private static final String ADD_AND_GET = "addAndGet";
private static final String GET = "get";
public DefaultAsyncAtomicCounter(String name,
Database database,
boolean retryOnException,
ScheduledExecutorService retryExecutor) {
Database database,
boolean retryOnException,
boolean meteringEnabled,
ScheduledExecutorService retryExecutor) {
this.name = checkNotNull(name);
this.database = checkNotNull(database);
this.retryOnFailure = retryOnException;
this.retryExecutor = retryExecutor;
this.monitor = new MeteringAgent(PRIMITIVE_NAME, name, meteringEnabled);
public CompletableFuture<Long> incrementAndGet() {
return addAndGet(1L);
final MeteringAgent.Context timer = monitor.startTimer(INCREMENT_AND_GET);
return addAndGet(1L)
.whenComplete((r, e) -> timer.stop());
public CompletableFuture<Long> get() {
return database.counterGet(name);
final MeteringAgent.Context timer = monitor.startTimer(GET);
return database.counterGet(name)
.whenComplete((r, e) -> timer.stop());
public CompletableFuture<Long> getAndIncrement() {
return getAndAdd(1L);
final MeteringAgent.Context timer = monitor.startTimer(GET_AND_INCREMENT);
return getAndAdd(1L)
.whenComplete((r, e) -> timer.stop());
public CompletableFuture<Long> getAndAdd(long delta) {
final MeteringAgent.Context timer = monitor.startTimer(GET_AND_ADD);
CompletableFuture<Long> result = database.counterGetAndAdd(name, delta);
if (!retryOnFailure) {
return result;
return result
.whenComplete((r, e) -> timer.stop());
CompletableFuture<Long> future = new CompletableFuture<>();
return result.whenComplete((r, e) -> {
// TODO : Account for retries
if (e != null) {
log.warn("getAndAdd failed due to {}. Will retry", e.getMessage());
retryExecutor.schedule(new RetryTask(database::counterGetAndAdd, delta, future),
} else {
}).thenCompose(v -> future);
}).thenCompose(v -> future);
public CompletableFuture<Long> addAndGet(long delta) {
final MeteringAgent.Context timer = monitor.startTimer(ADD_AND_GET);
CompletableFuture<Long> result = database.counterAddAndGet(name, delta);
if (!retryOnFailure) {
return result;
return result
.whenComplete((r, e) -> timer.stop());
CompletableFuture<Long> future = new CompletableFuture<>();
return result.whenComplete((r, e) -> {
// TODO : Account for retries
if (e != null) {
log.warn("addAndGet failed due to {}. Will retry", e.getMessage());
retryExecutor.schedule(new RetryTask(database::counterAddAndGet, delta, future),
} else {
......@@ -16,47 +16,38 @@
package org.onosproject.store.consistent.impl;
import static com.google.common.base.Preconditions.*;
import static org.slf4j.LoggerFactory.getLogger;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.Maps;
import org.onlab.util.HexString;
import org.onlab.util.SharedExecutors;
import org.onlab.util.Tools;
import org.onosproject.core.ApplicationId;
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.ConsistentMapException;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.Versioned;
import org.slf4j.Logger;
import java.util.Collection;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.Set;
import com.codahale.metrics.Timer;
import org.onlab.metrics.MetricsComponent;
import org.onlab.metrics.MetricsFeature;
import org.onlab.metrics.MetricsService;
import org.onlab.osgi.DefaultServiceDirectory;
import org.onlab.util.HexString;
import org.onlab.util.SharedExecutors;
import org.onlab.util.Tools;
import org.onosproject.core.ApplicationId;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.onosproject.store.consistent.impl.StateMachineUpdate.Target.MAP;
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.ConsistentMapException;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.Versioned;
import org.slf4j.Logger;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.Maps;
import static org.slf4j.LoggerFactory.getLogger;
* AsyncConsistentMap implementation that is backed by a Raft consensus
......@@ -65,7 +56,7 @@ import com.google.common.collect.Maps;
* @param <K> type of key.
* @param <V> type of value.
public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V> {
public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V> {
private final String name;
private final ApplicationId applicationId;
......@@ -74,16 +65,7 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V>
private final boolean readOnly;
private final boolean purgeOnUninstall;
private final MetricsService metricsService;
private final MetricsComponent metricsComponent;
private final MetricsFeature metricsFeature;
private final Map<String, Timer> perMapOpTimers = Maps.newConcurrentMap();
private final Map<String, Timer> perOpTimers = Maps.newConcurrentMap();
private final Timer cMapTimer;
private final Timer perMapTimer;
private final MetricsFeature wildcard;
private static final String COMPONENT_NAME = "consistentMap";
private static final String PRIMITIVE_NAME = "consistentMap";
private static final String SIZE = "size";
private static final String IS_EMPTY = "isEmpty";
private static final String CONTAINS_KEY = "containsKey";
......@@ -105,6 +87,7 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V>
private final Set<MapEventListener<K, V>> listeners = new CopyOnWriteArraySet<>();
private final Logger log = getLogger(getClass());
private final MeteringAgent monitor;
private static final String ERROR_NULL_KEY = "Key cannot be null";
private static final String ERROR_NULL_VALUE = "Null values are not allowed";
......@@ -124,11 +107,12 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V>
public DefaultAsyncConsistentMap(String name,
ApplicationId applicationId,
Database database,
Serializer serializer,
boolean readOnly,
boolean purgeOnUninstall) {
ApplicationId applicationId,
Database database,
Serializer serializer,
boolean readOnly,
boolean purgeOnUninstall,
boolean meteringEnabled) {
this.name = checkNotNull(name, "map name cannot be null");
this.applicationId = applicationId;
this.database = checkNotNull(database, "database cannot be null");
......@@ -146,13 +130,7 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V>
this.metricsService = DefaultServiceDirectory.getService(MetricsService.class);
this.metricsComponent = metricsService.registerComponent(COMPONENT_NAME);
this.metricsFeature = metricsComponent.registerFeature(name);
this.wildcard = metricsComponent.registerFeature("*");
this.perMapTimer = metricsService.createTimer(metricsComponent, metricsFeature, "*");
this.cMapTimer = metricsService.createTimer(metricsComponent, wildcard, "*");
this.monitor = new MeteringAgent(PRIMITIVE_NAME, name, meteringEnabled);
......@@ -190,14 +168,14 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V>
public CompletableFuture<Integer> size() {
final OperationTimer timer = startTimer(SIZE);
final MeteringAgent.Context timer = monitor.startTimer(SIZE);
return database.mapSize(name)
.whenComplete((r, e) -> timer.stop());
public CompletableFuture<Boolean> isEmpty() {
final OperationTimer timer = startTimer(IS_EMPTY);
final MeteringAgent.Context timer = monitor.startTimer(IS_EMPTY);
return database.mapIsEmpty(name)
.whenComplete((r, e) -> timer.stop());
......@@ -205,7 +183,7 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V>
public CompletableFuture<Boolean> containsKey(K key) {
checkNotNull(key, ERROR_NULL_KEY);
final OperationTimer timer = startTimer(CONTAINS_KEY);
final MeteringAgent.Context timer = monitor.startTimer(CONTAINS_KEY);
return database.mapContainsKey(name, keyCache.getUnchecked(key))
.whenComplete((r, e) -> timer.stop());
......@@ -213,7 +191,7 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V>
public CompletableFuture<Boolean> containsValue(V value) {
checkNotNull(value, ERROR_NULL_VALUE);
final OperationTimer timer = startTimer(CONTAINS_VALUE);
final MeteringAgent.Context timer = monitor.startTimer(CONTAINS_VALUE);
return database.mapContainsValue(name, serializer.encode(value))
.whenComplete((r, e) -> timer.stop());
......@@ -221,18 +199,18 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V>
public CompletableFuture<Versioned<V>> get(K key) {
checkNotNull(key, ERROR_NULL_KEY);
final OperationTimer timer = startTimer(GET);
final MeteringAgent.Context timer = monitor.startTimer(GET);
return database.mapGet(name, keyCache.getUnchecked(key))
.whenComplete((r, e) -> timer.stop())
.thenApply(v -> v != null ? v.map(serializer::decode) : null);
.thenApply(v -> v != null ? v.map(serializer::decode) : null);
public CompletableFuture<Versioned<V>> computeIfAbsent(K key,
Function<? super K, ? extends V> mappingFunction) {
Function<? super K, ? extends V> mappingFunction) {
checkNotNull(key, ERROR_NULL_KEY);
checkNotNull(mappingFunction, "Mapping function cannot be null");
final OperationTimer timer = startTimer(COMPUTE_IF_ABSENT);
final MeteringAgent.Context timer = monitor.startTimer(COMPUTE_IF_ABSENT);
return updateAndGet(key, Match.ifNull(), Match.any(), mappingFunction.apply(key))
.whenComplete((r, e) -> timer.stop())
.thenApply(v -> v.newValue());
......@@ -240,24 +218,24 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V>
public CompletableFuture<Versioned<V>> computeIfPresent(K key,
BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
return computeIf(key, Objects::nonNull, remappingFunction);
public CompletableFuture<Versioned<V>> compute(K key,
BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
return computeIf(key, v -> true, remappingFunction);
public CompletableFuture<Versioned<V>> computeIf(K key,
Predicate<? super V> condition,
BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
Predicate<? super V> condition,
BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
checkNotNull(key, ERROR_NULL_KEY);
checkNotNull(condition, "predicate function cannot be null");
checkNotNull(remappingFunction, "Remapping function cannot be null");
final OperationTimer timer = startTimer(COMPUTE_IF);
final MeteringAgent.Context timer = monitor.startTimer(COMPUTE_IF);
return get(key).thenCompose(r1 -> {
V existingValue = r1 == null ? null : r1.value();
// if the condition evaluates to false, return existing value.
......@@ -293,7 +271,7 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V>
public CompletableFuture<Versioned<V>> put(K key, V value) {
checkNotNull(key, ERROR_NULL_KEY);
checkNotNull(value, ERROR_NULL_VALUE);
final OperationTimer timer = startTimer(PUT);
final MeteringAgent.Context timer = monitor.startTimer(PUT);
return updateAndGet(key, Match.any(), Match.any(), value).thenApply(v -> v.oldValue())
.whenComplete((r, e) -> timer.stop());
......@@ -302,7 +280,7 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V>
public CompletableFuture<Versioned<V>> putAndGet(K key, V value) {
checkNotNull(key, ERROR_NULL_KEY);
checkNotNull(value, ERROR_NULL_VALUE);
final OperationTimer timer = startTimer(PUT_AND_GET);
final MeteringAgent.Context timer = monitor.startTimer(PUT_AND_GET);
return updateAndGet(key, Match.any(), Match.any(), value).thenApply(v -> v.newValue())
.whenComplete((r, e) -> timer.stop());
......@@ -310,7 +288,7 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V>
public CompletableFuture<Versioned<V>> remove(K key) {
checkNotNull(key, ERROR_NULL_KEY);
final OperationTimer timer = startTimer(REMOVE);
final MeteringAgent.Context timer = monitor.startTimer(REMOVE);
return updateAndGet(key, Match.any(), Match.any(), null).thenApply(v -> v.oldValue())
.whenComplete((r, e) -> timer.stop());
......@@ -318,14 +296,14 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V>
public CompletableFuture<Void> clear() {
final OperationTimer timer = startTimer(CLEAR);
final MeteringAgent.Context timer = monitor.startTimer(CLEAR);
return database.mapClear(name).thenApply(this::unwrapResult)
.whenComplete((r, e) -> timer.stop());
public CompletableFuture<Set<K>> keySet() {
final OperationTimer timer = startTimer(KEY_SET);
final MeteringAgent.Context timer = monitor.startTimer(KEY_SET);
return database.mapKeySet(name)
.thenApply(s -> s
......@@ -336,7 +314,7 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V>
public CompletableFuture<Collection<Versioned<V>>> values() {
final OperationTimer timer = startTimer(VALUES);
final MeteringAgent.Context timer = monitor.startTimer(VALUES);
return database.mapValues(name)
.whenComplete((r, e) -> timer.stop())
.thenApply(c -> c
......@@ -347,7 +325,7 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V>
public CompletableFuture<Set<Entry<K, Versioned<V>>>> entrySet() {
final OperationTimer timer = startTimer(ENTRY_SET);
final MeteringAgent.Context timer = monitor.startTimer(ENTRY_SET);
return database.mapEntrySet(name)
.whenComplete((r, e) -> timer.stop())
.thenApply(s -> s
......@@ -360,7 +338,7 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V>
public CompletableFuture<Versioned<V>> putIfAbsent(K key, V value) {
checkNotNull(key, ERROR_NULL_KEY);
checkNotNull(value, ERROR_NULL_VALUE);
final OperationTimer timer = startTimer(PUT_IF_ABSENT);
final MeteringAgent.Context timer = monitor.startTimer(PUT_IF_ABSENT);
return updateAndGet(key, Match.ifNull(), Match.any(), value)
.whenComplete((r, e) -> timer.stop())
.thenApply(v -> v.oldValue());
......@@ -370,7 +348,7 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V>
public CompletableFuture<Boolean> remove(K key, V value) {
checkNotNull(key, ERROR_NULL_KEY);
checkNotNull(value, ERROR_NULL_VALUE);
final OperationTimer timer = startTimer(REMOVE);
final MeteringAgent.Context timer = monitor.startTimer(REMOVE);
return updateAndGet(key, Match.ifValue(value), Match.any(), null)
.whenComplete((r, e) -> timer.stop())
.thenApply(v -> v.updated());
......@@ -379,7 +357,7 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V>
public CompletableFuture<Boolean> remove(K key, long version) {
checkNotNull(key, ERROR_NULL_KEY);
final OperationTimer timer = startTimer(REMOVE);
final MeteringAgent.Context timer = monitor.startTimer(REMOVE);
return updateAndGet(key, Match.any(), Match.ifValue(version), null)
.whenComplete((r, e) -> timer.stop())
.thenApply(v -> v.updated());
......@@ -390,7 +368,7 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V>
checkNotNull(key, ERROR_NULL_KEY);
checkNotNull(oldValue, ERROR_NULL_VALUE);
checkNotNull(newValue, ERROR_NULL_VALUE);
final OperationTimer timer = startTimer(REPLACE);
final MeteringAgent.Context timer = monitor.startTimer(REPLACE);
return updateAndGet(key, Match.ifValue(oldValue), Match.any(), newValue)
.whenComplete((r, e) -> timer.stop())
.thenApply(v -> v.updated());
......@@ -398,7 +376,7 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V>
public CompletableFuture<Boolean> replace(K key, long oldVersion, V newValue) {
final OperationTimer timer = startTimer(REPLACE);
final MeteringAgent.Context timer = monitor.startTimer(REPLACE);
return updateAndGet(key, Match.any(), Match.ifValue(oldVersion), newValue)
.whenComplete((r, e) -> timer.stop())
.thenApply(v -> v.updated());
......@@ -409,9 +387,9 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V>
private CompletableFuture<UpdateResult<K, V>> updateAndGet(K key,
Match<V> oldValueMatch,
Match<Long> oldVersionMatch,
V value) {
Match<V> oldValueMatch,
Match<Long> oldVersionMatch,
V value) {
return database.mapUpdate(name,
......@@ -461,33 +439,4 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V>
private OperationTimer startTimer(String op) {
//check if timer exist, if it doesn't creates it
final Timer currTimer = perMapOpTimers.computeIfAbsent(op, timer ->
metricsService.createTimer(metricsComponent, metricsFeature, op));
perOpTimers.computeIfAbsent(op, timer -> metricsService.createTimer(metricsComponent, wildcard, op));
//starts timer
return new OperationTimer(currTimer.time(), op);
private class OperationTimer {
private final Timer.Context context;
private final String operation;
public OperationTimer(Timer.Context context, String operation) {
this.context = context;
this.operation = operation;
public void stop() {
//Stop and updates timer with specific measurements per map, per operation
final long time = context.stop();
//updates timer with aggregated measurements per map
perOpTimers.get(operation).update(time, TimeUnit.NANOSECONDS);
//updates timer with aggregated measurements per map
perMapTimer.update(time, TimeUnit.NANOSECONDS);
//updates timer with aggregated measurements per all Consistent Maps
cMapTimer.update(time, TimeUnit.NANOSECONDS);
\ No newline at end of file
......@@ -15,16 +15,16 @@
package org.onosproject.store.consistent.impl;
import org.onosproject.store.service.AsyncAtomicCounter;
import org.onosproject.store.service.AtomicCounter;
import org.onosproject.store.service.StorageException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.onosproject.store.service.AsyncAtomicCounter;
import org.onosproject.store.service.AtomicCounter;
import org.onosproject.store.service.StorageException;
* Default implementation for a distributed AtomicCounter backed by
* partitioned Raft DB.
......@@ -38,10 +38,11 @@ public class DefaultAtomicCounter implements AtomicCounter {
private final AsyncAtomicCounter asyncCounter;
public DefaultAtomicCounter(String name,
Database database,
boolean retryOnException,
ScheduledExecutorService retryExecutor) {
asyncCounter = new DefaultAsyncAtomicCounter(name, database, retryOnException, retryExecutor);
Database database,
boolean retryOnException,
boolean meteringEnabled,
ScheduledExecutorService retryExecutor) {
asyncCounter = new DefaultAsyncAtomicCounter(name, database, retryOnException, meteringEnabled, retryExecutor);
......@@ -15,12 +15,12 @@
package org.onosproject.store.consistent.impl;
import java.util.concurrent.ScheduledExecutorService;
import org.onosproject.store.service.AsyncAtomicCounter;
import org.onosproject.store.service.AtomicCounter;
import org.onosproject.store.service.AtomicCounterBuilder;
import java.util.concurrent.ScheduledExecutorService;
import static com.google.common.base.Preconditions.checkArgument;
......@@ -33,6 +33,7 @@ public class DefaultAtomicCounterBuilder implements AtomicCounterBuilder {
private final Database partitionedDatabase;
private final Database inMemoryDatabase;
private boolean retryOnFailure = false;
private boolean metering = true;
private ScheduledExecutorService retryExecutor = null;
public DefaultAtomicCounterBuilder(Database inMemoryDatabase, Database partitionedDatabase) {
......@@ -57,14 +58,14 @@ public class DefaultAtomicCounterBuilder implements AtomicCounterBuilder {
public AtomicCounter build() {
Database database = partitionsEnabled ? partitionedDatabase : inMemoryDatabase;
return new DefaultAtomicCounter(name, database, retryOnFailure, retryExecutor);
return new DefaultAtomicCounter(name, database, retryOnFailure, metering, retryExecutor);
public AsyncAtomicCounter buildAsyncCounter() {
Database database = partitionsEnabled ? partitionedDatabase : inMemoryDatabase;
return new DefaultAsyncAtomicCounter(name, database, retryOnFailure, retryExecutor);
return new DefaultAsyncAtomicCounter(name, database, retryOnFailure, metering, retryExecutor);
......@@ -74,6 +75,12 @@ public class DefaultAtomicCounterBuilder implements AtomicCounterBuilder {
public AtomicCounterBuilder withMeteringDisabled() {
metering = false;
return this;
public AtomicCounterBuilder withRetryExecutor(ScheduledExecutorService executor) {
this.retryExecutor = executor;
return this;
......@@ -15,9 +15,6 @@
package org.onosproject.store.consistent.impl;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import org.onosproject.store.service.AtomicValue;
import org.onosproject.store.service.AtomicValueEvent;
import org.onosproject.store.service.AtomicValueEventListener;
......@@ -27,6 +24,9 @@ import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.Versioned;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
* Default implementation of AtomicValue.
......@@ -39,41 +39,64 @@ public class DefaultAtomicValue<V> implements AtomicValue<V> {
private final String name;
private final Serializer serializer;
private final MapEventListener<String, byte[]> mapEventListener = new InternalMapEventListener();
private final MeteringAgent monitor;
private static final String COMPONENT_NAME = "atomicValue";
private static final String GET = "get";
private static final String GET_AND_SET = "getAndSet";
private static final String COMPARE_AND_SET = "compareAndSet";
public DefaultAtomicValue(ConsistentMap<String, byte[]> valueMap,
String name,
Serializer serializer) {
String name,
boolean meteringEnabled,
Serializer serializer) {
this.valueMap = valueMap;
this.name = name;
this.serializer = serializer;
this.monitor = new MeteringAgent(COMPONENT_NAME, name, meteringEnabled);
public boolean compareAndSet(V expect, V update) {
if (expect == null) {
if (update == null) {
return true;
return valueMap.putIfAbsent(name, serializer.encode(update)) == null;
} else {
if (update == null) {
return valueMap.remove(name, serializer.encode(expect));
final MeteringAgent.Context newTimer = monitor.startTimer(COMPARE_AND_SET);
try {
if (expect == null) {
if (update == null) {
return true;
return valueMap.putIfAbsent(name, serializer.encode(update)) == null;
} else {
if (update == null) {
return valueMap.remove(name, serializer.encode(expect));
return valueMap.replace(name, serializer.encode(expect), serializer.encode(update));
return valueMap.replace(name, serializer.encode(expect), serializer.encode(update));
} finally {
public V get() {
Versioned<byte[]> rawValue = valueMap.get(name);
return rawValue == null ? null : serializer.decode(rawValue.value());
final MeteringAgent.Context newTimer = monitor.startTimer(GET);
try {
Versioned<byte[]> rawValue = valueMap.get(name);
return rawValue == null ? null : serializer.decode(rawValue.value());
} finally {
public V getAndSet(V value) {
Versioned<byte[]> previousValue = value == null ?
valueMap.remove(name) : valueMap.put(name, serializer.encode(value));
return previousValue == null ? null : serializer.decode(previousValue.value());
final MeteringAgent.Context newTimer = monitor.startTimer(GET_AND_SET);
try {
Versioned<byte[]> previousValue = value == null ?
valueMap.remove(name) : valueMap.put(name, serializer.encode(value));
return previousValue == null ? null : serializer.decode(previousValue.value());
} finally {
......@@ -31,10 +31,12 @@ public class DefaultAtomicValueBuilder<V> implements AtomicValueBuilder<V> {
private Serializer serializer;
private String name;
private ConsistentMapBuilder<String, byte[]> mapBuilder;
private boolean metering = true;
public DefaultAtomicValueBuilder(DatabaseManager manager) {
mapBuilder = manager.<String, byte[]>consistentMapBuilder()
......@@ -57,7 +59,13 @@ public class DefaultAtomicValueBuilder<V> implements AtomicValueBuilder<V> {
public AtomicValueBuilder<V> withMeteringDisabled() {
metering = false;
return this;
public AtomicValue<V> build() {
return new DefaultAtomicValue<>(mapBuilder.build(), name, serializer);
return new DefaultAtomicValue<>(mapBuilder.build(), name, metering, serializer);
\ No newline at end of file
......@@ -15,15 +15,15 @@
package org.onosproject.store.consistent.impl;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import org.onosproject.core.ApplicationId;
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.ConsistentMapBuilder;
import org.onosproject.store.service.Serializer;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
* Default Consistent Map builder.
......@@ -38,6 +38,7 @@ public class DefaultConsistentMapBuilder<K, V> implements ConsistentMapBuilder<K
private boolean purgeOnUninstall = false;
private boolean partitionsEnabled = true;
private boolean readOnly = false;
private boolean metering = true;
private final DatabaseManager manager;
public DefaultConsistentMapBuilder(DatabaseManager manager) {
......@@ -65,6 +66,12 @@ public class DefaultConsistentMapBuilder<K, V> implements ConsistentMapBuilder<K
public ConsistentMapBuilder<K, V> withMeteringDisabled() {
metering = false;
return this;
public ConsistentMapBuilder<K, V> withSerializer(Serializer serializer) {
checkArgument(serializer != null);
this.serializer = serializer;
......@@ -109,7 +116,8 @@ public class DefaultConsistentMapBuilder<K, V> implements ConsistentMapBuilder<K
partitionsEnabled ? manager.partitionedDatabase : manager.inMemoryDatabase,
return manager.registerMap(asyncMap);
\ No newline at end of file
......@@ -15,25 +15,24 @@
package org.onosproject.store.consistent.impl;
import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Futures;
import org.onosproject.cluster.NodeId;
import org.onosproject.store.service.DistributedQueue;
import org.onosproject.store.service.Serializer;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import org.onosproject.cluster.NodeId;
import org.onosproject.store.service.DistributedQueue;
import org.onosproject.store.service.Serializer;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Futures;
import static com.google.common.base.Preconditions.checkNotNull;
* DistributedQueue implementation that provides FIFO ordering semantics.
* @param <E> queue entry type
public class DefaultDistributedQueue<E> implements DistributedQueue<E> {
public class DefaultDistributedQueue<E> implements DistributedQueue<E> {
private final String name;
private final Database database;
......@@ -42,35 +41,56 @@ public class DefaultDistributedQueue<E> implements DistributedQueue<E> {
private final Set<CompletableFuture<E>> pendingFutures = Sets.newIdentityHashSet();
private final Consumer<Set<NodeId>> notifyConsumers;
private static final String PRIMITIVE_NAME = "distributedQueue";
private static final String SIZE = "size";
private static final String PUSH = "push";
private static final String POP = "pop";
private static final String PEEK = "peek";
private static final String ERROR_NULL_ENTRY = "Null entries are not allowed";
private final MeteringAgent monitor;
public DefaultDistributedQueue(String name,
Database database,
Serializer serializer,
NodeId localNodeId,
Consumer<Set<NodeId>> notifyConsumers) {
Database database,
Serializer serializer,
NodeId localNodeId,
boolean meteringEnabled,
Consumer<Set<NodeId>> notifyConsumers) {
this.name = checkNotNull(name, "queue name cannot be null");
this.database = checkNotNull(database, "database cannot be null");
this.serializer = checkNotNull(serializer, "serializer cannot be null");
this.localNodeId = localNodeId;
this.notifyConsumers = notifyConsumers;
this.monitor = new MeteringAgent(PRIMITIVE_NAME, name, meteringEnabled);
public long size() {
return Futures.getUnchecked(database.queueSize(name));
final MeteringAgent.Context timer = monitor.startTimer(SIZE);
try {
return Futures.getUnchecked(database.queueSize(name));
} finally {
public void push(E entry) {
checkNotNull(entry, ERROR_NULL_ENTRY);
Futures.getUnchecked(database.queuePush(name, serializer.encode(entry))
.thenApply(v -> null));
final MeteringAgent.Context timer = monitor.startTimer(PUSH);
try {
checkNotNull(entry, ERROR_NULL_ENTRY);
Futures.getUnchecked(database.queuePush(name, serializer.encode(entry))
.thenApply(v -> null));
} finally {
public CompletableFuture<E> pop() {
final MeteringAgent.Context timer = monitor.startTimer(POP);
return database.queuePop(name, localNodeId)
.thenCompose(v -> {
if (v != null) {
......@@ -80,13 +100,19 @@ public class DefaultDistributedQueue<E> implements DistributedQueue<E> {
return newPendingFuture;
.whenComplete((r, e) -> timer.stop());
public E peek() {
return Futures.getUnchecked(database.queuePeek(name)
.thenApply(v -> v != null ? serializer.decode(v) : null));
final MeteringAgent.Context timer = monitor.startTimer(PEEK);
try {
return Futures.getUnchecked(database.queuePeek(name)
.thenApply(v -> v != null ? serializer.decode(v) : null));
} finally {
public String name() {
......@@ -15,18 +15,17 @@
package org.onosproject.store.consistent.impl;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import java.util.Set;
import java.util.function.Consumer;
import com.google.common.base.Charsets;
import org.onosproject.cluster.NodeId;
import org.onosproject.store.service.DistributedQueue;
import org.onosproject.store.service.DistributedQueueBuilder;
import org.onosproject.store.service.Serializer;
import com.google.common.base.Charsets;
import java.util.Set;
import java.util.function.Consumer;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
* Default implementation of a {@code DistributedQueueBuilder}.
......@@ -39,6 +38,7 @@ public class DefaultDistributedQueueBuilder<E> implements DistributedQueueBuilde
private String name;
private boolean persistenceEnabled = true;
private final DatabaseManager databaseManager;
private boolean metering = true;
public DefaultDistributedQueueBuilder(
DatabaseManager databaseManager) {
......@@ -60,6 +60,12 @@ public class DefaultDistributedQueueBuilder<E> implements DistributedQueueBuilde
public DistributedQueueBuilder<E> withMeteringDisabled() {
metering = false;
return this;
public DistributedQueueBuilder<E> withPersistenceDisabled() {
persistenceEnabled = false;
return this;
......@@ -81,6 +87,7 @@ public class DefaultDistributedQueueBuilder<E> implements DistributedQueueBuilde
persistenceEnabled ? databaseManager.partitionedDatabase : databaseManager.inMemoryDatabase,
return queue;
......@@ -15,11 +15,8 @@
package org.onosproject.store.consistent.impl;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.DistributedSet;
import org.onosproject.store.service.MapEvent;
......@@ -27,8 +24,10 @@ import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.SetEvent;
import org.onosproject.store.service.SetEventListener;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
* Implementation of distributed set that is backed by a ConsistentMap.
......@@ -37,96 +36,178 @@ import com.google.common.collect.Sets;
public class DefaultDistributedSet<E> implements DistributedSet<E> {
private static final String CONTAINS = "contains";
private static final String PRIMITIVE_NAME = "distributedSet";
private static final String SIZE = "size";
private static final String IS_EMPTY = "isEmpty";
private static final String ITERATOR = "iterator";
private static final String TO_ARRAY = "toArray";
private static final String ADD = "add";
private static final String REMOVE = "remove";
private static final String CONTAINS_ALL = "containsAll";
private static final String ADD_ALL = "addAll";
private static final String RETAIN_ALL = "retainAll";
private static final String REMOVE_ALL = "removeAll";
private static final String CLEAR = "clear";
private final String name;
private final ConsistentMap<E, Boolean> backingMap;
private final Map<SetEventListener<E>, MapEventListener<E, Boolean>> listenerMapping = Maps.newIdentityHashMap();
private final MeteringAgent monitor;
public DefaultDistributedSet(String name, ConsistentMap<E, Boolean> backingMap) {
public DefaultDistributedSet(String name, boolean meteringEnabled, ConsistentMap<E, Boolean> backingMap) {
this.name = name;
this.backingMap = backingMap;
monitor = new MeteringAgent(PRIMITIVE_NAME, name, meteringEnabled);
public int size() {
return backingMap.size();
final MeteringAgent.Context timer = monitor.startTimer(SIZE);
try {
return backingMap.size();
} finally {
public boolean isEmpty() {
return backingMap.isEmpty();
final MeteringAgent.Context timer = monitor.startTimer(IS_EMPTY);
try {
return backingMap.isEmpty();
} finally {
public boolean contains(Object o) {
return backingMap.containsKey((E) o);
final MeteringAgent.Context timer = monitor.startTimer(CONTAINS);
try {
return backingMap.containsKey((E) o);
} finally {
public Iterator<E> iterator() {
return backingMap.keySet().iterator();
final MeteringAgent.Context timer = monitor.startTimer(ITERATOR);
//Do we have to measure this guy?
try {
return backingMap.keySet().iterator();
} finally {
public Object[] toArray() {
return backingMap.keySet().stream().toArray();
final MeteringAgent.Context timer = monitor.startTimer(TO_ARRAY);
try {
return backingMap.keySet().stream().toArray();
} finally {
public <T> T[] toArray(T[] a) {
return backingMap.keySet().stream().toArray(size -> a);
final MeteringAgent.Context timer = monitor.startTimer(TO_ARRAY);
try {
return backingMap.keySet().stream().toArray(size -> a);
} finally {
public boolean add(E e) {
return backingMap.putIfAbsent(e, true) == null;
final MeteringAgent.Context timer = monitor.startTimer(ADD);
try {
return backingMap.putIfAbsent(e, true) == null;
} finally {
public boolean remove(Object o) {
return backingMap.remove((E) o) != null;
final MeteringAgent.Context timer = monitor.startTimer(REMOVE);
try {
return backingMap.remove((E) o) != null;
} finally {
public boolean containsAll(Collection<?> c) {
return c.stream()
final MeteringAgent.Context timer = monitor.startTimer(CONTAINS_ALL);
try {
return c.stream()
} finally {
public boolean addAll(Collection<? extends E> c) {
return c.stream()
final MeteringAgent.Context timer = monitor.startTimer(ADD_ALL);
try {
return c.stream()
} finally {
public boolean retainAll(Collection<?> c) {
Set<?> retainSet = Sets.newHashSet(c);
return backingMap.keySet()
final MeteringAgent.Context timer = monitor.startTimer(RETAIN_ALL);
try {
Set<?> retainSet = Sets.newHashSet(c);
return backingMap.keySet()
.filter(k -> !retainSet.contains(k))
} finally {
public boolean removeAll(Collection<?> c) {
Set<?> removeSet = Sets.newHashSet(c);
return backingMap.keySet()
final MeteringAgent.Context timer = monitor.startTimer(REMOVE_ALL);
try {
Set<?> removeSet = Sets.newHashSet(c);
return backingMap.keySet()
} finally {
public void clear() {
final MeteringAgent.Context timer = monitor.startTimer(CLEAR);
try {
} finally {
......@@ -30,9 +30,11 @@ public class DefaultDistributedSetBuilder<E> implements DistributedSetBuilder<E>
private String name;
private ConsistentMapBuilder<E, Boolean> mapBuilder;
private boolean metering = true;
public DefaultDistributedSetBuilder(DatabaseManager manager) {
this.mapBuilder = manager.consistentMapBuilder();
......@@ -73,7 +75,13 @@ public class DefaultDistributedSetBuilder<E> implements DistributedSetBuilder<E>
public DistributedSetBuilder<E> withMeteringDisabled() {
metering = false;
return this;
public DistributedSet<E> build() {
return new DefaultDistributedSet<E>(name, mapBuilder.build());
return new DefaultDistributedSet<E>(name, metering, mapBuilder.build());
package org.onosproject.store.consistent.impl;
* Copyright 2015 Open Networking Laboratory
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
import com.codahale.metrics.Timer;
import com.google.common.collect.Maps;
import org.onlab.metrics.MetricsComponent;
import org.onlab.metrics.MetricsFeature;
import org.onlab.metrics.MetricsService;
import org.onlab.osgi.DefaultServiceDirectory;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static com.google.common.base.Preconditions.checkNotNull;
* Agent that implements usage and performance monitoring via the metrics service.
public class MeteringAgent {
private MetricsService metricsService;
private MetricsComponent metricsComponent;
private MetricsFeature metricsFeature;
private final Map<String, Timer> perObjOpTimers = Maps.newConcurrentMap();
private final Map<String, Timer> perOpTimers = Maps.newConcurrentMap();
private Timer perPrimitiveTimer;
private Timer perObjTimer;
private MetricsFeature wildcard;
private final boolean activated;
private Context nullTimer;
* Constructs a new MeteringAgent for a given distributed primitive.
* Instantiates the metrics service
* Initializes all the general metrics for that object
* @param primitiveName Type of primitive to be metered
* @param objName Global name of the primitive
* @param activated
public MeteringAgent(String primitiveName, String objName, boolean activated) {
checkNotNull(objName, "Object name cannot be null");
this.activated = activated;
nullTimer = new Context(null, "");
if (this.activated) {
this.metricsService = DefaultServiceDirectory.getService(MetricsService.class);
this.metricsComponent = metricsService.registerComponent(primitiveName);
this.metricsFeature = metricsComponent.registerFeature(objName);
this.wildcard = metricsComponent.registerFeature("*");
this.perObjTimer = metricsService.createTimer(metricsComponent, metricsFeature, "*");
this.perPrimitiveTimer = metricsService.createTimer(metricsComponent, wildcard, "*");
* Initializes a specific timer for a given operation.
* @param op Specific operation being metered
public Context startTimer(String op) {
if (!activated) {
return nullTimer;
// Check if timer exists, if it doesn't creates it
final Timer currTimer = perObjOpTimers.computeIfAbsent(op, timer ->
metricsService.createTimer(metricsComponent, metricsFeature, op));
perOpTimers.computeIfAbsent(op, timer -> metricsService.createTimer(metricsComponent, wildcard, op));
// Starts timer
return new Context(currTimer.time(), op);
* Timer.Context with a specific operation.
public class Context {
private final Timer.Context context;
private final String operation;
* Constructs Context.
* @param context
* @param operation
public Context(Timer.Context context, String operation) {
this.context = context;
this.operation = operation;
* Stops timer given a specific context and updates all related metrics.
public void stop() {
if (!activated) {
//Stop and updates timer with specific measurements per map, per operation
final long time = context.stop();
//updates timer with aggregated measurements per map
perOpTimers.get(operation).update(time, TimeUnit.NANOSECONDS);
//updates timer with aggregated measurements per map
perObjTimer.update(time, TimeUnit.NANOSECONDS);
//updates timer with aggregated measurements per all Consistent Maps
perPrimitiveTimer.update(time, TimeUnit.NANOSECONDS);