Flavio Castro
Committed by Gerrit Code Review

ONOS-2315 Adding new metrics to ConsistentMaps

Change-Id: Iba9a70f5eb268834564be26e42776b9caa4ea547
......@@ -25,6 +25,7 @@ import java.util.Map.Entry;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Consumer;
......@@ -33,6 +34,11 @@ import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.Set;
import com.codahale.metrics.Timer;
import org.onlab.metrics.MetricsComponent;
import org.onlab.metrics.MetricsFeature;
import org.onlab.metrics.MetricsService;
import org.onlab.osgi.DefaultServiceDirectory;
import org.onlab.util.HexString;
import org.onlab.util.SharedExecutors;
import org.onlab.util.Tools;
......@@ -70,6 +76,34 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V>
private final boolean purgeOnUninstall;
private final Consumer<MapEvent<K, V>> eventPublisher;
private final MetricsService metricsService;
private final MetricsComponent metricsComponent;
private final MetricsFeature metricsFeature;
private final Map<String, Timer> perMapOpTimers = Maps.newConcurrentMap();
private final Map<String, Timer> perOpTimers = Maps.newConcurrentMap();
private final Timer cMapTimer;
private final Timer perMapTimer;
private final MetricsFeature wildcard;
private static final String COMPONENT_NAME = "consistentMap";
private static final String SIZE = "size";
private static final String IS_EMPTY = "isEmpty";
private static final String CONTAINS_KEY = "containsKey";
private static final String CONTAINS_VALUE = "containsValue";
private static final String GET = "get";
private static final String COMPUTE_IF = "computeIf";
private static final String PUT = "put";
private static final String PUT_AND_GET = "putAndGet";
private static final String PUT_IF_ABSENT = "putIfAbsent";
private static final String REMOVE = "remove";
private static final String CLEAR = "clear";
private static final String KEY_SET = "keySet";
private static final String VALUES = "values";
private static final String ENTRY_SET = "entrySet";
private static final String REPLACE = "replace";
private static final String COMPUTE_IF_ABSENT = "computeIfAbsent";
private final Set<MapEventListener<K, V>> listeners = new CopyOnWriteArraySet<>();
private final Logger log = getLogger(getClass());
......@@ -116,6 +150,13 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V>
}
});
});
this.metricsService = DefaultServiceDirectory.getService(MetricsService.class);
this.metricsComponent = metricsService.registerComponent(COMPONENT_NAME);
this.metricsFeature = metricsComponent.registerFeature(name);
this.wildcard = metricsComponent.registerFeature("*");
this.perMapTimer = metricsService.createTimer(metricsComponent, metricsFeature, "*");
this.cMapTimer = metricsService.createTimer(metricsComponent, wildcard, "*");
}
/**
......@@ -153,31 +194,41 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V>
@Override
public CompletableFuture<Integer> size() {
return database.mapSize(name);
final OperationTimer timer = startTimer(SIZE);
return database.mapSize(name)
.whenComplete((r, e) -> timer.stop());
}
@Override
public CompletableFuture<Boolean> isEmpty() {
return database.mapIsEmpty(name);
final OperationTimer timer = startTimer(IS_EMPTY);
return database.mapIsEmpty(name)
.whenComplete((r, e) -> timer.stop());
}
@Override
public CompletableFuture<Boolean> containsKey(K key) {
checkNotNull(key, ERROR_NULL_KEY);
return database.mapContainsKey(name, keyCache.getUnchecked(key));
final OperationTimer timer = startTimer(CONTAINS_KEY);
return database.mapContainsKey(name, keyCache.getUnchecked(key))
.whenComplete((r, e) -> timer.stop());
}
@Override
public CompletableFuture<Boolean> containsValue(V value) {
checkNotNull(value, ERROR_NULL_VALUE);
return database.mapContainsValue(name, serializer.encode(value));
final OperationTimer timer = startTimer(CONTAINS_VALUE);
return database.mapContainsValue(name, serializer.encode(value))
.whenComplete((r, e) -> timer.stop());
}
@Override
public CompletableFuture<Versioned<V>> get(K key) {
checkNotNull(key, ERROR_NULL_KEY);
final OperationTimer timer = startTimer(GET);
return database.mapGet(name, keyCache.getUnchecked(key))
.thenApply(v -> v != null ? v.map(serializer::decode) : null);
.whenComplete((r, e) -> timer.stop())
.thenApply(v -> v != null ? v.map(serializer::decode) : null);
}
@Override
......@@ -185,7 +236,10 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V>
Function<? super K, ? extends V> mappingFunction) {
checkNotNull(key, ERROR_NULL_KEY);
checkNotNull(mappingFunction, "Mapping function cannot be null");
return updateAndGet(key, Match.ifNull(), Match.any(), mappingFunction.apply(key)).thenApply(v -> v.newValue());
final OperationTimer timer = startTimer(COMPUTE_IF_ABSENT);
return updateAndGet(key, Match.ifNull(), Match.any(), mappingFunction.apply(key))
.whenComplete((r, e) -> timer.stop())
.thenApply(v -> v.newValue());
}
@Override
......@@ -207,6 +261,7 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V>
checkNotNull(key, ERROR_NULL_KEY);
checkNotNull(condition, "predicate function cannot be null");
checkNotNull(remappingFunction, "Remapping function cannot be null");
final OperationTimer timer = startTimer(COMPUTE_IF);
return get(key).thenCompose(r1 -> {
V existingValue = r1 == null ? null : r1.value();
// if the condition evaluates to false, return existing value.
......@@ -227,6 +282,7 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V>
Match<V> valueMatcher = r1 == null ? Match.ifNull() : Match.any();
Match<Long> versionMatcher = r1 == null ? Match.any() : Match.ifValue(r1.version());
return updateAndGet(key, valueMatcher, versionMatcher, computedValue.get())
.whenComplete((r, e) -> timer.stop())
.thenApply(v -> {
if (v.updated()) {
return v.newValue();
......@@ -241,71 +297,96 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V>
public CompletableFuture<Versioned<V>> put(K key, V value) {
checkNotNull(key, ERROR_NULL_KEY);
checkNotNull(value, ERROR_NULL_VALUE);
return updateAndGet(key, Match.any(), Match.any(), value).thenApply(v -> v.oldValue());
final OperationTimer timer = startTimer(PUT);
return updateAndGet(key, Match.any(), Match.any(), value).thenApply(v -> v.oldValue())
.whenComplete((r, e) -> timer.stop());
}
@Override
public CompletableFuture<Versioned<V>> putAndGet(K key, V value) {
checkNotNull(key, ERROR_NULL_KEY);
checkNotNull(value, ERROR_NULL_VALUE);
return updateAndGet(key, Match.any(), Match.any(), value).thenApply(v -> v.newValue());
final OperationTimer timer = startTimer(PUT_AND_GET);
return updateAndGet(key, Match.any(), Match.any(), value).thenApply(v -> v.newValue())
.whenComplete((r, e) -> timer.stop());
}
@Override
public CompletableFuture<Versioned<V>> remove(K key) {
checkNotNull(key, ERROR_NULL_KEY);
return updateAndGet(key, Match.any(), Match.any(), null).thenApply(v -> v.oldValue());
final OperationTimer timer = startTimer(REMOVE);
return updateAndGet(key, Match.any(), Match.any(), null).thenApply(v -> v.oldValue())
.whenComplete((r, e) -> timer.stop());
}
@Override
public CompletableFuture<Void> clear() {
checkIfUnmodifiable();
return database.mapClear(name).thenApply(this::unwrapResult);
final OperationTimer timer = startTimer(CLEAR);
return database.mapClear(name).thenApply(this::unwrapResult)
.whenComplete((r, e) -> timer.stop());
}
@Override
public CompletableFuture<Set<K>> keySet() {
final OperationTimer timer = startTimer(KEY_SET);
return database.mapKeySet(name)
.thenApply(s -> s
.stream()
.map(this::dK)
.collect(Collectors.toSet()));
.stream()
.map(this::dK)
.collect(Collectors.toSet()))
.whenComplete((r, e) -> timer.stop());
}
@Override
public CompletableFuture<Collection<Versioned<V>>> values() {
return database.mapValues(name).thenApply(c -> c
.stream()
.map(v -> v.<V>map(serializer::decode))
.collect(Collectors.toList()));
final OperationTimer timer = startTimer(VALUES);
return database.mapValues(name)
.whenComplete((r, e) -> timer.stop())
.thenApply(c -> c
.stream()
.map(v -> v.<V>map(serializer::decode))
.collect(Collectors.toList()));
}
@Override
public CompletableFuture<Set<Entry<K, Versioned<V>>>> entrySet() {
return database.mapEntrySet(name).thenApply(s -> s
.stream()
.map(this::mapRawEntry)
.collect(Collectors.toSet()));
final OperationTimer timer = startTimer(ENTRY_SET);
return database.mapEntrySet(name)
.whenComplete((r, e) -> timer.stop())
.thenApply(s -> s
.stream()
.map(this::mapRawEntry)
.collect(Collectors.toSet()));
}
@Override
public CompletableFuture<Versioned<V>> putIfAbsent(K key, V value) {
checkNotNull(key, ERROR_NULL_KEY);
checkNotNull(value, ERROR_NULL_VALUE);
return updateAndGet(key, Match.ifNull(), Match.any(), value).thenApply(v -> v.oldValue());
final OperationTimer timer = startTimer(PUT_IF_ABSENT);
return updateAndGet(key, Match.ifNull(), Match.any(), value)
.whenComplete((r, e) -> timer.stop())
.thenApply(v -> v.oldValue());
}
@Override
public CompletableFuture<Boolean> remove(K key, V value) {
checkNotNull(key, ERROR_NULL_KEY);
checkNotNull(value, ERROR_NULL_VALUE);
return updateAndGet(key, Match.ifValue(value), Match.any(), null).thenApply(v -> v.updated());
final OperationTimer timer = startTimer(REMOVE);
return updateAndGet(key, Match.ifValue(value), Match.any(), null)
.whenComplete((r, e) -> timer.stop())
.thenApply(v -> v.updated());
}
@Override
public CompletableFuture<Boolean> remove(K key, long version) {
checkNotNull(key, ERROR_NULL_KEY);
return updateAndGet(key, Match.any(), Match.ifValue(version), null).thenApply(v -> v.updated());
final OperationTimer timer = startTimer(REMOVE);
return updateAndGet(key, Match.any(), Match.ifValue(version), null)
.whenComplete((r, e) -> timer.stop())
.thenApply(v -> v.updated());
}
@Override
......@@ -313,12 +394,18 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V>
checkNotNull(key, ERROR_NULL_KEY);
checkNotNull(oldValue, ERROR_NULL_VALUE);
checkNotNull(newValue, ERROR_NULL_VALUE);
return updateAndGet(key, Match.ifValue(oldValue), Match.any(), newValue).thenApply(v -> v.updated());
final OperationTimer timer = startTimer(REPLACE);
return updateAndGet(key, Match.ifValue(oldValue), Match.any(), newValue)
.whenComplete((r, e) -> timer.stop())
.thenApply(v -> v.updated());
}
@Override
public CompletableFuture<Boolean> replace(K key, long oldVersion, V newValue) {
return updateAndGet(key, Match.any(), Match.ifValue(oldVersion), newValue).thenApply(v -> v.updated());
final OperationTimer timer = startTimer(REPLACE);
return updateAndGet(key, Match.any(), Match.ifValue(oldVersion), newValue)
.whenComplete((r, e) -> timer.stop())
.thenApply(v -> v.updated());
}
private Map.Entry<K, Versioned<V>> mapRawEntry(Map.Entry<String, Versioned<byte[]>> e) {
......@@ -331,10 +418,10 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V>
V value) {
checkIfUnmodifiable();
return database.mapUpdate(name,
keyCache.getUnchecked(key),
oldValueMatch.map(serializer::encode),
oldVersionMatch,
value == null ? null : serializer.encode(value))
keyCache.getUnchecked(key),
oldValueMatch.map(serializer::encode),
oldVersionMatch,
value == null ? null : serializer.encode(value))
.thenApply(this::unwrapResult)
.thenApply(r -> r.<K, V>map(this::dK, serializer::decode))
.whenComplete((r, e) -> {
......@@ -392,4 +479,34 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V>
eventPublisher.accept(event);
}
}
private OperationTimer startTimer(String op) {
//check if timer exist, if it doesn't creates it
final Timer currTimer = perMapOpTimers.computeIfAbsent(op, timer ->
metricsService.createTimer(metricsComponent, metricsFeature, op));
perOpTimers.computeIfAbsent(op, timer -> metricsService.createTimer(metricsComponent, wildcard, op));
//starts timer
return new OperationTimer(currTimer.time(), op);
}
private class OperationTimer {
private final Timer.Context context;
private final String operation;
public OperationTimer(Timer.Context context, String operation) {
this.context = context;
this.operation = operation;
}
public void stop() {
//Stop and updates timer with specific measurements per map, per operation
final long time = context.stop();
//updates timer with aggregated measurements per map
perOpTimers.get(operation).update(time, TimeUnit.NANOSECONDS);
//updates timer with aggregated measurements per map
perMapTimer.update(time, TimeUnit.NANOSECONDS);
//updates timer with aggregated measurements per all Consistent Maps
cMapTimer.update(time, TimeUnit.NANOSECONDS);
}
}
}
\ No newline at end of file
......