Aaron Kruglikov
Committed by Gerrit Code Review

Adding additional resources for instantiating async consistent treemaps.

Change-Id: I7bfc602ac22eda1844fea2a7b3e3133f83157bf3
......@@ -16,6 +16,7 @@
package org.onosproject.vtnrsc.util;
import org.onosproject.store.service.Topic;
import org.onosproject.store.service.ConsistentTreeMapBuilder;
import org.onosproject.store.service.WorkQueue;
import org.onosproject.store.service.EventuallyConsistentMapBuilder;
import org.onosproject.store.service.ConsistentMapBuilder;
......@@ -42,6 +43,11 @@ public class VtnStorageServiceAdapter implements StorageService {
}
@Override
public <V> ConsistentTreeMapBuilder<V> consistentTreeMapBuilder() {
return null;
}
@Override
public <E> DistributedSetBuilder<E> setBuilder() {
return null;
}
......
......@@ -45,7 +45,6 @@ import java.util.function.Predicate;
public class DefaultConsistentTreeMap<V>
extends Synchronous<AsyncConsistentTreeMap<V>>
implements ConsistentTreeMap<V> {
private static final int MAX_DELAY_BETWEEN_RETRY_MILLIS = 50;
private final AsyncConsistentTreeMap<V> treeMap;
private final long operationTimeoutMillis;
private Map<String, V> javaMap;
......
......@@ -20,6 +20,7 @@ import java.util.Set;
import org.onosproject.store.service.AsyncAtomicCounter;
import org.onosproject.store.service.AsyncAtomicValue;
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.AsyncConsistentTreeMap;
import org.onosproject.store.service.AsyncDistributedSet;
import org.onosproject.store.service.AsyncLeaderElector;
import org.onosproject.store.service.WorkQueue;
......@@ -42,6 +43,16 @@ public interface DistributedPrimitiveCreator {
<K, V> AsyncConsistentMap<K, V> newAsyncConsistentMap(String name, Serializer serializer);
/**
* Creates a new {@code AsyncConsistentTreeMap}.
*
* @param name tree name
* @param serializer serializer to use for serializing/deserializing map entries
* @param <V> value type
* @return distributedTreeMap
*/
<V> AsyncConsistentTreeMap<V> newAsyncConsistentTreeMap(String name, Serializer serializer);
/**
* Creates a new {@code AsyncAtomicCounter}.
*
* @param name counter name
......
/*
* Copyright 2016-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.service;
import org.onosproject.store.primitives.DistributedPrimitiveBuilder;
/**
* Builder for {@link ConsistentTreeMap}.
*/
public abstract class ConsistentTreeMapBuilder<V>
extends DistributedPrimitiveBuilder<ConsistentTreeMapBuilder<V>, ConsistentTreeMap<V>> {
private boolean purgeOnUninstall = false;
public ConsistentTreeMapBuilder() {
super(DistributedPrimitive.Type.CONSISTENT_TREEMAP);
}
/**
* Clears map contents when the owning application is uninstalled.
*
* @return this builder
*/
public ConsistentTreeMapBuilder<V> withPurgeOnUninstall() {
purgeOnUninstall = true;
return this;
}
/**
* Return if map entries need to be cleared when owning application is uninstalled.
*
* @return true if items are to be cleared on uninstall
*/
public boolean purgeOnUninstall() {
return purgeOnUninstall;
}
/**
* Builds the distributed tree map based on the configuration options supplied
* to this builder.
*
* @return new distributed tree map
* @throw java.lang.RuntimeException if a mandatory parameter is missing
*/
public abstract AsyncConsistentTreeMap<V> buildTreeMap();
}
......@@ -52,6 +52,11 @@ public interface DistributedPrimitive {
SET,
/**
* Tree map.
*/
CONSISTENT_TREEMAP,
/**
* atomic counter.
*/
COUNTER,
......
......@@ -44,6 +44,14 @@ public interface StorageService {
<K, V> ConsistentMapBuilder<K, V> consistentMapBuilder();
/**
* Creates a new {@code AsyncConsistentTreeMapBuilder}.
*
* @param <V> value type
* @return builder for a async consistent tree map
*/
<V> ConsistentTreeMapBuilder<V> consistentTreeMapBuilder();
/**
* Creates a new DistributedSetBuilder.
*
* @param <E> set element type
......
......@@ -63,4 +63,8 @@ public class StorageServiceAdapter implements StorageService {
public <T> Topic<T> getTopic(String name, Serializer serializer) {
return null;
}
public <V> ConsistentTreeMapBuilder<V> consistentTreeMapBuilder() {
return null;
}
}
......
......@@ -31,6 +31,7 @@ import org.onosproject.store.primitives.MapUpdate;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapFactory;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorFactory;
import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands;
......@@ -96,6 +97,7 @@ public final class CatalystSerializers {
serializer.resolve(new AtomixLeaderElectorCommands.TypeResolver());
serializer.resolve(new AtomixWorkQueueCommands.TypeResolver());
serializer.resolve(new ResourceManagerTypeResolver());
serializer.resolve(new AtomixConsistentTreeMapCommands.TypeResolver());
serializer.registerClassLoader(AtomixConsistentMapFactory.class)
.registerClassLoader(AtomixLeaderElectorFactory.class)
......
/*
* Copyright 2016-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.impl;
import org.onosproject.store.primitives.DistributedPrimitiveCreator;
import org.onosproject.store.service.AsyncConsistentTreeMap;
import org.onosproject.store.service.ConsistentTreeMap;
import org.onosproject.store.service.ConsistentTreeMapBuilder;
/**
* Default {@link org.onosproject.store.service.AsyncConsistentTreeMap} builder.
*
* @param <V> type for map value
*/
public class DefaultConsistentTreeMapBuilder<V> extends ConsistentTreeMapBuilder<V> {
private final DistributedPrimitiveCreator primitiveCreator;
public DefaultConsistentTreeMapBuilder(DistributedPrimitiveCreator primitiveCreator) {
this.primitiveCreator = primitiveCreator;
}
@Override
public AsyncConsistentTreeMap<V> buildTreeMap() {
return primitiveCreator.newAsyncConsistentTreeMap(name(), serializer());
}
@Override
public ConsistentTreeMap<V> build() {
return buildTreeMap().asTreeMap();
}
}
/*
* Copyright 2016-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.impl;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.service.AsyncConsistentTreeMap;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.MapTransaction;
import org.onosproject.store.service.Versioned;
import java.util.Collection;
import java.util.Map;
import java.util.NavigableMap;
import java.util.NavigableSet;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.BiFunction;
import java.util.function.Predicate;
import static com.google.common.base.Preconditions.checkNotNull;
/**
* A {@link AsyncConsistentTreeMap} that delegates control to another instance
* of {@link AsyncConsistentTreeMap}.
*/
public class DelegatingAsyncConsistentTreeMap<V>
implements AsyncConsistentTreeMap<V> {
private final AsyncConsistentTreeMap<V> delegateMap;
DelegatingAsyncConsistentTreeMap(AsyncConsistentTreeMap<V> delegateMap) {
this.delegateMap = checkNotNull(delegateMap,
"delegate map cannot be null");
}
@Override
public CompletableFuture<String> firstKey() {
return delegateMap.firstKey();
}
@Override
public CompletableFuture<String> lastKey() {
return delegateMap.lastKey();
}
@Override
public CompletableFuture<Map.Entry<String, Versioned<V>>> ceilingEntry(String key) {
return delegateMap.ceilingEntry(key);
}
@Override
public CompletableFuture<Map.Entry<String, Versioned<V>>> floorEntry(String key) {
return delegateMap.floorEntry(key);
}
@Override
public CompletableFuture<Map.Entry<String, Versioned<V>>> higherEntry(String key) {
return delegateMap.higherEntry(key);
}
@Override
public CompletableFuture<Map.Entry<String, Versioned<V>>> lowerEntry(String key) {
return delegateMap.lowerEntry(key);
}
@Override
public CompletableFuture<Map.Entry<String, Versioned<V>>> firstEntry() {
return delegateMap.firstEntry();
}
@Override
public CompletableFuture<Map.Entry<String, Versioned<V>>> lastEntry() {
return delegateMap.lastEntry();
}
@Override
public CompletableFuture<Map.Entry<String, Versioned<V>>> pollFirstEntry() {
return delegateMap.pollFirstEntry();
}
@Override
public CompletableFuture<Map.Entry<String, Versioned<V>>> pollLastEntry() {
return delegateMap.pollLastEntry();
}
@Override
public CompletableFuture<String> lowerKey(String key) {
return delegateMap.lowerKey(key);
}
@Override
public CompletableFuture<String> floorKey(String key) {
return delegateMap.floorKey(key);
}
@Override
public CompletableFuture<String> ceilingKey(String key) {
return delegateMap.ceilingKey(key);
}
@Override
public CompletableFuture<String> higherKey(String key) {
return delegateMap.higherKey(key);
}
@Override
public CompletableFuture<NavigableSet<String>> navigableKeySet() {
return delegateMap.navigableKeySet();
}
@Override
public CompletableFuture<NavigableMap<String, V>> subMap(
String upperKey,
String lowerKey,
boolean inclusiveUpper,
boolean inclusiveLower) {
return delegateMap.subMap(upperKey, lowerKey,
inclusiveUpper, inclusiveLower);
}
@Override
public String name() {
return delegateMap.name();
}
@Override
public CompletableFuture<Integer> size() {
return delegateMap.size();
}
@Override
public CompletableFuture<Boolean> containsKey(String key) {
return delegateMap.containsKey(key);
}
@Override
public CompletableFuture<Boolean> containsValue(V value) {
return delegateMap.containsValue(value);
}
@Override
public CompletableFuture<Versioned<V>> get(String key) {
return delegateMap.get(key);
}
@Override
public CompletableFuture<Versioned<V>> computeIf(
String key,
Predicate<? super V> condition,
BiFunction<? super String, ? super V,
? extends V> remappingFunction) {
return delegateMap.computeIf(key, condition, remappingFunction);
}
@Override
public CompletableFuture<Versioned<V>> put(String key, V value) {
return delegateMap.put(key, value);
}
@Override
public CompletableFuture<Versioned<V>> putAndGet(String key, V value) {
return delegateMap.putAndGet(key, value);
}
@Override
public CompletableFuture<Versioned<V>> remove(String key) {
return delegateMap.remove(key);
}
@Override
public CompletableFuture<Void> clear() {
return delegateMap.clear();
}
@Override
public CompletableFuture<Set<String>> keySet() {
return delegateMap.keySet();
}
@Override
public CompletableFuture<Collection<Versioned<V>>> values() {
return delegateMap.values();
}
@Override
public CompletableFuture<Set<Map.Entry<String, Versioned<V>>>> entrySet() {
return delegateMap.entrySet();
}
@Override
public CompletableFuture<Versioned<V>> putIfAbsent(String key, V value) {
return delegateMap.putIfAbsent(key, value);
}
@Override
public CompletableFuture<Boolean> remove(String key, V value) {
return delegateMap.remove(key, value);
}
@Override
public CompletableFuture<Boolean> remove(String key, long version) {
return delegateMap.remove(key, version);
}
@Override
public CompletableFuture<Versioned<V>> replace(String key, V value) {
return delegateMap.replace(key, value);
}
@Override
public CompletableFuture<Boolean> replace(String key, V oldValue,
V newValue) {
return delegateMap.replace(key, oldValue, newValue);
}
@Override
public CompletableFuture<Boolean> replace(String key, long oldVersion,
V newValue) {
return delegateMap.replace(key, oldVersion, newValue);
}
@Override
public CompletableFuture<Void> addListener(
MapEventListener<String, V> listener, Executor executor) {
return delegateMap.addListener(listener, executor);
}
@Override
public CompletableFuture<Void> removeListener(
MapEventListener<String, V> listener) {
return delegateMap.removeListener(listener);
}
@Override
public CompletableFuture<Boolean> prepare(
MapTransaction<String, V> transaction) {
return delegateMap.prepare(transaction);
}
@Override
public CompletableFuture<Void> commit(TransactionId transactionId) {
return delegateMap.commit(transactionId);
}
@Override
public CompletableFuture<Void> rollback(TransactionId transactionId) {
return delegateMap.rollback(transactionId);
}
@Override
public CompletableFuture<Boolean> prepareAndCommit(
MapTransaction<String, V> transaction) {
return delegateMap.prepareAndCommit(transaction);
}
@Override
public boolean equals(Object other) {
if (other instanceof DelegatingAsyncConsistentTreeMap) {
DelegatingAsyncConsistentTreeMap<V> that =
(DelegatingAsyncConsistentTreeMap) other;
return this.delegateMap.equals(that.delegateMap);
}
return false;
}
@Override
public int hashCode() {
return Objects.hash(delegateMap);
}
}
......@@ -18,6 +18,7 @@ package org.onosproject.store.primitives.impl;
import java.util.function.Function;
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.AsyncConsistentTreeMap;
import org.onosproject.store.service.AsyncDistributedSet;
/**
......@@ -100,4 +101,24 @@ public final class DistributedPrimitives {
valueEncoder,
valueDecoder);
}
/**
* Creates an instance of {@code DistributedTreeMap} that transforms operations inputs and applies them
* to corresponding operation in a different typed map and returns the output after reverse transforming it.
*
* @param map backing map
* @param valueEncoder transformer for value type of returned map to value type of input map
* @param valueDecoder transformer for value type of input map to value type of returned map
* @param <V1> returned map value type
* @param <V2> input map key type
* @return new map
*/
public static <V1, V2> AsyncConsistentTreeMap<V1> newTranscodingTreeMap(
AsyncConsistentTreeMap<V2> map,
Function<V1, V2> valueEncoder,
Function<V2, V1> valueDecoder) {
return new TranscodingAsyncConsistentTreeMap<>(map,
valueEncoder,
valueDecoder);
}
}
......
......@@ -27,6 +27,7 @@ import org.onosproject.store.primitives.DistributedPrimitiveCreator;
import org.onosproject.store.service.AsyncAtomicCounter;
import org.onosproject.store.service.AsyncAtomicValue;
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.AsyncConsistentTreeMap;
import org.onosproject.store.service.AsyncDistributedSet;
import org.onosproject.store.service.AsyncLeaderElector;
import org.onosproject.store.service.Serializer;
......@@ -68,6 +69,12 @@ public class FederatedDistributedPrimitiveCreator implements DistributedPrimitiv
}
@Override
public <V> AsyncConsistentTreeMap<V> newAsyncConsistentTreeMap(String name,
Serializer serializer) {
return getCreator(name).newAsyncConsistentTreeMap(name, serializer);
}
@Override
public <E> AsyncDistributedSet<E> newAsyncDistributedSet(String name, Serializer serializer) {
return DistributedPrimitives.newSetFromMap(newAsyncConsistentMap(name, serializer));
}
......
......@@ -47,6 +47,7 @@ import org.onosproject.store.service.AtomicCounterBuilder;
import org.onosproject.store.service.AtomicValueBuilder;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.ConsistentMapBuilder;
import org.onosproject.store.service.ConsistentTreeMapBuilder;
import org.onosproject.store.service.DistributedSetBuilder;
import org.onosproject.store.service.EventuallyConsistentMapBuilder;
import org.onosproject.store.service.LeaderElectorBuilder;
......@@ -131,6 +132,12 @@ public class StorageManager implements StorageService, StorageAdminService {
}
@Override
public <V> ConsistentTreeMapBuilder<V> consistentTreeMapBuilder() {
return new DefaultConsistentTreeMapBuilder<V>(
federatedPrimitiveCreator);
}
@Override
public <E> DistributedSetBuilder<E> setBuilder() {
checkPermission(STORAGE_WRITE);
return new DefaultDistributedSetBuilder<>(() -> this.<E, Boolean>consistentMapBuilder());
......
......@@ -39,6 +39,7 @@ import java.util.function.Function;
import org.onlab.util.HexString;
import org.onosproject.store.primitives.DistributedPrimitiveCreator;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMap;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMap;
import org.onosproject.store.primitives.resources.impl.AtomixCounter;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElector;
import org.onosproject.store.primitives.resources.impl.AtomixWorkQueue;
......@@ -46,6 +47,7 @@ import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.AsyncAtomicCounter;
import org.onosproject.store.service.AsyncAtomicValue;
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.AsyncConsistentTreeMap;
import org.onosproject.store.service.AsyncDistributedSet;
import org.onosproject.store.service.AsyncLeaderElector;
import org.onosproject.store.service.DistributedPrimitive.Status;
......@@ -143,6 +145,30 @@ public class StoragePartitionClient implements DistributedPrimitiveCreator, Mana
}
@Override
public <V> AsyncConsistentTreeMap<V> newAsyncConsistentTreeMap(String name, Serializer serializer) {
AtomixConsistentTreeMap atomixConsistentTreeMap =
client.getResource(name, AtomixConsistentTreeMap.class).join();
Consumer<State> statusListener = state -> {
atomixConsistentTreeMap.statusChangeListeners()
.forEach(listener -> listener.accept(mapper.apply(state)));
};
resourceClient.client().onStateChange(statusListener);
AsyncConsistentTreeMap<byte[]> rawMap =
new DelegatingAsyncConsistentTreeMap<byte[]>(atomixConsistentTreeMap) {
@Override
public String name() {
return name();
}
};
AsyncConsistentTreeMap<V> transcodedMap =
DistributedPrimitives.<V, byte[]>newTranscodingTreeMap(
rawMap,
value -> value == null ? null : serializer.encode(value),
bytes -> serializer.decode(bytes));
return transcodedMap;
}
@Override
public <E> AsyncDistributedSet<E> newAsyncDistributedSet(String name, Serializer serializer) {
return DistributedPrimitives.newSetFromMap(this.<E, Boolean>newAsyncConsistentMap(name, serializer));
}
......
/*
* Copyright 2016 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.impl;
import com.google.common.collect.Maps;
import org.onlab.util.Tools;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.service.AsyncConsistentTreeMap;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.MapTransaction;
import org.onosproject.store.service.Versioned;
import java.util.Collection;
import java.util.Map;
import java.util.NavigableMap;
import java.util.NavigableSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
/**
* Created by admin on 8/3/16.
*/
public class TranscodingAsyncConsistentTreeMap<V1, V2>
implements AsyncConsistentTreeMap<V1> {
private final AsyncConsistentTreeMap<V2> backingMap;
private final Function<V2, V1> valueDecoder;
private final Function<V1, V2> valueEncoder;
private final Function<Versioned<V2>, Versioned<V1>>
versionedValueTransform;
private final Map<MapEventListener<String, V1>,
TranscodingAsyncConsistentTreeMap.InternalBackingMapEventListener>
listeners = Maps.newIdentityHashMap();
public TranscodingAsyncConsistentTreeMap(
AsyncConsistentTreeMap<V2> backingMap,
Function<V1, V2> valueEncoder,
Function<V2, V1> valueDecoder) {
this.backingMap = backingMap;
this.valueEncoder = v -> v == null ? null : valueEncoder.apply(v);
this.valueDecoder = v -> v == null ? null : valueDecoder.apply(v);
this.versionedValueTransform = v -> v == null ? null :
v.map(valueDecoder);
}
@Override
public CompletableFuture<String> firstKey() {
return backingMap.firstKey();
}
@Override
public CompletableFuture<String> lastKey() {
return backingMap.lastKey();
}
@Override
public CompletableFuture<Map.Entry<String, Versioned<V1>>>
ceilingEntry(String key) {
return backingMap.ceilingEntry(key)
.thenApply(
entry ->
Maps.immutableEntry(
entry.getKey(),
entry.getValue().map(valueDecoder)));
}
@Override
public CompletableFuture<Map.Entry<String, Versioned<V1>>>
floorEntry(String key) {
return backingMap.floorEntry(key)
.thenApply(
entry ->
Maps.immutableEntry(
entry.getKey(),
entry.getValue().map(valueDecoder)));
}
@Override
public CompletableFuture<Map.Entry<String, Versioned<V1>>>
higherEntry(String key) {
return backingMap
.higherEntry(key)
.thenApply(entry ->
Maps.immutableEntry(
entry.getKey(),
entry.getValue().map(valueDecoder)));
}
@Override
public CompletableFuture<Map.Entry<String, Versioned<V1>>>
lowerEntry(String key) {
return backingMap.lowerEntry(key).thenApply(
entry ->
Maps.immutableEntry(
entry.getKey(),
entry.getValue().map(valueDecoder)));
}
@Override
public CompletableFuture<Map.Entry<String, Versioned<V1>>>
firstEntry() {
return backingMap.firstEntry()
.thenApply(entry ->
Maps.immutableEntry(
entry.getKey(),
entry.getValue().map(valueDecoder)));
}
@Override
public CompletableFuture<Map.Entry<String, Versioned<V1>>>
lastEntry() {
return backingMap.lastEntry()
.thenApply(
entry -> Maps.immutableEntry(
entry.getKey(),
entry.getValue().map(valueDecoder)));
}
@Override
public CompletableFuture<Map.Entry<String, Versioned<V1>>>
pollFirstEntry() {
return backingMap.pollFirstEntry()
.thenApply(
entry -> Maps.immutableEntry(
entry.getKey(),
entry.getValue().map(valueDecoder)));
}
@Override
public CompletableFuture<Map.Entry<String, Versioned<V1>>>
pollLastEntry() {
return backingMap.pollLastEntry()
.thenApply(entry -> Maps.immutableEntry(
entry.getKey(),
entry.getValue().map(valueDecoder)));
}
@Override
public CompletableFuture<String> lowerKey(String key) {
return backingMap.lowerKey(key);
}
@Override
public CompletableFuture<String> floorKey(String key) {
return backingMap.floorKey(key);
}
@Override
public CompletableFuture<String> ceilingKey(String key) {
return backingMap.ceilingKey(key);
}
@Override
public CompletableFuture<String> higherKey(String key) {
return backingMap.higherKey(key);
}
@Override
public CompletableFuture<NavigableSet<String>> navigableKeySet() {
return backingMap.navigableKeySet();
}
@Override
public CompletableFuture<NavigableMap<String, V1>> subMap(
String upperKey,
String lowerKey,
boolean inclusiveUpper,
boolean inclusiveLower) {
throw new UnsupportedOperationException("This operation is not yet" +
"supported.");
}
@Override
public String name() {
return backingMap.name();
}
@Override
public CompletableFuture<Integer> size() {
return backingMap.size();
}
@Override
public CompletableFuture<Boolean> containsKey(String key) {
return backingMap.containsKey(key);
}
@Override
public CompletableFuture<Boolean> containsValue(V1 value) {
return backingMap.containsValue(valueEncoder.apply(value));
}
@Override
public CompletableFuture<Versioned<V1>> get(String key) {
return backingMap.get(key).thenApply(value -> value.map(valueDecoder));
}
@Override
public CompletableFuture<Versioned<V1>> computeIf(
String key, Predicate<? super V1> condition,
BiFunction<? super String, ? super V1, ? extends V1>
remappingFunction) {
try {
return backingMap
.computeIf(
key,
v -> condition.test(valueDecoder.apply(v)),
(k, v) -> valueEncoder
.apply(
remappingFunction.apply(
key,
valueDecoder.apply(v))))
.thenApply(versionedValueTransform);
} catch (Exception e) {
return Tools.exceptionalFuture(e);
}
}
@Override
public CompletableFuture<Versioned<V1>> put(String key, V1 value) {
return backingMap.put(key, valueEncoder.apply(value))
.thenApply(v -> v.map(valueDecoder));
}
@Override
public CompletableFuture<Versioned<V1>> putAndGet(String key, V1 value) {
return backingMap.putAndGet(key, valueEncoder.apply(value))
.thenApply(v -> v.map(valueDecoder));
}
@Override
public CompletableFuture<Versioned<V1>> remove(String key) {
return backingMap.remove(key).thenApply(v -> v.map(valueDecoder));
}
@Override
public CompletableFuture<Void> clear() {
return backingMap.clear();
}
@Override
public CompletableFuture<Set<String>> keySet() {
return backingMap.keySet();
}
@Override
public CompletableFuture<Collection<Versioned<V1>>> values() {
return backingMap.values().thenApply(valueSet -> valueSet.stream()
.map(v -> v.map(valueDecoder)).collect(Collectors.toSet()));
}
@Override
public CompletableFuture<Set<Map.Entry<String, Versioned<V1>>>>
entrySet() {
return backingMap.entrySet()
.thenApply(
entries -> entries
.stream()
.map(entry ->
Maps.immutableEntry(entry.getKey(),
entry.getValue()
.map(valueDecoder)))
.collect(Collectors.toSet()));
}
@Override
public CompletableFuture<Versioned<V1>> putIfAbsent(String key, V1 value) {
return backingMap.putIfAbsent(key, valueEncoder.apply(value))
.thenApply(v -> v.map(valueDecoder));
}
@Override
public CompletableFuture<Boolean> remove(String key, V1 value) {
return backingMap.remove(key, valueEncoder.apply(value));
}
@Override
public CompletableFuture<Boolean> remove(String key, long version) {
return backingMap.remove(key, version);
}
@Override
public CompletableFuture<Versioned<V1>> replace(String key, V1 value) {
return backingMap.replace(key, valueEncoder.apply(value))
.thenApply(v -> v.map(valueDecoder));
}
@Override
public CompletableFuture<Boolean> replace(String key, V1 oldValue,
V1 newValue) {
return backingMap.replace(key, valueEncoder.apply(oldValue),
valueEncoder.apply(newValue));
}
@Override
public CompletableFuture<Boolean> replace(String key, long oldVersion,
V1 newValue) {
return backingMap.replace(key, oldVersion,
valueEncoder.apply(newValue));
}
@Override
public CompletableFuture<Void> addListener(
MapEventListener<String, V1> listener,
Executor executor) {
InternalBackingMapEventListener backingMapEventListener =
listeners.computeIfAbsent(
listener,
k -> new InternalBackingMapEventListener(listener));
return backingMap.addListener(backingMapEventListener, executor);
}
@Override
public CompletableFuture<Void> removeListener(
MapEventListener<String, V1> listener) {
InternalBackingMapEventListener backingMapEventListener =
listeners.remove(listener);
if (backingMapEventListener == null) {
return CompletableFuture.completedFuture(null);
} else {
return backingMap.removeListener(backingMapEventListener);
}
}
@Override
public CompletableFuture<Boolean> prepare(
MapTransaction<String, V1> transaction) {
throw new UnsupportedOperationException("This operation is not yet " +
"supported.");
}
@Override
public CompletableFuture<Void> commit(TransactionId transactionId) {
throw new UnsupportedOperationException("This operation is not yet " +
"supported."); }
@Override
public CompletableFuture<Void> rollback(TransactionId transactionId) {
throw new UnsupportedOperationException("This operation is not yet " +
"supported."); }
@Override
public CompletableFuture<Boolean> prepareAndCommit(
MapTransaction<String, V1> transaction) {
throw new UnsupportedOperationException("This operation is not yet " +
"supported."); }
private class InternalBackingMapEventListener
implements MapEventListener<String, V2> {
private final MapEventListener<String, V1> listener;
InternalBackingMapEventListener(
MapEventListener<String, V1> listener) {
this.listener = listener;
}
@Override
public void event(MapEvent<String, V2> event) {
listener.event(new MapEvent<String, V1>(
event.name(),
event.key(),
event.newValue() != null ?
event.newValue().map(valueDecoder) : null,
event.oldValue() != null ?
event.oldValue().map(valueDecoder) : null));
}
}
}