Committed by
Gerrit Code Review
Added a compute method to ECMap to simplify map interactions following a read-modify-write template.
Change-Id: If8c791ce1f49a7b5b3d04941b6e03a10261c6f6f
Showing
3 changed files
with
106 additions
and
0 deletions
... | @@ -18,6 +18,7 @@ package org.onosproject.store.service; | ... | @@ -18,6 +18,7 @@ package org.onosproject.store.service; |
18 | import java.util.Collection; | 18 | import java.util.Collection; |
19 | import java.util.Map; | 19 | import java.util.Map; |
20 | import java.util.Set; | 20 | import java.util.Set; |
21 | +import java.util.function.BiFunction; | ||
21 | 22 | ||
22 | /** | 23 | /** |
23 | * A distributed, eventually consistent map. | 24 | * A distributed, eventually consistent map. |
... | @@ -130,6 +131,17 @@ public interface EventuallyConsistentMap<K, V> { | ... | @@ -130,6 +131,17 @@ public interface EventuallyConsistentMap<K, V> { |
130 | void remove(K key, V value); | 131 | void remove(K key, V value); |
131 | 132 | ||
132 | /** | 133 | /** |
134 | + * Attempts to compute a mapping for the specified key and its current mapped | ||
135 | + * value (or null if there is no current mapping). | ||
136 | + * <p> | ||
137 | + * If the function returns null, the mapping is removed (or remains absent if initially absent). | ||
138 | + * @param key map key | ||
139 | + * @param recomputeFunction function to recompute a new value | ||
140 | + * @return new value | ||
141 | + */ | ||
142 | + V compute(K key, BiFunction<K, V, V> recomputeFunction); | ||
143 | + | ||
144 | + /** | ||
133 | * Adds mappings for all key-value pairs in the specified map to this map. | 145 | * Adds mappings for all key-value pairs in the specified map to this map. |
134 | * <p> | 146 | * <p> |
135 | * This will be more efficient in communication than calling individual put | 147 | * This will be more efficient in communication than calling individual put | ... | ... |
... | @@ -381,6 +381,38 @@ public class EventuallyConsistentMapImpl<K, V> | ... | @@ -381,6 +381,38 @@ public class EventuallyConsistentMapImpl<K, V> |
381 | } | 381 | } |
382 | 382 | ||
383 | @Override | 383 | @Override |
384 | + public V compute(K key, BiFunction<K, V, V> recomputeFunction) { | ||
385 | + checkState(!destroyed, destroyedMessage); | ||
386 | + checkNotNull(key, ERROR_NULL_KEY); | ||
387 | + checkNotNull(recomputeFunction, "Recompute function cannot be null"); | ||
388 | + | ||
389 | + AtomicBoolean updated = new AtomicBoolean(false); | ||
390 | + AtomicReference<MapValue<V>> previousValue = new AtomicReference<>(); | ||
391 | + MapValue<V> computedValue = items.compute(key, (k, mv) -> { | ||
392 | + previousValue.set(mv); | ||
393 | + V newRawValue = recomputeFunction.apply(key, mv == null ? null : mv.get()); | ||
394 | + MapValue<V> newValue = new MapValue<>(newRawValue, timestampProvider.apply(key, newRawValue)); | ||
395 | + if (mv == null || newValue.isNewerThan(mv)) { | ||
396 | + updated.set(true); | ||
397 | + return newValue; | ||
398 | + } else { | ||
399 | + return mv; | ||
400 | + } | ||
401 | + }); | ||
402 | + if (updated.get()) { | ||
403 | + notifyPeers(new UpdateEntry<>(key, computedValue), peerUpdateFunction.apply(key, computedValue.get())); | ||
404 | + EventuallyConsistentMapEvent.Type updateType = computedValue.isTombstone() ? REMOVE : PUT; | ||
405 | + V value = computedValue.isTombstone() | ||
406 | + ? previousValue.get() == null ? null : previousValue.get().get() | ||
407 | + : computedValue.get(); | ||
408 | + if (value != null) { | ||
409 | + notifyListeners(new EventuallyConsistentMapEvent<>(updateType, key, value)); | ||
410 | + } | ||
411 | + } | ||
412 | + return computedValue.get(); | ||
413 | + } | ||
414 | + | ||
415 | + @Override | ||
384 | public void putAll(Map<? extends K, ? extends V> m) { | 416 | public void putAll(Map<? extends K, ? extends V> m) { |
385 | checkState(!destroyed, destroyedMessage); | 417 | checkState(!destroyed, destroyedMessage); |
386 | m.forEach(this::put); | 418 | m.forEach(this::put); | ... | ... |
... | @@ -382,6 +382,68 @@ public class EventuallyConsistentMapImplTest { | ... | @@ -382,6 +382,68 @@ public class EventuallyConsistentMapImplTest { |
382 | } | 382 | } |
383 | 383 | ||
384 | @Test | 384 | @Test |
385 | + public void testCompute() throws Exception { | ||
386 | + // Set up expectations of external events to be sent to listeners during | ||
387 | + // the test. These don't use timestamps so we can set them all up at once. | ||
388 | + EventuallyConsistentMapListener<String, String> listener | ||
389 | + = getListener(); | ||
390 | + listener.event(new EventuallyConsistentMapEvent<>( | ||
391 | + EventuallyConsistentMapEvent.Type.PUT, KEY1, VALUE1)); | ||
392 | + listener.event(new EventuallyConsistentMapEvent<>( | ||
393 | + EventuallyConsistentMapEvent.Type.REMOVE, KEY1, VALUE1)); | ||
394 | + listener.event(new EventuallyConsistentMapEvent<>( | ||
395 | + EventuallyConsistentMapEvent.Type.PUT, KEY2, VALUE2)); | ||
396 | + replay(listener); | ||
397 | + | ||
398 | + ecMap.addListener(listener); | ||
399 | + | ||
400 | + // Put in an initial value | ||
401 | + expectPeerMessage(clusterCommunicator); | ||
402 | + ecMap.compute(KEY1, (k, v) -> VALUE1); | ||
403 | + assertEquals(VALUE1, ecMap.get(KEY1)); | ||
404 | + | ||
405 | + // Remove the value and check the correct internal cluster messages | ||
406 | + // are sent | ||
407 | + expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()), | ||
408 | + UPDATE_MESSAGE_SUBJECT, clusterCommunicator); | ||
409 | + | ||
410 | + ecMap.compute(KEY1, (k, v) -> null); | ||
411 | + assertNull(ecMap.get(KEY1)); | ||
412 | + | ||
413 | + verify(clusterCommunicator); | ||
414 | + | ||
415 | + // Remove the same value again. Even though the value is no longer in | ||
416 | + // the map, we expect that the tombstone is updated and another remove | ||
417 | + // event is sent to the cluster and external listeners. | ||
418 | + expectSpecificMulticastMessage(generateRemoveMessage(KEY1, clockService.peekAtNextTimestamp()), | ||
419 | + UPDATE_MESSAGE_SUBJECT, clusterCommunicator); | ||
420 | + | ||
421 | + ecMap.compute(KEY1, (k, v) -> null); | ||
422 | + assertNull(ecMap.get(KEY1)); | ||
423 | + | ||
424 | + verify(clusterCommunicator); | ||
425 | + | ||
426 | + // Put in a new value for us to try and remove | ||
427 | + expectPeerMessage(clusterCommunicator); | ||
428 | + | ||
429 | + ecMap.compute(KEY2, (k, v) -> VALUE2); | ||
430 | + | ||
431 | + clockService.turnBackTime(); | ||
432 | + | ||
433 | + // Remove should have no effect, since it has an older timestamp than | ||
434 | + // the put. Expect no notifications to be sent out | ||
435 | + reset(clusterCommunicator); | ||
436 | + replay(clusterCommunicator); | ||
437 | + | ||
438 | + ecMap.compute(KEY2, (k, v) -> null); | ||
439 | + | ||
440 | + verify(clusterCommunicator); | ||
441 | + | ||
442 | + // Check that our listener received the correct events during the test | ||
443 | + verify(listener); | ||
444 | + } | ||
445 | + | ||
446 | + @Test | ||
385 | public void testPutAll() throws Exception { | 447 | public void testPutAll() throws Exception { |
386 | // putAll() with an empty map is a no-op - no messages will be sent | 448 | // putAll() with an empty map is a no-op - no messages will be sent |
387 | reset(clusterCommunicator); | 449 | reset(clusterCommunicator); | ... | ... |
-
Please register or login to post a comment