Madan Jampani
Committed by Gerrit Code Review

ONOS-1362: Support async version of ConsistentMap that lets efficient chaining of operations

Change-Id: I672a15ba2a517db3e22f6ce8d739ca48307e6e63
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.service;
import java.util.Collection;
import java.util.Set;
import java.util.Map.Entry;
import java.util.concurrent.CompletableFuture;
/**
* A distributed, strongly consistent map whose methods are all executed asynchronously.
* <p>
* This map offers strong read-after-update (where update == create/update/delete)
* consistency. All operations to the map are serialized and applied in a consistent
* manner.
* <p>
* The stronger consistency comes at the expense of availability in
* the event of a network partition. A network partition can be either due to
* a temporary disruption in network connectivity between participating nodes
* or due to a node being temporarily down.
* </p><p>
* All values stored in this map are versioned and the API supports optimistic
* concurrency by allowing conditional updates that take into consideration
* the version or value that was previously read.
* </p><p>
* This map does not allow null values. All methods can throw a ConsistentMapException
* (which extends RuntimeException) to indicate failures.
*
*/
public interface AsyncConsistentMap<K, V> {
/**
* Returns the number of entries in the map.
*
* @return a future for map size.
*/
CompletableFuture<Integer> size();
/**
* Returns true if the map is empty.
*
* @return a future whose value will be true if map has no entries, false otherwise.
*/
CompletableFuture<Boolean> isEmpty();
/**
* Returns true if this map contains a mapping for the specified key.
*
* @param key key
* @return a future whose value will be true if map contains key, false otherwise.
*/
CompletableFuture<Boolean> containsKey(K key);
/**
* Returns true if this map contains the specified value.
*
* @param value value
* @return a future whose value will be true if map contains value, false otherwise.
*/
CompletableFuture<Boolean> containsValue(V value);
/**
* Returns the value (and version) to which the specified key is mapped, or null if this
* map contains no mapping for the key.
*
* @param key the key whose associated value (and version) is to be returned
* @return a future value (and version) to which the specified key is mapped, or null if
* this map contains no mapping for the key
*/
CompletableFuture<Versioned<V>> get(K key);
/**
* Associates the specified value with the specified key in this map (optional operation).
* If the map previously contained a mapping for the key, the old value is replaced by the
* specified value.
*
* @param key key with which the specified value is to be associated
* @param value value to be associated with the specified key
* @return the previous value (and version) associated with key, or null if there was
* no mapping for key.
*/
CompletableFuture<Versioned<V>> put(K key, V value);
/**
* Removes the mapping for a key from this map if it is present (optional operation).
*
* @param key key whose value is to be removed from the map
* @return the value (and version) to which this map previously associated the key,
* or null if the map contained no mapping for the key.
*/
CompletableFuture<Versioned<V>> remove(K key);
/**
* Removes all of the mappings from this map (optional operation).
* The map will be empty after this call returns.
*/
CompletableFuture<Void> clear();
/**
* Returns a Set view of the keys contained in this map.
* This method differs from the behavior of java.util.Map.keySet() in that
* what is returned is a unmodifiable snapshot view of the keys in the ConsistentMap.
* Attempts to modify the returned set, whether direct or via its iterator,
* result in an UnsupportedOperationException.
*
* @return a set of the keys contained in this map
*/
CompletableFuture<Set<K>> keySet();
/**
* Returns the collection of values (and associated versions) contained in this map.
* This method differs from the behavior of java.util.Map.values() in that
* what is returned is a unmodifiable snapshot view of the values in the ConsistentMap.
* Attempts to modify the returned collection, whether direct or via its iterator,
* result in an UnsupportedOperationException.
*
* @return a collection of the values (and associated versions) contained in this map
*/
CompletableFuture<Collection<Versioned<V>>> values();
/**
* Returns the set of entries contained in this map.
* This method differs from the behavior of java.util.Map.entrySet() in that
* what is returned is a unmodifiable snapshot view of the entries in the ConsistentMap.
* Attempts to modify the returned set, whether direct or via its iterator,
* result in an UnsupportedOperationException.
*
* @return set of entries contained in this map.
*/
CompletableFuture<Set<Entry<K, Versioned<V>>>> entrySet();
/**
* If the specified key is not already associated with a value
* associates it with the given value and returns null, else returns the current value.
*
* @param key key with which the specified value is to be associated
* @param value value to be associated with the specified key
* @return the previous value associated with the specified key or null
* if key does not already mapped to a value.
*/
CompletableFuture<Versioned<V>> putIfAbsent(K key, V value);
/**
* Removes the entry for the specified key only if it is currently
* mapped to the specified value.
*
* @param key key with which the specified value is associated
* @param value value expected to be associated with the specified key
* @return true if the value was removed
*/
CompletableFuture<Boolean> remove(K key, V value);
/**
* Removes the entry for the specified key only if its current
* version in the map is equal to the specified version.
*
* @param key key with which the specified version is associated
* @param version version expected to be associated with the specified key
* @return true if the value was removed
*/
CompletableFuture<Boolean> remove(K key, long version);
/**
* Replaces the entry for the specified key only if currently mapped
* to the specified value.
*
* @param key key with which the specified value is associated
* @param oldValue value expected to be associated with the specified key
* @param newValue value to be associated with the specified key
* @return true if the value was replaced
*/
CompletableFuture<Boolean> replace(K key, V oldValue, V newValue);
/**
* Replaces the entry for the specified key only if it is currently mapped to the
* specified version.
*
* @param key key key with which the specified value is associated
* @param oldVersion version expected to be associated with the specified key
* @param newValue value to be associated with the specified key
* @return true if the value was replaced
*/
CompletableFuture<Boolean> replace(K key, long oldVersion, V newValue);
}
......@@ -30,9 +30,8 @@ public interface StorageService {
/**
* Creates a ConsistentMap.
*
* @param name map name
* @param serializer serializer to use for serializing keys and values.
* @param serializer serializer to use for serializing keys and values
* @return consistent map.
* @param <K> key type
* @param <V> value type
......@@ -40,6 +39,16 @@ public interface StorageService {
<K, V> ConsistentMap<K , V> createConsistentMap(String name, Serializer serializer);
/**
* Creates a AsyncConsistentMap.
* @param name map name
* @param serializer serializer to use for serializing keys and values
* @return async consistent map
* @param <K> key type
* @param <V> value type
*/
<K, V> AsyncConsistentMap<K , V> createAsyncConsistentMap(String name, Serializer serializer);
/**
* Creates a new transaction context.
* @return transaction context
*/
......
......@@ -32,6 +32,7 @@ import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onosproject.cluster.ClusterService;
import org.onosproject.store.cluster.impl.NodeInfo;
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.PartitionInfo;
import org.onosproject.store.service.Serializer;
......@@ -168,7 +169,12 @@ public class DatabaseManager implements StorageService, StorageAdminService {
@Override
public <K, V> ConsistentMap<K , V> createConsistentMap(String name, Serializer serializer) {
return new ConsistentMapImpl<K, V>(name, partitionedDatabase, serializer);
return new DefaultConsistentMap<K, V>(name, partitionedDatabase, serializer);
}
@Override
public <K, V> AsyncConsistentMap<K , V> createAsyncConsistentMap(String name, Serializer serializer) {
return new DefaultAsyncConsistentMap<K, V>(name, partitionedDatabase, serializer);
}
@Override
......
......@@ -19,20 +19,15 @@ package org.onosproject.store.consistent.impl;
import static com.google.common.base.Preconditions.*;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.Set;
import org.apache.commons.lang3.tuple.Pair;
import org.onlab.util.HexString;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.ConsistentMapException;
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.Versioned;
......@@ -41,19 +36,18 @@ import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
/**
* ConsistentMap implementation that is backed by a Raft consensus
* AsyncConsistentMap implementation that is backed by a Raft consensus
* based database.
*
* @param <K> type of key.
* @param <V> type of value.
*/
public class ConsistentMapImpl<K, V> implements ConsistentMap<K, V> {
public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V> {
private final String name;
private final DatabaseProxy<String, byte[]> proxy;
private final Serializer serializer;
private static final int OPERATION_TIMEOUT_MILLIS = 5000;
private static final String ERROR_NULL_KEY = "Key cannot be null";
private static final String ERROR_NULL_VALUE = "Null values are not allowed";
......@@ -71,7 +65,7 @@ public class ConsistentMapImpl<K, V> implements ConsistentMap<K, V> {
return serializer.decode(HexString.fromHexString(key));
}
public ConsistentMapImpl(String name,
public DefaultAsyncConsistentMap(String name,
DatabaseProxy<String, byte[]> proxy,
Serializer serializer) {
this.name = checkNotNull(name, "map name cannot be null");
......@@ -80,152 +74,119 @@ public class ConsistentMapImpl<K, V> implements ConsistentMap<K, V> {
}
@Override
public int size() {
return complete(proxy.size(name));
public CompletableFuture<Integer> size() {
return proxy.size(name);
}
@Override
public boolean isEmpty() {
return complete(proxy.isEmpty(name));
public CompletableFuture<Boolean> isEmpty() {
return proxy.isEmpty(name);
}
@Override
public boolean containsKey(K key) {
public CompletableFuture<Boolean> containsKey(K key) {
checkNotNull(key, ERROR_NULL_KEY);
return complete(proxy.containsKey(name, keyCache.getUnchecked(key)));
return proxy.containsKey(name, keyCache.getUnchecked(key));
}
@Override
public boolean containsValue(V value) {
public CompletableFuture<Boolean> containsValue(V value) {
checkNotNull(value, ERROR_NULL_VALUE);
return complete(proxy.containsValue(name, serializer.encode(value)));
return proxy.containsValue(name, serializer.encode(value));
}
@Override
public Versioned<V> get(K key) {
public CompletableFuture<Versioned<V>> get(K key) {
checkNotNull(key, ERROR_NULL_KEY);
Versioned<byte[]> value = complete(proxy.get(name, keyCache.getUnchecked(key)));
if (value == null) {
return null;
}
return new Versioned<>(
serializer.decode(value.value()),
value.version(),
value.creationTime());
return proxy.get(name, keyCache.getUnchecked(key))
.thenApply(v -> v != null
? new Versioned<>(serializer.decode(v.value()), v.version(), v.creationTime()) : null);
}
@Override
public Versioned<V> put(K key, V value) {
public CompletableFuture<Versioned<V>> put(K key, V value) {
checkNotNull(key, ERROR_NULL_KEY);
checkNotNull(value, ERROR_NULL_VALUE);
Versioned<byte[]> previousValue =
complete(proxy.put(name, keyCache.getUnchecked(key), serializer.encode(value)));
if (previousValue == null) {
return null;
}
return new Versioned<>(
serializer.decode(previousValue.value()),
previousValue.version(),
previousValue.creationTime());
return proxy.put(name, keyCache.getUnchecked(key), serializer.encode(value))
.thenApply(v -> v != null
? new Versioned<>(serializer.decode(v.value()), v.version(), v.creationTime()) : null);
}
@Override
public Versioned<V> remove(K key) {
public CompletableFuture<Versioned<V>> remove(K key) {
checkNotNull(key, ERROR_NULL_KEY);
Versioned<byte[]> value = complete(proxy.remove(name, keyCache.getUnchecked(key)));
if (value == null) {
return null;
}
return new Versioned<>(
serializer.decode(value.value()),
value.version(),
value.creationTime());
return proxy.remove(name, keyCache.getUnchecked(key))
.thenApply(v -> v != null
? new Versioned<>(serializer.decode(v.value()), v.version(), v.creationTime()) : null);
}
@Override
public void clear() {
complete(proxy.clear(name));
public CompletableFuture<Void> clear() {
return proxy.clear(name);
}
@Override
public Set<K> keySet() {
return Collections.unmodifiableSet(complete(proxy.keySet(name))
public CompletableFuture<Set<K>> keySet() {
return proxy.keySet(name)
.thenApply(s -> s
.stream()
.map(this::dK)
.collect(Collectors.toSet()));
}
@Override
public Collection<Versioned<V>> values() {
return Collections.unmodifiableList(complete(proxy.values(name))
public CompletableFuture<Collection<Versioned<V>>> values() {
return proxy.values(name).thenApply(c -> c
.stream()
.map(v -> new Versioned<V>(serializer.decode(v.value()), v.version(), v.creationTime()))
.collect(Collectors.toList()));
}
@Override
public Set<Entry<K, Versioned<V>>> entrySet() {
return Collections.unmodifiableSet(complete(proxy.entrySet(name))
public CompletableFuture<Set<Entry<K, Versioned<V>>>> entrySet() {
return proxy.entrySet(name).thenApply(s -> s
.stream()
.map(this::fromRawEntry)
.collect(Collectors.toSet()));
}
@Override
public Versioned<V> putIfAbsent(K key, V value) {
public CompletableFuture<Versioned<V>> putIfAbsent(K key, V value) {
checkNotNull(key, ERROR_NULL_KEY);
checkNotNull(value, ERROR_NULL_VALUE);
Versioned<byte[]> existingValue = complete(proxy.putIfAbsent(
name, keyCache.getUnchecked(key), serializer.encode(value)));
if (existingValue == null) {
return null;
}
return new Versioned<>(
serializer.decode(existingValue.value()),
existingValue.version(),
existingValue.creationTime());
return proxy.putIfAbsent(
name, keyCache.getUnchecked(key), serializer.encode(value)).thenApply(v ->
v != null ?
new Versioned<>(serializer.decode(v.value()), v.version(), v.creationTime()) : null);
}
@Override
public boolean remove(K key, V value) {
public CompletableFuture<Boolean> remove(K key, V value) {
checkNotNull(key, ERROR_NULL_KEY);
checkNotNull(value, ERROR_NULL_VALUE);
return complete(proxy.remove(name, keyCache.getUnchecked(key), serializer.encode(value)));
return proxy.remove(name, keyCache.getUnchecked(key), serializer.encode(value));
}
@Override
public boolean remove(K key, long version) {
public CompletableFuture<Boolean> remove(K key, long version) {
checkNotNull(key, ERROR_NULL_KEY);
return complete(proxy.remove(name, keyCache.getUnchecked(key), version));
return proxy.remove(name, keyCache.getUnchecked(key), version);
}
@Override
public boolean replace(K key, V oldValue, V newValue) {
public CompletableFuture<Boolean> replace(K key, V oldValue, V newValue) {
checkNotNull(key, ERROR_NULL_KEY);
checkNotNull(newValue, ERROR_NULL_VALUE);
byte[] existing = oldValue != null ? serializer.encode(oldValue) : null;
return complete(proxy.replace(name, keyCache.getUnchecked(key), existing, serializer.encode(newValue)));
return proxy.replace(name, keyCache.getUnchecked(key), existing, serializer.encode(newValue));
}
@Override
public boolean replace(K key, long oldVersion, V newValue) {
public CompletableFuture<Boolean> replace(K key, long oldVersion, V newValue) {
checkNotNull(key, ERROR_NULL_KEY);
checkNotNull(newValue, ERROR_NULL_VALUE);
return complete(proxy.replace(name, keyCache.getUnchecked(key), oldVersion, serializer.encode(newValue)));
}
private static <T> T complete(CompletableFuture<T> future) {
try {
return future.get(OPERATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new ConsistentMapException.Interrupted();
} catch (TimeoutException e) {
throw new ConsistentMapException.Timeout();
} catch (ExecutionException e) {
throw new ConsistentMapException(e.getCause());
}
return proxy.replace(name, keyCache.getUnchecked(key), oldVersion, serializer.encode(newValue));
}
private Map.Entry<K, Versioned<V>> fromRawEntry(Map.Entry<String, Versioned<byte[]>> e) {
......
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.consistent.impl;
import java.util.Collection;
import java.util.Map.Entry;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.Set;
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.ConsistentMapException;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.Versioned;
/**
* ConsistentMap implementation that is backed by a Raft consensus
* based database.
*
* @param <K> type of key.
* @param <V> type of value.
*/
public class DefaultConsistentMap<K, V> implements ConsistentMap<K, V> {
private static final int OPERATION_TIMEOUT_MILLIS = 5000;
private final AsyncConsistentMap<K, V> asyncMap;
public DefaultConsistentMap(String name,
DatabaseProxy<String, byte[]> proxy,
Serializer serializer) {
asyncMap = new DefaultAsyncConsistentMap<>(name, proxy, serializer);
}
@Override
public int size() {
return complete(asyncMap.size());
}
@Override
public boolean isEmpty() {
return complete(asyncMap.isEmpty());
}
@Override
public boolean containsKey(K key) {
return complete(asyncMap.containsKey(key));
}
@Override
public boolean containsValue(V value) {
return complete(asyncMap.containsValue(value));
}
@Override
public Versioned<V> get(K key) {
return complete(asyncMap.get(key));
}
@Override
public Versioned<V> put(K key, V value) {
return complete(asyncMap.put(key, value));
}
@Override
public Versioned<V> remove(K key) {
return complete(asyncMap.remove(key));
}
@Override
public void clear() {
complete(asyncMap.clear());
}
@Override
public Set<K> keySet() {
return complete(asyncMap.keySet());
}
@Override
public Collection<Versioned<V>> values() {
return complete(asyncMap.values());
}
@Override
public Set<Entry<K, Versioned<V>>> entrySet() {
return complete(asyncMap.entrySet());
}
@Override
public Versioned<V> putIfAbsent(K key, V value) {
return complete(asyncMap.putIfAbsent(key, value));
}
@Override
public boolean remove(K key, V value) {
return complete(asyncMap.remove(key, value));
}
@Override
public boolean remove(K key, long version) {
return complete(asyncMap.remove(key, version));
}
@Override
public boolean replace(K key, V oldValue, V newValue) {
return complete(asyncMap.replace(key, oldValue, newValue));
}
@Override
public boolean replace(K key, long oldVersion, V newValue) {
return complete(asyncMap.replace(key, oldVersion, newValue));
}
private static <T> T complete(CompletableFuture<T> future) {
try {
return future.get(OPERATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new ConsistentMapException.Interrupted();
} catch (TimeoutException e) {
throw new ConsistentMapException.Timeout();
} catch (ExecutionException e) {
throw new ConsistentMapException(e.getCause());
}
}
}
\ No newline at end of file
......@@ -63,7 +63,7 @@ public class DefaultTransactionContext implements TransactionContext {
checkNotNull(serializer, "serializer is null");
checkState(isOpen, TX_NOT_OPEN_ERROR);
if (!txMaps.containsKey(mapName)) {
ConsistentMap<K, V> backingMap = new ConsistentMapImpl<>(mapName, databaseProxy, serializer);
ConsistentMap<K, V> backingMap = new DefaultConsistentMap<>(mapName, databaseProxy, serializer);
DefaultTransactionalMap<K, V> txMap = new DefaultTransactionalMap<>(mapName, backingMap, this, serializer);
txMaps.put(mapName, txMap);
}
......