Madan Jampani

ONOS-4396: Fix for EC Map synchronization failing silently due to serialization failures.

With this change we proactively fail map updates when serialization failures can occur and immediately notify the caller

Change-Id: I62a8a84731b9c2a6eeff7fa6f8336dc74234bf30
...@@ -339,8 +339,11 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -339,8 +339,11 @@ public class EventuallyConsistentMapImpl<K, V>
339 checkNotNull(value, ERROR_NULL_VALUE); 339 checkNotNull(value, ERROR_NULL_VALUE);
340 340
341 MapValue<V> newValue = new MapValue<>(value, timestampProvider.apply(key, value)); 341 MapValue<V> newValue = new MapValue<>(value, timestampProvider.apply(key, value));
342 + // Before mutating local map, ensure the update can be serialized without errors.
343 + // This prevents replica divergence due to serialization failures.
344 + UpdateEntry<K, V> update = serializer.copy(new UpdateEntry<K, V>(key, newValue));
342 if (putInternal(key, newValue)) { 345 if (putInternal(key, newValue)) {
343 - notifyPeers(new UpdateEntry<>(key, newValue), peerUpdateFunction.apply(key, value)); 346 + notifyPeers(update, peerUpdateFunction.apply(key, value));
344 notifyListeners(new EventuallyConsistentMapEvent<>(mapName, PUT, key, value)); 347 notifyListeners(new EventuallyConsistentMapEvent<>(mapName, PUT, key, value));
345 } 348 }
346 } 349 }
...@@ -417,13 +420,15 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -417,13 +420,15 @@ public class EventuallyConsistentMapImpl<K, V>
417 420
418 AtomicBoolean updated = new AtomicBoolean(false); 421 AtomicBoolean updated = new AtomicBoolean(false);
419 AtomicReference<MapValue<V>> previousValue = new AtomicReference<>(); 422 AtomicReference<MapValue<V>> previousValue = new AtomicReference<>();
420 - MapValue<V> computedValue = items.compute(key, (k, mv) -> { 423 + MapValue<V> computedValue = items.compute(serializer.copy(key), (k, mv) -> {
421 previousValue.set(mv); 424 previousValue.set(mv);
422 V newRawValue = recomputeFunction.apply(key, mv == null ? null : mv.get()); 425 V newRawValue = recomputeFunction.apply(key, mv == null ? null : mv.get());
423 MapValue<V> newValue = new MapValue<>(newRawValue, timestampProvider.apply(key, newRawValue)); 426 MapValue<V> newValue = new MapValue<>(newRawValue, timestampProvider.apply(key, newRawValue));
424 if (mv == null || newValue.isNewerThan(mv)) { 427 if (mv == null || newValue.isNewerThan(mv)) {
425 updated.set(true); 428 updated.set(true);
426 - return newValue; 429 + // We return a copy to ensure updates to peers can be serialized.
430 + // This prevents replica divergence due to serialization failures.
431 + return serializer.copy(newValue);
427 } else { 432 } else {
428 return mv; 433 return mv;
429 } 434 }
......
...@@ -78,6 +78,11 @@ public class KryoSerializer implements StoreSerializer { ...@@ -78,6 +78,11 @@ public class KryoSerializer implements StoreSerializer {
78 } 78 }
79 79
80 @Override 80 @Override
81 + public <T> T copy(T object) {
82 + return decode(encode(object));
83 + }
84 +
85 + @Override
81 public String toString() { 86 public String toString() {
82 return MoreObjects.toStringHelper(getClass()) 87 return MoreObjects.toStringHelper(getClass())
83 .add("serializerPool", serializerPool) 88 .add("serializerPool", serializerPool)
......
...@@ -75,4 +75,13 @@ public interface StoreSerializer { ...@@ -75,4 +75,13 @@ public interface StoreSerializer {
75 * @param <T> decoded type 75 * @param <T> decoded type
76 */ 76 */
77 <T> T decode(final InputStream stream); 77 <T> T decode(final InputStream stream);
78 +
79 + /**
80 + * Returns a copy of the specfied object.
81 + *
82 + * @param object object to copy
83 + * @return a copy of the object
84 + * @param <T> object type
85 + */
86 + <T> T copy(final T object);
78 } 87 }
......