Flavio Castro
Committed by Gerrit Code Review

Added feature to count exceptions ocurred when operating distributed primitives

Change-Id: I24017e61cd2aefd1cf78aa5b241a3219e3e89b32
......@@ -51,34 +51,34 @@ public class DefaultAsyncAtomicCounter implements AsyncAtomicCounter {
public CompletableFuture<Long> incrementAndGet() {
final MeteringAgent.Context timer = monitor.startTimer(INCREMENT_AND_GET);
return addAndGet(1L)
.whenComplete((r, e) -> timer.stop());
.whenComplete((r, e) -> timer.stop(e));
}
@Override
public CompletableFuture<Long> get() {
final MeteringAgent.Context timer = monitor.startTimer(GET);
return database.counterGet(name)
.whenComplete((r, e) -> timer.stop());
.whenComplete((r, e) -> timer.stop(e));
}
@Override
public CompletableFuture<Long> getAndIncrement() {
final MeteringAgent.Context timer = monitor.startTimer(GET_AND_INCREMENT);
return getAndAdd(1L)
.whenComplete((r, e) -> timer.stop());
.whenComplete((r, e) -> timer.stop(e));
}
@Override
public CompletableFuture<Long> getAndAdd(long delta) {
final MeteringAgent.Context timer = monitor.startTimer(GET_AND_ADD);
return database.counterGetAndAdd(name, delta)
.whenComplete((r, e) -> timer.stop());
.whenComplete((r, e) -> timer.stop(e));
}
@Override
public CompletableFuture<Long> addAndGet(long delta) {
final MeteringAgent.Context timer = monitor.startTimer(ADD_AND_GET);
return database.counterAddAndGet(name, delta)
.whenComplete((r, e) -> timer.stop());
.whenComplete((r, e) -> timer.stop(e));
}
}
\ No newline at end of file
}
......
......@@ -193,14 +193,14 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V
public CompletableFuture<Integer> size() {
final MeteringAgent.Context timer = monitor.startTimer(SIZE);
return database.mapSize(name)
.whenComplete((r, e) -> timer.stop());
.whenComplete((r, e) -> timer.stop(e));
}
@Override
public CompletableFuture<Boolean> isEmpty() {
final MeteringAgent.Context timer = monitor.startTimer(IS_EMPTY);
return database.mapIsEmpty(name)
.whenComplete((r, e) -> timer.stop());
.whenComplete((r, e) -> timer.stop(e));
}
@Override
......@@ -208,7 +208,7 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V
checkNotNull(key, ERROR_NULL_KEY);
final MeteringAgent.Context timer = monitor.startTimer(CONTAINS_KEY);
return database.mapContainsKey(name, keyCache.getUnchecked(key))
.whenComplete((r, e) -> timer.stop());
.whenComplete((r, e) -> timer.stop(e));
}
@Override
......@@ -216,7 +216,7 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V
checkNotNull(value, ERROR_NULL_VALUE);
final MeteringAgent.Context timer = monitor.startTimer(CONTAINS_VALUE);
return database.mapContainsValue(name, serializer.encode(value))
.whenComplete((r, e) -> timer.stop());
.whenComplete((r, e) -> timer.stop(e));
}
@Override
......@@ -224,7 +224,7 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V
checkNotNull(key, ERROR_NULL_KEY);
final MeteringAgent.Context timer = monitor.startTimer(GET);
return database.mapGet(name, keyCache.getUnchecked(key))
.whenComplete((r, e) -> timer.stop())
.whenComplete((r, e) -> timer.stop(e))
.thenApply(v -> v != null ? v.map(serializer::decode) : null);
}
......@@ -235,7 +235,7 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V
checkNotNull(mappingFunction, "Mapping function cannot be null");
final MeteringAgent.Context timer = monitor.startTimer(COMPUTE_IF_ABSENT);
return updateAndGet(key, Match.ifNull(), Match.any(), mappingFunction.apply(key))
.whenComplete((r, e) -> timer.stop())
.whenComplete((r, e) -> timer.stop(e))
.thenApply(v -> v.newValue());
}
......@@ -279,7 +279,7 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V
Match<V> valueMatcher = r1 == null ? Match.ifNull() : Match.any();
Match<Long> versionMatcher = r1 == null ? Match.any() : Match.ifValue(r1.version());
return updateAndGet(key, valueMatcher, versionMatcher, computedValue.get())
.whenComplete((r, e) -> timer.stop())
.whenComplete((r, e) -> timer.stop(e))
.thenApply(v -> {
if (v.updated()) {
return v.newValue();
......@@ -296,7 +296,7 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V
checkNotNull(value, ERROR_NULL_VALUE);
final MeteringAgent.Context timer = monitor.startTimer(PUT);
return updateAndGet(key, Match.any(), Match.any(), value).thenApply(v -> v.oldValue())
.whenComplete((r, e) -> timer.stop());
.whenComplete((r, e) -> timer.stop(e));
}
@Override
......@@ -305,7 +305,7 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V
checkNotNull(value, ERROR_NULL_VALUE);
final MeteringAgent.Context timer = monitor.startTimer(PUT_AND_GET);
return updateAndGet(key, Match.any(), Match.any(), value).thenApply(v -> v.newValue())
.whenComplete((r, e) -> timer.stop());
.whenComplete((r, e) -> timer.stop(e));
}
@Override
......@@ -313,7 +313,7 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V
checkNotNull(key, ERROR_NULL_KEY);
final MeteringAgent.Context timer = monitor.startTimer(REMOVE);
return updateAndGet(key, Match.any(), Match.any(), null).thenApply(v -> v.oldValue())
.whenComplete((r, e) -> timer.stop());
.whenComplete((r, e) -> timer.stop(e));
}
@Override
......@@ -321,7 +321,7 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V
checkIfUnmodifiable();
final MeteringAgent.Context timer = monitor.startTimer(CLEAR);
return database.mapClear(name).thenApply(this::unwrapResult)
.whenComplete((r, e) -> timer.stop());
.whenComplete((r, e) -> timer.stop(e));
}
@Override
......@@ -332,14 +332,14 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V
.stream()
.map(this::dK)
.collect(Collectors.toSet()))
.whenComplete((r, e) -> timer.stop());
.whenComplete((r, e) -> timer.stop(e));
}
@Override
public CompletableFuture<Collection<Versioned<V>>> values() {
final MeteringAgent.Context timer = monitor.startTimer(VALUES);
return database.mapValues(name)
.whenComplete((r, e) -> timer.stop())
.whenComplete((r, e) -> timer.stop(e))
.thenApply(c -> c
.stream()
.map(v -> v.<V>map(serializer::decode))
......@@ -350,7 +350,7 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V
public CompletableFuture<Set<Entry<K, Versioned<V>>>> entrySet() {
final MeteringAgent.Context timer = monitor.startTimer(ENTRY_SET);
return database.mapEntrySet(name)
.whenComplete((r, e) -> timer.stop())
.whenComplete((r, e) -> timer.stop(e))
.thenApply(s -> s
.stream()
.map(this::mapRawEntry)
......@@ -363,7 +363,7 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V
checkNotNull(value, ERROR_NULL_VALUE);
final MeteringAgent.Context timer = monitor.startTimer(PUT_IF_ABSENT);
return updateAndGet(key, Match.ifNull(), Match.any(), value)
.whenComplete((r, e) -> timer.stop())
.whenComplete((r, e) -> timer.stop(e))
.thenApply(v -> v.oldValue());
}
......@@ -373,7 +373,7 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V
checkNotNull(value, ERROR_NULL_VALUE);
final MeteringAgent.Context timer = monitor.startTimer(REMOVE);
return updateAndGet(key, Match.ifValue(value), Match.any(), null)
.whenComplete((r, e) -> timer.stop())
.whenComplete((r, e) -> timer.stop(e))
.thenApply(v -> v.updated());
}
......@@ -382,7 +382,7 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V
checkNotNull(key, ERROR_NULL_KEY);
final MeteringAgent.Context timer = monitor.startTimer(REMOVE);
return updateAndGet(key, Match.any(), Match.ifValue(version), null)
.whenComplete((r, e) -> timer.stop())
.whenComplete((r, e) -> timer.stop(e))
.thenApply(v -> v.updated());
}
......@@ -393,7 +393,7 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V
checkNotNull(newValue, ERROR_NULL_VALUE);
final MeteringAgent.Context timer = monitor.startTimer(REPLACE);
return updateAndGet(key, Match.ifValue(oldValue), Match.any(), newValue)
.whenComplete((r, e) -> timer.stop())
.whenComplete((r, e) -> timer.stop(e))
.thenApply(v -> v.updated());
}
......@@ -401,7 +401,7 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V
public CompletableFuture<Boolean> replace(K key, long oldVersion, V newValue) {
final MeteringAgent.Context timer = monitor.startTimer(REPLACE);
return updateAndGet(key, Match.any(), Match.ifValue(oldVersion), newValue)
.whenComplete((r, e) -> timer.stop())
.whenComplete((r, e) -> timer.stop(e))
.thenApply(v -> v.updated());
}
......
......@@ -72,7 +72,7 @@ public class DefaultAtomicValue<V> implements AtomicValue<V> {
return valueMap.replace(name, serializer.encode(expect), serializer.encode(update));
}
} finally {
newTimer.stop();
newTimer.stop(null);
}
}
......@@ -83,7 +83,7 @@ public class DefaultAtomicValue<V> implements AtomicValue<V> {
Versioned<byte[]> rawValue = valueMap.get(name);
return rawValue == null ? null : serializer.decode(rawValue.value());
} finally {
newTimer.stop();
newTimer.stop(null);
}
}
......@@ -95,7 +95,7 @@ public class DefaultAtomicValue<V> implements AtomicValue<V> {
valueMap.remove(name) : valueMap.put(name, serializer.encode(value));
return previousValue == null ? null : serializer.decode(previousValue.value());
} finally {
newTimer.stop();
newTimer.stop(null);
}
}
......
......@@ -73,7 +73,7 @@ public class DefaultDistributedQueue<E> implements DistributedQueue<E> {
@Override
public long size() {
final MeteringAgent.Context timer = monitor.startTimer(SIZE);
return Futures.getUnchecked(database.queueSize(name).whenComplete((r, e) -> timer.stop()));
return Futures.getUnchecked(database.queueSize(name).whenComplete((r, e) -> timer.stop(e)));
}
@Override
......@@ -81,14 +81,14 @@ public class DefaultDistributedQueue<E> implements DistributedQueue<E> {
checkNotNull(entry, ERROR_NULL_ENTRY);
final MeteringAgent.Context timer = monitor.startTimer(PUSH);
Futures.getUnchecked(database.queuePush(name, serializer.encode(entry))
.whenComplete((r, e) -> timer.stop()));
.whenComplete((r, e) -> timer.stop(e)));
}
@Override
public CompletableFuture<E> pop() {
final MeteringAgent.Context timer = monitor.startTimer(POP);
return database.queuePop(name)
.whenComplete((r, e) -> timer.stop())
.whenComplete((r, e) -> timer.stop(e))
.thenCompose(v -> {
if (v != null) {
return CompletableFuture.completedFuture(serializer.decode(v));
......@@ -97,6 +97,7 @@ public class DefaultDistributedQueue<E> implements DistributedQueue<E> {
pendingFutures.add(newPendingFuture);
return newPendingFuture;
});
}
@Override
......@@ -104,7 +105,7 @@ public class DefaultDistributedQueue<E> implements DistributedQueue<E> {
final MeteringAgent.Context timer = monitor.startTimer(PEEK);
return Futures.getUnchecked(database.queuePeek(name)
.thenApply(v -> v != null ? serializer.<E>decode(v) : null)
.whenComplete((r, e) -> timer.stop()));
.whenComplete((r, e) -> timer.stop(e)));
}
public String name() {
......
......@@ -67,7 +67,7 @@ public class DefaultDistributedSet<E> implements DistributedSet<E> {
try {
return backingMap.size();
} finally {
timer.stop();
timer.stop(null);
}
}
......@@ -77,7 +77,7 @@ public class DefaultDistributedSet<E> implements DistributedSet<E> {
try {
return backingMap.isEmpty();
} finally {
timer.stop();
timer.stop(null);
}
}
......@@ -88,7 +88,7 @@ public class DefaultDistributedSet<E> implements DistributedSet<E> {
try {
return backingMap.containsKey((E) o);
} finally {
timer.stop();
timer.stop(null);
}
}
......@@ -99,7 +99,7 @@ public class DefaultDistributedSet<E> implements DistributedSet<E> {
try {
return backingMap.keySet().iterator();
} finally {
timer.stop();
timer.stop(null);
}
}
......@@ -109,7 +109,7 @@ public class DefaultDistributedSet<E> implements DistributedSet<E> {
try {
return backingMap.keySet().stream().toArray();
} finally {
timer.stop();
timer.stop(null);
}
}
......@@ -119,7 +119,7 @@ public class DefaultDistributedSet<E> implements DistributedSet<E> {
try {
return backingMap.keySet().stream().toArray(size -> a);
} finally {
timer.stop();
timer.stop(null);
}
}
......@@ -129,7 +129,7 @@ public class DefaultDistributedSet<E> implements DistributedSet<E> {
try {
return backingMap.putIfAbsent(e, true) == null;
} finally {
timer.stop();
timer.stop(null);
}
}
......@@ -140,7 +140,7 @@ public class DefaultDistributedSet<E> implements DistributedSet<E> {
try {
return backingMap.remove((E) o) != null;
} finally {
timer.stop();
timer.stop(null);
}
}
......@@ -151,7 +151,7 @@ public class DefaultDistributedSet<E> implements DistributedSet<E> {
return c.stream()
.allMatch(this::contains);
} finally {
timer.stop();
timer.stop(null);
}
}
......@@ -164,7 +164,7 @@ public class DefaultDistributedSet<E> implements DistributedSet<E> {
.reduce(Boolean::logicalOr)
.orElse(false);
} finally {
timer.stop();
timer.stop(null);
}
}
......@@ -180,7 +180,7 @@ public class DefaultDistributedSet<E> implements DistributedSet<E> {
.reduce(Boolean::logicalOr)
.orElse(false);
} finally {
timer.stop();
timer.stop(null);
}
}
......@@ -196,7 +196,7 @@ public class DefaultDistributedSet<E> implements DistributedSet<E> {
.reduce(Boolean::logicalOr)
.orElse(false);
} finally {
timer.stop();
timer.stop(null);
}
}
......@@ -206,7 +206,7 @@ public class DefaultDistributedSet<E> implements DistributedSet<E> {
try {
backingMap.clear();
} finally {
timer.stop();
timer.stop(null);
}
}
......
......@@ -15,6 +15,7 @@
*/
package org.onosproject.store.consistent.impl;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Timer;
import com.google.common.collect.Maps;
import org.onlab.metrics.MetricsComponent;
......@@ -32,6 +33,8 @@ import static com.google.common.base.Preconditions.checkNotNull;
*/
public class MeteringAgent {
private Counter exceptionCounter;
private Counter perObjExceptionCounter;
private MetricsService metricsService;
private MetricsComponent metricsComponent;
private MetricsFeature metricsFeature;
......@@ -63,6 +66,8 @@ public class MeteringAgent {
this.wildcard = metricsComponent.registerFeature("*");
this.perObjTimer = metricsService.createTimer(metricsComponent, metricsFeature, "*");
this.perPrimitiveTimer = metricsService.createTimer(metricsComponent, wildcard, "*");
this.perObjExceptionCounter = metricsService.createCounter(metricsComponent, metricsFeature, "exceptions");
this.exceptionCounter = metricsService.createCounter(metricsComponent, wildcard, "exceptions");
}
}
......@@ -103,19 +108,25 @@ public class MeteringAgent {
/**
* Stops timer given a specific context and updates all related metrics.
* @param e
*/
public void stop() {
public void stop(Throwable e) {
if (!activated) {
return;
}
//Stop and updates timer with specific measurements per map, per operation
final long time = context.stop();
//updates timer with aggregated measurements per map
perOpTimers.get(operation).update(time, TimeUnit.NANOSECONDS);
//updates timer with aggregated measurements per map
perObjTimer.update(time, TimeUnit.NANOSECONDS);
//updates timer with aggregated measurements per all Consistent Maps
perPrimitiveTimer.update(time, TimeUnit.NANOSECONDS);
if (e == null) {
//Stop and updates timer with specific measurements per map, per operation
final long time = context.stop();
//updates timer with aggregated measurements per map
perOpTimers.get(operation).update(time, TimeUnit.NANOSECONDS);
//updates timer with aggregated measurements per map
perObjTimer.update(time, TimeUnit.NANOSECONDS);
//updates timer with aggregated measurements per all Consistent Maps
perPrimitiveTimer.update(time, TimeUnit.NANOSECONDS);
} else {
exceptionCounter.inc();
perObjExceptionCounter.inc();
}
}
}
......