Madan Jampani
Committed by Gerrit Code Review

CachingAsyncConsistentMap: When changes are detected update cache with new value

Change-Id: I51307a8bff953389feeb8928f591151058d49eab
......@@ -51,7 +51,7 @@ public class CachingAsyncConsistentMap<K, V> extends DelegatingAsyncConsistentMa
private final LoadingCache<K, CompletableFuture<Versioned<V>>> cache;
private final MapEventListener<K, V> cacheInvalidator;
private final MapEventListener<K, V> cacheUpdater;
private final Consumer<Status> statusListener;
/**
......@@ -74,7 +74,14 @@ public class CachingAsyncConsistentMap<K, V> extends DelegatingAsyncConsistentMa
cache = CacheBuilder.newBuilder()
.maximumSize(cacheSize)
.build(CacheLoader.from(CachingAsyncConsistentMap.super::get));
cacheInvalidator = event -> cache.invalidate(event.key());
cacheUpdater = event -> {
Versioned<V> newValue = event.newValue();
if (newValue == null) {
cache.invalidate(event.key());
} else {
cache.put(event.key(), CompletableFuture.completedFuture(newValue));
}
};
statusListener = status -> {
log.debug("{} status changed to {}", this.name(), status);
// If the status of the underlying map is SUSPENDED or INACTIVE
......@@ -83,14 +90,14 @@ public class CachingAsyncConsistentMap<K, V> extends DelegatingAsyncConsistentMa
cache.invalidateAll();
}
};
super.addListener(cacheInvalidator);
super.addListener(cacheUpdater);
super.addStatusChangeListener(statusListener);
}
@Override
public CompletableFuture<Void> destroy() {
super.removeStatusChangeListener(statusListener);
return super.destroy().thenCompose(v -> removeListener(cacheInvalidator));
return super.destroy().thenCompose(v -> removeListener(cacheUpdater));
}
@Override
......