Madan Jampani
Committed by Gerrit Code Review

Added a map API to transform Versioned<byte[]> to Versioned<V>

Fix bug where ConsistentMap.{putIfAbsent,remove} do not publish MapEvents

Change-Id: Ib7a9e01cad2b9099e6872916ae392351b68299ef
...@@ -16,6 +16,8 @@ ...@@ -16,6 +16,8 @@
16 16
17 package org.onosproject.store.service; 17 package org.onosproject.store.service;
18 18
19 +import java.util.function.Function;
20 +
19 import org.joda.time.DateTime; 21 import org.joda.time.DateTime;
20 22
21 import com.google.common.base.MoreObjects; 23 import com.google.common.base.MoreObjects;
...@@ -85,6 +87,16 @@ public class Versioned<V> { ...@@ -85,6 +87,16 @@ public class Versioned<V> {
85 return creationTime; 87 return creationTime;
86 } 88 }
87 89
90 + /**
91 + * Maps this instance into another after transforming its
92 + * value while retaining the same version and creationTime.
93 + * @param transformer function to mapping the value
94 + * @return mapped instance
95 + */
96 + public <U> Versioned<U> map(Function<V, U> transformer) {
97 + return new Versioned<>(transformer.apply(value), version, creationTime);
98 + }
99 +
88 @Override 100 @Override
89 public String toString() { 101 public String toString() {
90 return MoreObjects.toStringHelper(this) 102 return MoreObjects.toStringHelper(this)
......
...@@ -276,8 +276,12 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V> ...@@ -276,8 +276,12 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V>
276 checkIfUnmodifiable(); 276 checkIfUnmodifiable();
277 return database.remove(name, keyCache.getUnchecked(key)) 277 return database.remove(name, keyCache.getUnchecked(key))
278 .thenApply(this::unwrapResult) 278 .thenApply(this::unwrapResult)
279 - .thenApply(v -> v != null 279 + .thenApply(v -> v != null ? v.<V>map(serializer::decode) : null)
280 - ? new Versioned<>(serializer.decode(v.value()), v.version(), v.creationTime()) : null); 280 + .whenComplete((r, e) -> {
281 + if (r != null) {
282 + notifyListeners(new MapEvent<>(name, MapEvent.Type.REMOVE, key, r));
283 + }
284 + });
281 } 285 }
282 286
283 @Override 287 @Override
...@@ -316,12 +320,19 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V> ...@@ -316,12 +320,19 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V>
316 checkNotNull(key, ERROR_NULL_KEY); 320 checkNotNull(key, ERROR_NULL_KEY);
317 checkNotNull(value, ERROR_NULL_VALUE); 321 checkNotNull(value, ERROR_NULL_VALUE);
318 checkIfUnmodifiable(); 322 checkIfUnmodifiable();
319 - return database.putIfAbsent(name, 323 + AtomicReference<MapEvent<K, V>> event = new AtomicReference<>();
320 - keyCache.getUnchecked(key), 324 + return database.putIfAbsentAndGet(name, keyCache.getUnchecked(key), serializer.encode(value))
321 - serializer.encode(value))
322 .thenApply(this::unwrapResult) 325 .thenApply(this::unwrapResult)
323 - .thenApply(v -> v != null ? 326 + .whenComplete((r, e) -> {
324 - new Versioned<>(serializer.decode(v.value()), v.version(), v.creationTime()) : null); 327 + if (r != null && r.updated()) {
328 + event.set(new MapEvent<K, V>(name,
329 + MapEvent.Type.INSERT,
330 + key,
331 + r.newValue().<V>map(serializer::decode)));
332 + }
333 + })
334 + .thenApply(v -> v.updated() ? null : v.oldValue().<V>map(serializer::decode))
335 + .whenComplete((r, e) -> notifyListeners(event.get()));
325 } 336 }
326 337
327 @Override 338 @Override
......