Madan Jampani
Committed by Gerrit Code Review

DefaultConsistentMap to automatically retry compute* calls failing due to ConcurrentModification

Change-Id: If59e432e423d323282eb8fe7b1b438899154aae9
......@@ -19,6 +19,7 @@ package org.onosproject.store.primitives.impl;
import java.util.Collection;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
......@@ -28,14 +29,18 @@ import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
import org.onlab.util.Tools;
import org.onosproject.store.primitives.ConsistentMapBackedJavaMap;
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.ConsistentMapException;
import org.onosproject.store.service.ConsistentMapException.ConcurrentModification;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.Synchronous;
import org.onosproject.store.service.Versioned;
import com.google.common.base.Throwables;
/**
* Default implementation of {@code ConsistentMap}.
*
......@@ -45,6 +50,7 @@ import org.onosproject.store.service.Versioned;
public class DefaultConsistentMap<K, V> extends Synchronous<AsyncConsistentMap<K, V>> implements ConsistentMap<K, V> {
private static final int OPERATION_TIMEOUT_MILLIS = 5000;
private static final int MAX_DELAY_BETWEEN_RETY_MILLS = 50;
private final AsyncConsistentMap<K, V> asyncMap;
private Map<K, V> javaMap;
......@@ -82,26 +88,29 @@ public class DefaultConsistentMap<K, V> extends Synchronous<AsyncConsistentMap<K
@Override
public Versioned<V> computeIfAbsent(K key,
Function<? super K, ? extends V> mappingFunction) {
return complete(asyncMap.computeIfAbsent(key, mappingFunction));
return computeIf(key, Objects::isNull, (k, v) -> mappingFunction.apply(k));
}
@Override
public Versioned<V> computeIfPresent(K key,
BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
return complete(asyncMap.computeIfPresent(key, remappingFunction));
return computeIf(key, Objects::nonNull, remappingFunction);
}
@Override
public Versioned<V> compute(K key,
BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
return complete(asyncMap.compute(key, remappingFunction));
return computeIf(key, v -> true, remappingFunction);
}
@Override
public Versioned<V> computeIf(K key,
Predicate<? super V> condition,
BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
return complete(asyncMap.computeIf(key, condition, remappingFunction));
return Tools.retryable(() -> complete(asyncMap.computeIf(key, condition, remappingFunction)),
ConcurrentModification.class,
Integer.MAX_VALUE,
MAX_DELAY_BETWEEN_RETY_MILLS).get();
}
@Override
......@@ -203,11 +212,8 @@ public class DefaultConsistentMap<K, V> extends Synchronous<AsyncConsistentMap<K
} catch (TimeoutException e) {
throw new ConsistentMapException.Timeout();
} catch (ExecutionException e) {
if (e.getCause() instanceof ConsistentMapException) {
throw (ConsistentMapException) e.getCause();
} else {
throw new ConsistentMapException(e.getCause());
}
Throwables.propagateIfPossible(e.getCause());
throw new ConsistentMapException(e.getCause());
}
}
}
\ No newline at end of file
......