Madan Jampani

ONOS-4423: Support for invalidating cached map entries when a client session is suspended

Change-Id: Icb5e73dc7a37d9459d26cd3a5c9ca1e1a05b0436
......@@ -25,6 +25,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
......@@ -187,6 +188,21 @@ public class DefaultConsistentMap<K, V> extends Synchronous<AsyncConsistentMap<K
}
@Override
public void addStatusChangeListener(Consumer<Status> listener) {
asyncMap.addStatusChangeListener(listener);
}
@Override
public void removeStatusChangeListener(Consumer<Status> listener) {
asyncMap.removeStatusChangeListener(listener);
}
@Override
public Collection<Consumer<Status>> statusChangeListeners() {
return asyncMap.statusChangeListeners();
}
@Override
public Map<K, V> asJavaMap() {
synchronized (this) {
if (javaMap == null) {
......
......@@ -15,7 +15,10 @@
*/
package org.onosproject.store.service;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import org.onosproject.core.ApplicationId;
......@@ -74,6 +77,29 @@ public interface DistributedPrimitive {
TRANSACTION_CONTEXT
}
/**
* Status of distributed primitive.
*/
public enum Status {
/**
* Signifies a state wherein the primitive is operating correctly and is capable of meeting the advertised
* consistency and reliability guarantees.
*/
ACTIVE,
/**
* Signifies a state wherein the primitive is temporarily incapable of providing the advertised
* consistency properties.
*/
SUSPENDED,
/**
* Signifies a state wherein the primitive has been shutdown and therefore cannot perform its functions.
*/
INACTIVE
}
static final long DEFAULT_OPERTATION_TIMEOUT_MILLIS = 60000L;
/**
......@@ -107,4 +133,24 @@ public interface DistributedPrimitive {
default CompletableFuture<Void> destroy() {
return CompletableFuture.completedFuture(null);
}
/**
* Registers a listener to be called when the primitive's status changes.
* @param listener The listener to be called when the status changes.
*/
default void addStatusChangeListener(Consumer<Status> listener) {}
/**
* Unregisters a previously registered listener to be called when the primitive's status changes.
* @param listener The listener to unregister
*/
default void removeStatusChangeListener(Consumer<Status> listener) {}
/**
* Returns the collection of status change listeners previously registered.
* @return collection of status change listeners
*/
default Collection<Consumer<Status>> statusChangeListeners() {
return Collections.emptyList();
}
}
......
......@@ -15,18 +15,25 @@
*/
package org.onosproject.store.primitives.impl;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Predicate;
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.Versioned;
import org.slf4j.Logger;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import static org.onosproject.store.service.DistributedPrimitive.Status.INACTIVE;
import static org.onosproject.store.service.DistributedPrimitive.Status.SUSPENDED;
/**
* {@code AsyncConsistentMap} that caches entries on read.
* <p>
......@@ -39,20 +46,13 @@ import com.google.common.cache.LoadingCache;
* @param <V> value type
*/
public class CachingAsyncConsistentMap<K, V> extends DelegatingAsyncConsistentMap<K, V> {
private int maxCacheSize = 10000;
private final LoadingCache<K, CompletableFuture<Versioned<V>>> cache =
CacheBuilder.newBuilder()
.maximumSize(maxCacheSize)
.build(new CacheLoader<K, CompletableFuture<Versioned<V>>>() {
@Override
public CompletableFuture<Versioned<V>> load(K key)
throws Exception {
return CachingAsyncConsistentMap.super.get(key);
}
});
private static final int DEFAULT_CACHE_SIZE = 10000;
private final Logger log = getLogger(getClass());
private final MapEventListener<K, V> cacheInvalidator = event -> cache.invalidate(event.key());
private final LoadingCache<K, CompletableFuture<Versioned<V>>> cache;
private final MapEventListener<K, V> cacheInvalidator;
private final Consumer<Status> statusListener;
/**
* Default constructor.
......@@ -60,24 +60,36 @@ public class CachingAsyncConsistentMap<K, V> extends DelegatingAsyncConsistentMa
* @param backingMap a distributed, strongly consistent map for backing
*/
public CachingAsyncConsistentMap(AsyncConsistentMap<K, V> backingMap) {
super(backingMap);
super.addListener(cacheInvalidator);
this(backingMap, DEFAULT_CACHE_SIZE);
}
/**
* Constructor to configure cache size of LoadingCache.
* Constructor to configure cache size.
*
* @param backingMap a distributed, strongly consistent map for backing
* @param cacheSize the maximum size of the cache
*/
public CachingAsyncConsistentMap(AsyncConsistentMap<K, V> backingMap, int cacheSize) {
super(backingMap);
cache = CacheBuilder.newBuilder()
.maximumSize(cacheSize)
.build(CacheLoader.from(CachingAsyncConsistentMap.super::get));
cacheInvalidator = event -> cache.invalidate(event.key());
statusListener = status -> {
log.debug("{} status changed to {}", this.name(), status);
// If the status of the underlying map is SUSPENDED or INACTIVE
// we can no longer guarantee that the cache will be in sync.
if (status == SUSPENDED || status == INACTIVE) {
cache.invalidateAll();
}
};
super.addListener(cacheInvalidator);
maxCacheSize = cacheSize;
super.addStatusChangeListener(statusListener);
}
@Override
public CompletableFuture<Void> destroy() {
super.removeStatusChangeListener(statusListener);
return super.destroy().thenCompose(v -> removeListener(cacheInvalidator));
}
......
......@@ -24,6 +24,7 @@ import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Predicate;
import org.onosproject.core.ApplicationId;
......@@ -32,7 +33,6 @@ import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.MapTransaction;
import org.onosproject.store.service.Versioned;
import com.google.common.base.MoreObjects;
/**
......@@ -183,6 +183,21 @@ public class DelegatingAsyncConsistentMap<K, V> implements AsyncConsistentMap<K,
}
@Override
public void addStatusChangeListener(Consumer<Status> listener) {
delegateMap.addStatusChangeListener(listener);
}
@Override
public void removeStatusChangeListener(Consumer<Status> listener) {
delegateMap.removeStatusChangeListener(listener);
}
@Override
public Collection<Consumer<Status>> statusChangeListeners() {
return delegateMap.statusChangeListeners();
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("delegateMap", delegateMap)
......
......@@ -27,6 +27,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
......@@ -38,7 +39,6 @@ import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.MapTransaction;
import org.onosproject.store.service.Versioned;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
......@@ -254,6 +254,21 @@ public class PartitionedAsyncConsistentMap<K, V> implements AsyncConsistentMap<K
.thenApply(list -> list.stream().reduce(Boolean::logicalAnd).orElse(true));
}
@Override
public void addStatusChangeListener(Consumer<Status> listener) {
partitions.values().forEach(map -> map.addStatusChangeListener(listener));
}
@Override
public void removeStatusChangeListener(Consumer<Status> listener) {
partitions.values().forEach(map -> map.removeStatusChangeListener(listener));
}
@Override
public Collection<Consumer<Status>> statusChangeListeners() {
throw new UnsupportedOperationException();
}
/**
* Returns the map (partition) to which the specified key maps.
* @param key key
......
......@@ -23,6 +23,7 @@ import io.atomix.catalyst.transport.Transport;
import io.atomix.catalyst.util.concurrent.CatalystThreadFactory;
import io.atomix.copycat.client.ConnectionStrategies;
import io.atomix.copycat.client.CopycatClient;
import io.atomix.copycat.client.CopycatClient.State;
import io.atomix.copycat.client.RecoveryStrategies;
import io.atomix.copycat.client.RetryStrategies;
import io.atomix.copycat.client.ServerSelectionStrategies;
......@@ -36,6 +37,8 @@ import io.atomix.variables.DistributedLong;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;
import org.onlab.util.HexString;
import org.onosproject.store.primitives.DistributedPrimitiveCreator;
......@@ -48,6 +51,7 @@ import org.onosproject.store.service.AsyncAtomicValue;
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.AsyncDistributedSet;
import org.onosproject.store.service.AsyncLeaderElector;
import org.onosproject.store.service.DistributedPrimitive.Status;
import org.onosproject.store.service.DistributedQueue;
import org.onosproject.store.service.Serializer;
import org.slf4j.Logger;
......@@ -71,6 +75,18 @@ public class StoragePartitionClient implements DistributedPrimitiveCreator, Mana
private final Supplier<AsyncConsistentMap<String, byte[]>> onosAtomicValuesMap =
Suppliers.memoize(() -> newAsyncConsistentMap(ATOMIC_VALUES_CONSISTENT_MAP_NAME,
Serializer.using(KryoNamespaces.BASIC)));
Function<State, Status> mapper = state -> {
switch (state) {
case CONNECTED:
return Status.ACTIVE;
case SUSPENDED:
return Status.SUSPENDED;
case CLOSED:
return Status.INACTIVE;
default:
throw new IllegalStateException("Unknown state " + state);
}
};
public StoragePartitionClient(StoragePartition partition,
io.atomix.catalyst.serializer.Serializer serializer,
......@@ -90,7 +106,8 @@ public class StoragePartitionClient implements DistributedPrimitiveCreator, Mana
transport,
serializer.clone(),
StoragePartition.RESOURCE_TYPES);
copycatClient.onStateChange(state -> log.info("Client state {}", state));
copycatClient.onStateChange(state -> log.debug("Partition {} client state"
+ " changed to {}", partition.getId(), state));
client = new AtomixClient(new ResourceClient(copycatClient));
}
return client.open().whenComplete((r, e) -> {
......@@ -109,9 +126,14 @@ public class StoragePartitionClient implements DistributedPrimitiveCreator, Mana
@Override
public <K, V> AsyncConsistentMap<K, V> newAsyncConsistentMap(String name, Serializer serializer) {
AtomixConsistentMap atomixConsistentMap = client.getResource(name, AtomixConsistentMap.class).join();
Consumer<State> statusListener = state -> {
atomixConsistentMap.statusChangeListeners()
.forEach(listener -> listener.accept(mapper.apply(state)));
};
copycatClient.onStateChange(statusListener);
AsyncConsistentMap<String, byte[]> rawMap =
new DelegatingAsyncConsistentMap<String, byte[]>(client.getResource(name, AtomixConsistentMap.class)
.join()) {
new DelegatingAsyncConsistentMap<String, byte[]>(atomixConsistentMap) {
@Override
public String name() {
return name;
......@@ -139,9 +161,7 @@ public class StoragePartitionClient implements DistributedPrimitiveCreator, Mana
@Override
public <V> AsyncAtomicValue<V> newAsyncAtomicValue(String name, Serializer serializer) {
return new DefaultAsyncAtomicValue<>(name,
serializer,
onosAtomicValuesMap.get());
return new DefaultAsyncAtomicValue<>(name, serializer, onosAtomicValuesMap.get());
}
@Override
......
......@@ -22,6 +22,7 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
......@@ -33,7 +34,6 @@ import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.MapTransaction;
import org.onosproject.store.service.Versioned;
import com.google.common.collect.Maps;
/**
......@@ -281,6 +281,21 @@ public class TranscodingAsyncConsistentMap<K1, V1, K2, V2> implements AsyncConsi
}
}
@Override
public void addStatusChangeListener(Consumer<Status> listener) {
backingMap.addStatusChangeListener(listener);
}
@Override
public void removeStatusChangeListener(Consumer<Status> listener) {
backingMap.removeStatusChangeListener(listener);
}
@Override
public Collection<Consumer<Status>> statusChangeListeners() {
return backingMap.statusChangeListeners();
}
private class InternalBackingMapEventListener implements MapEventListener<K2, V2> {
private final MapEventListener<K1, V1> listener;
......
......@@ -28,6 +28,7 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Predicate;
import org.onlab.util.Match;
......@@ -53,7 +54,7 @@ import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.MapTransaction;
import org.onosproject.store.service.Versioned;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
/**
......@@ -63,6 +64,7 @@ import com.google.common.collect.Sets;
public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap>
implements AsyncConsistentMap<String, byte[]> {
private final Set<Consumer<Status>> statusChangeListeners = Sets.newCopyOnWriteArraySet();
private final Set<MapEventListener<String, byte[]>> mapEventListeners = Sets.newCopyOnWriteArraySet();
public static final String CHANGE_SUBJECT = "changeEvents";
......@@ -291,4 +293,19 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap>
public CompletableFuture<Boolean> prepareAndCommit(MapTransaction<String, byte[]> transaction) {
return submit(new TransactionPrepareAndCommit(transaction)).thenApply(v -> v == PrepareResult.OK);
}
@Override
public void addStatusChangeListener(Consumer<Status> listener) {
statusChangeListeners.add(listener);
}
@Override
public void removeStatusChangeListener(Consumer<Status> listener) {
statusChangeListeners.remove(listener);
}
@Override
public Collection<Consumer<Status>> statusChangeListeners() {
return ImmutableSet.copyOf(statusChangeListeners);
}
}
\ No newline at end of file
......