Madan Jampani
Committed by Gerrit Code Review

Added destroy() method to DistributedPrimitive interface

Implement replace method in ConsistentMap
Using Versioned#valueOrNull in place of Versioned#valueOrElse where appropriate

Change-Id: Ief3f3547d589d35f5c821a1c47035f91078e8316
......@@ -112,9 +112,4 @@ public class VtnEventuallyConsistentMapAdapter<K, V> implements EventuallyConsis
public void removeListener(EventuallyConsistentMapListener<K, V> listener) {
}
@Override
public void destroy() {
}
}
......
......@@ -42,6 +42,11 @@ public class DefaultPartition implements Partition {
this.members = ImmutableSet.copyOf(members);
}
public DefaultPartition(Partition other) {
this.id = checkNotNull(other.getId());
this.members = ImmutableSet.copyOf(other.getMembers());
}
@Override
public PartitionId getId() {
return this.id;
......
......@@ -37,12 +37,12 @@ import java.util.function.Predicate;
* a temporary disruption in network connectivity between participating nodes
* or due to a node being temporarily down.
* </p><p>
* All values stored in this map are versioned and the API supports optimistic
* concurrency by allowing conditional updates that take into consideration
* the version or value that was previously read.
* All values stored in this map are {@link Versioned versioned} and the API
* supports optimistic concurrency by allowing conditional updates that take into
* consideration the version or value that was previously read.
* </p><p>
* This map does not allow null values. All methods can throw a ConsistentMapException
* (which extends RuntimeException) to indicate failures.
* (which extends {@code RuntimeException}) to indicate failures.
* <p>
* All methods of this interface return a {@link CompletableFuture future} immediately
* after a successful invocation. The operation itself is executed asynchronous and
......@@ -56,6 +56,11 @@ public interface AsyncConsistentMap<K, V> extends DistributedPrimitive {
return DistributedPrimitive.Type.CONSISTENT_MAP;
}
@Override
default CompletableFuture<Void> destroy() {
return clear();
}
/**
* Returns the number of entries in the map.
*
......
......@@ -25,24 +25,11 @@ import java.util.function.Function;
import java.util.function.Predicate;
/**
* A distributed, strongly consistent key-value map.
* <p>
* This map offers strong read-after-update (where update == create/update/delete)
* consistency. All operations to the map are serialized and applied in a consistent
* manner.
* <p>
* The stronger consistency comes at the expense of availability in
* the event of a network partition. A network partition can be either due to
* a temporary disruption in network connectivity between participating nodes
* or due to a node being temporarily down.
* </p><p>
* All values stored in this map are versioned and the API supports optimistic
* concurrency by allowing conditional updates that take into consideration
* the version or value that was previously read.
* </p><p>
* This map does not allow null values. All methods can throw a ConsistentMapException
* (which extends RuntimeException) to indicate failures.
* {@code ConsistentMap} provides the same functionality as {@link AsyncConsistentMap} with
* the only difference that all its methods block until the corresponding operation completes.
*
* @param <K> type of key
* @param <V> type of value
*/
public interface ConsistentMap<K, V> extends DistributedPrimitive {
......
......@@ -15,6 +15,8 @@
*/
package org.onosproject.store.service;
import java.util.concurrent.CompletableFuture;
import org.onosproject.core.ApplicationId;
/**
......@@ -76,4 +78,15 @@ public interface DistributedPrimitive {
default ApplicationId applicationId() {
return null;
}
/**
* Purges state associated with this primitive.
* <p>
* Implementations can override and provide appropriate clean up logic for purging
* any state state associated with the primitive. Whether modifications made within the
* destroy method have local or global visibility is left unspecified.
*/
default CompletableFuture<Void> destroy() {
return CompletableFuture.completedFuture(null);
}
}
......
......@@ -201,12 +201,4 @@ public interface EventuallyConsistentMap<K, V> extends DistributedPrimitive {
* @param listener listener to deregister for events
*/
void removeListener(EventuallyConsistentMapListener<K, V> listener);
/**
* Shuts down the map and breaks communication between different instances.
* This allows the map objects to be cleaned up and garbage collected.
* Calls to any methods on the map subsequent to calling destroy() will
* throw a {@link java.lang.RuntimeException}.
*/
void destroy();
}
......
......@@ -15,6 +15,8 @@
*/
package org.onosproject.store.service;
import java.util.concurrent.CompletableFuture;
/**
* DistributedPrimitive that is a synchronous (blocking) version of
* another.
......@@ -38,4 +40,9 @@ public abstract class Synchronous<T extends DistributedPrimitive> implements Dis
public Type type() {
return primitive.type();
}
@Override
public CompletableFuture<Void> destroy() {
return primitive.destroy();
}
}
......
......@@ -18,6 +18,7 @@ package org.onosproject.store.service;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
import org.onosproject.store.service.DistributedPrimitive.Type;
......@@ -118,7 +119,7 @@ public class EventuallyConsistentMapAdapter<K, V> implements EventuallyConsisten
}
@Override
public void destroy() {
public CompletableFuture<Void> destroy() {
return CompletableFuture.completedFuture(null);
}
}
......
......@@ -30,7 +30,7 @@ import com.google.common.collect.Collections2;
import com.google.common.collect.Maps;
/**
* Standard java Map backed by a ConsistentMap.
* Standard java {@link Map} backed by a {@link ConsistentMap}.
*
* @param <K> key type
* @param <V> value type
......@@ -65,7 +65,7 @@ public final class ConsistentMapBackedJavaMap<K, V> implements Map<K, V> {
@Override
public V get(Object key) {
return Versioned.valueOrElse(backingMap.get((K) key), null);
return Versioned.valueOrNull(backingMap.get((K) key));
}
@Override
......@@ -75,17 +75,17 @@ public final class ConsistentMapBackedJavaMap<K, V> implements Map<K, V> {
@Override
public V put(K key, V value) {
return Versioned.valueOrElse(backingMap.put(key, value), null);
return Versioned.valueOrNull(backingMap.put(key, value));
}
@Override
public V putIfAbsent(K key, V value) {
return Versioned.valueOrElse(backingMap.putIfAbsent(key, value), null);
return Versioned.valueOrNull(backingMap.putIfAbsent(key, value));
}
@Override
public V remove(Object key) {
return Versioned.valueOrElse(backingMap.remove((K) key), null);
return Versioned.valueOrNull(backingMap.remove((K) key));
}
@Override
......@@ -95,7 +95,7 @@ public final class ConsistentMapBackedJavaMap<K, V> implements Map<K, V> {
@Override
public V replace(K key, V value) {
throw new UnsupportedOperationException();
return Versioned.valueOrNull(backingMap.replace(key, value));
}
@Override
......@@ -117,17 +117,17 @@ public final class ConsistentMapBackedJavaMap<K, V> implements Map<K, V> {
@Override
public V compute(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
return Versioned.valueOrElse(backingMap.compute(key, remappingFunction), null);
return Versioned.valueOrNull(backingMap.compute(key, remappingFunction));
}
@Override
public V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction) {
return Versioned.valueOrElse(backingMap.computeIfAbsent(key, mappingFunction), null);
return Versioned.valueOrNull(backingMap.computeIfAbsent(key, mappingFunction));
}
@Override
public V computeIfPresent(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
return Versioned.valueOrElse(backingMap.computeIfPresent(key, remappingFunction), null);
return Versioned.valueOrNull(backingMap.computeIfPresent(key, remappingFunction));
}
@Override
......
......@@ -16,24 +16,10 @@
package org.onosproject.store.primitives.impl;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.Maps;
import org.onlab.util.HexString;
import org.onlab.util.Match;
import org.onlab.util.SharedExecutors;
import org.onlab.util.Tools;
import org.onosproject.core.ApplicationId;
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.ConsistentMapException;
import org.onosproject.store.service.ConsistentMapException.ConcurrentModification;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.Versioned;
import org.slf4j.Logger;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.onosproject.store.primitives.impl.StateMachineUpdate.Target.MAP_UPDATE;
import static org.onosproject.store.primitives.impl.StateMachineUpdate.Target.TX_COMMIT;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.Collection;
import java.util.Collections;
......@@ -49,10 +35,24 @@ import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.onosproject.store.primitives.impl.StateMachineUpdate.Target.MAP_UPDATE;
import static org.onosproject.store.primitives.impl.StateMachineUpdate.Target.TX_COMMIT;
import static org.slf4j.LoggerFactory.getLogger;
import org.onlab.util.HexString;
import org.onlab.util.Match;
import org.onlab.util.SharedExecutors;
import org.onlab.util.Tools;
import org.onosproject.core.ApplicationId;
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.ConsistentMapException;
import org.onosproject.store.service.ConsistentMapException.ConcurrentModification;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.Versioned;
import org.slf4j.Logger;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.Maps;
/**
* AsyncConsistentMap implementation that is backed by a Raft consensus
......
......@@ -19,6 +19,7 @@ package org.onosproject.store.primitives.impl;
import java.util.Collection;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
......@@ -26,7 +27,6 @@ import java.util.concurrent.TimeoutException;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.Set;
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.ConsistentMap;
......@@ -36,8 +36,7 @@ import org.onosproject.store.service.Synchronous;
import org.onosproject.store.service.Versioned;
/**
* ConsistentMap implementation that is backed by a Raft consensus
* based database.
* Default implementation of {@code ConsistentMap}.
*
* @param <K> type of key.
* @param <V> type of value.
......@@ -46,10 +45,10 @@ public class DefaultConsistentMap<K, V> extends Synchronous<AsyncConsistentMap<K
private static final int OPERATION_TIMEOUT_MILLIS = 5000;
private final DefaultAsyncConsistentMap<K, V> asyncMap;
private final AsyncConsistentMap<K, V> asyncMap;
private Map<K, V> javaMap;
public DefaultConsistentMap(DefaultAsyncConsistentMap<K, V> asyncMap) {
public DefaultConsistentMap(AsyncConsistentMap<K, V> asyncMap) {
super(asyncMap);
this.asyncMap = asyncMap;
}
......@@ -169,31 +168,14 @@ public class DefaultConsistentMap<K, V> extends Synchronous<AsyncConsistentMap<K
return complete(asyncMap.replace(key, oldVersion, newValue));
}
private static <T> T complete(CompletableFuture<T> future) {
try {
return future.get(OPERATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new ConsistentMapException.Interrupted();
} catch (TimeoutException e) {
throw new ConsistentMapException.Timeout();
} catch (ExecutionException e) {
if (e.getCause() instanceof ConsistentMapException) {
throw (ConsistentMapException) e.getCause();
} else {
throw new ConsistentMapException(e.getCause());
}
}
}
@Override
public void addListener(MapEventListener<K, V> listener) {
asyncMap.addListener(listener);
complete(asyncMap.addListener(listener));
}
@Override
public void removeListener(MapEventListener<K, V> listener) {
asyncMap.addListener(listener);
complete(asyncMap.addListener(listener));
}
@Override
......@@ -205,4 +187,21 @@ public class DefaultConsistentMap<K, V> extends Synchronous<AsyncConsistentMap<K
}
return javaMap;
}
private static <T> T complete(CompletableFuture<T> future) {
try {
return future.get(OPERATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new ConsistentMapException.Interrupted();
} catch (TimeoutException e) {
throw new ConsistentMapException.Timeout();
} catch (ExecutionException e) {
if (e.getCause() instanceof ConsistentMapException) {
throw (ConsistentMapException) e.getCause();
} else {
throw new ConsistentMapException(e.getCause());
}
}
}
}
\ No newline at end of file
......
......@@ -15,12 +15,6 @@
*/
package org.onosproject.store.primitives.impl;
import org.onosproject.store.service.AsyncDistributedSet;
import org.onosproject.store.service.ConsistentMapException;
import org.onosproject.store.service.DistributedSet;
import org.onosproject.store.service.SetEventListener;
import org.onosproject.store.service.Synchronous;
import java.lang.reflect.Array;
import java.util.Collection;
import java.util.Iterator;
......@@ -29,6 +23,12 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.onosproject.store.service.AsyncDistributedSet;
import org.onosproject.store.service.DistributedSet;
import org.onosproject.store.service.SetEventListener;
import org.onosproject.store.service.StorageException;
import org.onosproject.store.service.Synchronous;
/**
* Implementation of {@link DistributedSet} that merely delegates to a {@link AsyncDistributedSet}
* and waits for the operation to complete.
......@@ -51,14 +51,14 @@ public class DefaultDistributedSet<E> extends Synchronous<AsyncDistributedSet<E>
return future.get(OPERATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new ConsistentMapException.Interrupted();
throw new StorageException.Interrupted();
} catch (TimeoutException e) {
throw new ConsistentMapException.Timeout();
throw new StorageException.Timeout();
} catch (ExecutionException e) {
if (e.getCause() instanceof ConsistentMapException) {
throw (ConsistentMapException) e.getCause();
if (e.getCause() instanceof StorageException) {
throw (StorageException) e.getCause();
} else {
throw new ConsistentMapException(e.getCause());
throw new StorageException(e.getCause());
}
}
}
......
......@@ -31,6 +31,7 @@ import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.Timer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
......@@ -502,7 +503,7 @@ public class EventuallyConsistentMapImpl<K, V>
}
@Override
public void destroy() {
public CompletableFuture<Void> destroy() {
destroyed = true;
executor.shutdown();
......@@ -513,6 +514,7 @@ public class EventuallyConsistentMapImpl<K, V>
clusterCommunicator.removeSubscriber(updateMessageSubject);
clusterCommunicator.removeSubscriber(antiEntropyAdvertisementSubject);
return CompletableFuture.completedFuture(null);
}
private void notifyListeners(EventuallyConsistentMapEvent<K, V> event) {
......