Aaron Kruglikov
Committed by Madan Jampani

Updating multimap API and commands and providing implementation.

Change-Id: Iff49b429cfc7c0142f3ab2e1dde1a32e85f20e87
(cherry picked from commit 44a1fef9)
......@@ -16,7 +16,6 @@
package org.onosproject.store.service;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multiset;
import java.util.Collection;
......@@ -92,7 +91,7 @@ public interface AsyncConsistentMultimap<K, V> extends DistributedPrimitive {
* and others ignoring put requests for existing entries.
* @param key the key to add
* @param value the value to add
* @return a future whose value will betrue if the map has changed because
* @return a future whose value will be true if the map has changed because
* of this call, false otherwise
*/
CompletableFuture<Boolean> put(K key, V value);
......@@ -119,16 +118,18 @@ public interface AsyncConsistentMultimap<K, V> extends DistributedPrimitive {
* @return a future whose value will be true if the map changes because of
* this call, false otherwise.
*/
CompletableFuture<Boolean> removeAll(K key, Iterable<? extends V> values);
CompletableFuture<Boolean> removeAll(K key,
Collection<? extends V> values);
/**
* Removes all values associated with the specified key as well as the key
* itself.
* @param key the key whose key-value pairs will be removed
* @return a future whose value is the set of values that were removed,
* which may be empty
* which may be empty, if the values did not exist the version will be
* less than one.
*/
CompletableFuture<Versioned<Collection<byte[]>>> removeAll(K key);
CompletableFuture<Versioned<Collection<? extends V>>> removeAll(K key);
/**
* Adds the set of key-value pairs of the specified key with each of the
......@@ -140,17 +141,8 @@ public interface AsyncConsistentMultimap<K, V> extends DistributedPrimitive {
* @return a future whose value will be true if any change in the map
* results from this call, false otherwise
*/
CompletableFuture<Boolean> putAll(K key, Iterable<? extends V> values);
/**
* Adds all entries from this multimap that are not already present, and
* may or may not add duplicate entries depending on the implementation.
* @param multiMap the map whose entries should be added
* @return a future whose value will be true if any change results from
* this call, false otherwise
*/
CompletableFuture<Boolean> putAll(
Multimap<? extends K, ? extends V> multiMap);
CompletableFuture<Boolean> putAll(K key,
Collection<? extends V> values);
/**
* Stores all the values in values associated with the key specified,
......@@ -161,7 +153,8 @@ public interface AsyncConsistentMultimap<K, V> extends DistributedPrimitive {
* @return a future whose value will be the collection of removed values,
* which may be empty
*/
CompletableFuture<Collection<V>> replaceValues(K key, Iterable<V> values);
CompletableFuture<Versioned<Collection<? extends V>>> replaceValues(
K key, Collection<V> values);
/**
* Removes all key-value pairs, after which it will be empty.
......@@ -177,7 +170,7 @@ public interface AsyncConsistentMultimap<K, V> extends DistributedPrimitive {
* @return a future whose value will be the collection of the values
* associated with the specified key, the collection may be empty
*/
CompletableFuture<Collection<V>> get(K key);
CompletableFuture<Versioned<Collection<? extends V>>> get(K key);
/**
* Returns a set of the keys contained in this multimap with one or more
......@@ -203,7 +196,7 @@ public interface AsyncConsistentMultimap<K, V> extends DistributedPrimitive {
* @return a future whose value will be a collection of values, this may be
* empty
*/
CompletableFuture<Collection<V>> values();
CompletableFuture<Multiset<V>> values();
/**
* Returns a collection of each key-value pair in this map.
......
......@@ -25,6 +25,7 @@ COMPILE_DEPS = [
TEST_DEPS = [
'//lib:TEST',
'//core/api:onos-api-tests',
'//lib:onos-atomix',
]
osgi_jar_with_tests (
......
......@@ -28,9 +28,9 @@ import io.atomix.catalyst.util.Assert;
import io.atomix.copycat.Command;
import io.atomix.copycat.Query;
import org.onlab.util.Match;
import org.onosproject.store.service.Versioned;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
......@@ -123,7 +123,8 @@ public final class AsyncConsistentMultimapCommands {
}
@Override
public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
public void writeObject(BufferOutput<?> buffer,
Serializer serializer) {
super.writeObject(buffer, serializer);
serializer.writeObject(key, buffer);
}
......@@ -166,7 +167,8 @@ public final class AsyncConsistentMultimapCommands {
}
@Override
public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
public void writeObject(BufferOutput<?> buffer,
Serializer serializer) {
super.writeObject(buffer, serializer);
}
......@@ -265,49 +267,221 @@ public final class AsyncConsistentMultimapCommands {
}
/**
* Update and get command. Note that corresponding values must have the
* same index in the respective arrays.
* Remove command, backs remove and removeAll's that return booleans.
*/
@SuppressWarnings("serial")
public static class UpdateAndGet extends
MultimapCommand<MapEntryUpdateResult<String, Collection<byte[]>>> {
public static class RemoveAll extends
MultimapCommand<Versioned<Collection<? extends byte[]>>> {
private String key;
private List<byte[]> values;
private List<Match<byte[]>> valueMatches;
private List<Match<Long>> versionMatches;
private Match<Long> versionMatch;
public UpdateAndGet() {
public RemoveAll() {
}
public UpdateAndGet(String key, List<byte[]> values,
List<Match<byte[]>> valueMatches,
List<Match<Long>> versionMatches) {
this.key = key;
this.values = values;
this.valueMatches = valueMatches;
this.versionMatches = versionMatches;
public RemoveAll(String key, Match<Long> versionMatch) {
this.key = Assert.notNull(key, "key");
this.versionMatch = versionMatch;
}
public String key() {
return this.key;
}
public Match<Long> versionMatch() {
return versionMatch;
}
@Override
public CompactionMode compaction() {
return CompactionMode.FULL;
}
@Override
public void writeObject(BufferOutput<?> buffer,
Serializer serializer) {
super.writeObject(buffer, serializer);
serializer.writeObject(key, buffer);
serializer.writeObject(versionMatch, buffer);
}
@Override
public void readObject(BufferInput<?> buffer, Serializer serializer) {
super.readObject(buffer, serializer);
key = serializer.readObject(buffer);
versionMatch = serializer.readObject(buffer);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("key", key)
.add("versionMatch", versionMatch)
.toString();
}
}
/**
* Remove command, backs remove and removeAll's that return booleans.
*/
@SuppressWarnings("serial")
public static class MultiRemove extends
MultimapCommand<Boolean> {
private String key;
private Collection<byte[]> values;
private Match<Long> versionMatch;
public MultiRemove() {
}
public MultiRemove(String key, Collection<byte[]> valueMatches,
Match<Long> versionMatch) {
this.key = Assert.notNull(key, "key");
this.values = valueMatches;
this.versionMatch = versionMatch;
}
public String key() {
return this.key;
}
public List<byte[]> values() {
public Collection<byte[]> values() {
return values;
}
public Match<Long> versionMatch() {
return versionMatch;
}
@Override
public CompactionMode compaction() {
return CompactionMode.FULL;
}
@Override
public void writeObject(BufferOutput<?> buffer,
Serializer serializer) {
super.writeObject(buffer, serializer);
serializer.writeObject(key, buffer);
serializer.writeObject(values, buffer);
serializer.writeObject(versionMatch, buffer);
}
@Override
public void readObject(BufferInput<?> buffer, Serializer serializer) {
super.readObject(buffer, serializer);
key = serializer.readObject(buffer);
values = serializer.readObject(buffer);
versionMatch = serializer.readObject(buffer);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("key", key)
.add("values", values)
.add("versionMatch", versionMatch)
.toString();
}
}
/**
* Command to back the put and putAll methods.
*/
@SuppressWarnings("serial")
public static class Put extends MultimapCommand<Boolean> {
private String key;
private Collection<? extends byte[]> values;
private Match<Long> versionMatch;
public Put() {
}
public Put(String key, Collection<? extends byte[]> values,
Match<Long> versionMatch) {
this.key = Assert.notNull(key, "key");
this.values = values;
this.versionMatch = versionMatch;
}
public String key() {
return key;
}
public Collection<? extends byte[]> values() {
return values;
}
public List<Match<byte[]>> valueMatches() {
return valueMatches;
public Match<Long> versionMatch() {
return versionMatch;
}
@Override
public CompactionMode compaction() {
return CompactionMode.QUORUM;
}
@Override
public void writeObject(BufferOutput<?> buffer,
Serializer serializer) {
super.writeObject(buffer, serializer);
serializer.writeObject(key, buffer);
serializer.writeObject(values, buffer);
serializer.writeObject(versionMatch, buffer);
}
@Override
public void readObject(BufferInput<?> buffer, Serializer serializer) {
super.readObject(buffer, serializer);
key = serializer.readObject(buffer);
values = serializer.readObject(buffer);
versionMatch = serializer.readObject(buffer);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("key", key)
.add("values", values)
.add("versionMatch", versionMatch)
.toString();
}
}
/**
* Replace command, returns the collection that was replaced.
*/
@SuppressWarnings("serial")
public static class Replace extends
MultimapCommand<Versioned<Collection<? extends byte[]>>> {
private String key;
private Collection<byte[]> values;
private Match<Long> versionMatch;
public Replace() {
}
public Replace(String key, Collection<byte[]> values,
Match<Long> versionMatch) {
this.key = Assert.notNull(key, "key");
this.values = values;
this.versionMatch = versionMatch;
}
public String key() {
return this.key;
}
public List<Match<Long>> versionMatches() {
return versionMatches;
public Match<Long> versionMatch() {
return versionMatch;
}
public Collection<byte[]> values() {
return values;
}
@Override
public CompactionMode compaction() {
return values == null ? CompactionMode.FULL :
CompactionMode.QUORUM;
return CompactionMode.FULL;
}
@Override
......@@ -316,8 +490,7 @@ public final class AsyncConsistentMultimapCommands {
super.writeObject(buffer, serializer);
serializer.writeObject(key, buffer);
serializer.writeObject(values, buffer);
serializer.writeObject(valueMatches, buffer);
serializer.writeObject(versionMatches, buffer);
serializer.writeObject(versionMatch, buffer);
}
@Override
......@@ -325,13 +498,16 @@ public final class AsyncConsistentMultimapCommands {
super.readObject(buffer, serializer);
key = serializer.readObject(buffer);
values = serializer.readObject(buffer);
valueMatches = serializer.readObject(buffer);
versionMatches = serializer.readObject(buffer);
versionMatch = serializer.readObject(buffer);
}
@Override
public String toString() {
return super.toString();
return MoreObjects.toStringHelper(getClass())
.add("key", key)
.add("values", values)
.add("versionMatch", versionMatch)
.toString();
}
}
......@@ -360,7 +536,7 @@ public final class AsyncConsistentMultimapCommands {
* Value collection query.
*/
@SuppressWarnings("serial")
public static class Values extends MultimapQuery<Collection<byte[]>> {
public static class Values extends MultimapQuery<Multiset<byte[]>> {
}
/**
......@@ -374,7 +550,11 @@ public final class AsyncConsistentMultimapCommands {
/**
* Get value query.
*/
public static class Get extends KeyQuery<Collection<byte[]>> {
public static class Get extends
KeyQuery<Versioned<Collection<? extends byte[]>>> {
public Get(String key) {
super(key);
}
}
/**
......@@ -387,7 +567,7 @@ public final class AsyncConsistentMultimapCommands {
registry.register(ContainsKey.class, -1000);
registry.register(ContainsValue.class, -1001);
registry.register(ContainsEntry.class, -1002);
registry.register(UpdateAndGet.class, -1003);
registry.register(Replace.class, -1003);
registry.register(Clear.class, -1004);
registry.register(KeySet.class, -1005);
registry.register(Keys.class, -1006);
......@@ -396,6 +576,9 @@ public final class AsyncConsistentMultimapCommands {
registry.register(Size.class, -1009);
registry.register(IsEmpty.class, -1010);
registry.register(Get.class, -1011);
registry.register(Put.class, -1012);
registry.register(RemoveAll.class, -1013);
registry.register(MultiRemove.class, -1014);
}
}
}
......
......@@ -17,11 +17,10 @@
package org.onosproject.store.primitives.resources.impl;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multiset;
import io.atomix.copycat.client.CopycatClient;
import io.atomix.resource.AbstractResource;
import org.onlab.util.Match;
import io.atomix.resource.ResourceTypeInfo;
import org.onosproject.store.service.AsyncConsistentMultimap;
import org.onosproject.store.service.Versioned;
......@@ -32,13 +31,28 @@ import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import static org.onosproject.store.primitives.resources.impl.AsyncConsistentMultimapCommands.*;
import static org.onosproject.store.primitives.resources.impl.AsyncConsistentMultimapCommands.Clear;
import static org.onosproject.store.primitives.resources.impl.AsyncConsistentMultimapCommands.ContainsEntry;
import static org.onosproject.store.primitives.resources.impl.AsyncConsistentMultimapCommands.ContainsKey;
import static org.onosproject.store.primitives.resources.impl.AsyncConsistentMultimapCommands.ContainsValue;
import static org.onosproject.store.primitives.resources.impl.AsyncConsistentMultimapCommands.Entries;
import static org.onosproject.store.primitives.resources.impl.AsyncConsistentMultimapCommands.Get;
import static org.onosproject.store.primitives.resources.impl.AsyncConsistentMultimapCommands.IsEmpty;
import static org.onosproject.store.primitives.resources.impl.AsyncConsistentMultimapCommands.KeySet;
import static org.onosproject.store.primitives.resources.impl.AsyncConsistentMultimapCommands.Keys;
import static org.onosproject.store.primitives.resources.impl.AsyncConsistentMultimapCommands.MultiRemove;
import static org.onosproject.store.primitives.resources.impl.AsyncConsistentMultimapCommands.Put;
import static org.onosproject.store.primitives.resources.impl.AsyncConsistentMultimapCommands.RemoveAll;
import static org.onosproject.store.primitives.resources.impl.AsyncConsistentMultimapCommands.Replace;
import static org.onosproject.store.primitives.resources.impl.AsyncConsistentMultimapCommands.Size;
import static org.onosproject.store.primitives.resources.impl.AsyncConsistentMultimapCommands.Values;
/**
* Set based implementation of the {@link AsyncConsistentMultimap}.
* <p>
* Note: this implementation does not allow null entries or duplicate entries.
*/
@ResourceTypeInfo(id = -153, factory = AsyncConsistentSetMultimapFactory.class)
public class AsyncConsistentSetMultimap
extends AbstractResource<AsyncConsistentSetMultimap>
implements AsyncConsistentMultimap<String, byte[]> {
......@@ -81,68 +95,50 @@ public class AsyncConsistentSetMultimap
@Override
public CompletableFuture<Boolean> put(String key, byte[] value) {
return submit(new UpdateAndGet(key, Lists.newArrayList(value),
Lists.newArrayList(Match.NULL),
Lists.newArrayList(Match.NULL)))
.whenComplete((result, e) -> throwIfLocked(result.status()))
.thenApply(result ->
result.status() == MapEntryUpdateResult.Status.OK);
return submit(new Put(key, Lists.newArrayList(value), null));
}
@Override
public CompletableFuture<Boolean> remove(String key, byte[] value) {
return submit(new UpdateAndGet(key, Lists.newArrayList(value),
Lists.newArrayList(Match.ifValue(value)),
Lists.newArrayList(Match.NULL)))
.whenComplete((result, e) -> throwIfLocked(result.status()))
.thenApply(result ->
result.status() == MapEntryUpdateResult.Status.OK);
return submit(new MultiRemove(key,
Lists.newArrayList(value),
null));
}
@Override
public CompletableFuture<Boolean> removeAll(String key, Iterable<? extends byte[]> values) {
throw new UnsupportedOperationException("This operation cannot be " +
"used without support for " +
"transactions.");
}
@Override
public CompletableFuture<Versioned<Collection<byte[]>>> removeAll(String key) {
return submit(new UpdateAndGet(key, null, null, null))
.whenComplete((result, e) -> throwIfLocked(result.status()))
.thenApply(result -> result.oldValue());
public CompletableFuture<Boolean> removeAll(
String key, Collection<? extends byte[]> values) {
return submit(new MultiRemove(key, (Collection<byte[]>) values, null));
}
@Override
public CompletableFuture<Boolean> putAll(String key, Iterable<? extends byte[]> values) {
throw new UnsupportedOperationException("This operation cannot be " +
"used without support for " +
"transactions.");
public CompletableFuture<
Versioned<Collection<? extends byte[]>>> removeAll(String key) {
return submit(new RemoveAll(key, null));
}
@Override
public CompletableFuture<Boolean> putAll(Multimap<? extends String, ? extends byte[]> multiMap) {
throw new UnsupportedOperationException("This operation cannot be " +
"used without support for " +
"transactions.");
public CompletableFuture<Boolean> putAll(
String key, Collection<? extends byte[]> values) {
return submit(new Put(key, values, null));
}
@Override
public CompletableFuture<Collection<byte[]>> replaceValues(String key, Iterable<byte[]> values) {
throw new UnsupportedOperationException("This operation cannot be " +
"used without support for " +
"transactions.");
public CompletableFuture<
Versioned<Collection<? extends byte[]>>> replaceValues(
String key, Collection<byte[]> values) {
return submit(new Replace(key, values, null));
}
@Override
public CompletableFuture<Void> clear() {
return submit(new AsyncConsistentMultimapCommands.Clear());
return submit(new Clear());
}
@Override
public CompletableFuture<Collection<byte[]>> get(String key) {
return submit(new Get());
public CompletableFuture<
Versioned<Collection<? extends byte[]>>> get(String key) {
return submit(new Get(key));
}
@Override
......@@ -156,7 +152,7 @@ public class AsyncConsistentSetMultimap
}
@Override
public CompletableFuture<Collection<byte[]>> values() {
public CompletableFuture<Multiset<byte[]>> values() {
return submit(new Values());
}
......@@ -182,7 +178,9 @@ public class AsyncConsistentSetMultimap
*/
private void throwIfLocked(MapEntryUpdateResult.Status status) {
if (status == MapEntryUpdateResult.Status.WRITE_LOCK) {
throw new ConcurrentModificationException("Cannot update map: Another transaction in progress");
throw new ConcurrentModificationException("Cannot update map: " +
"Another transaction " +
"in progress");
}
}
}
......
/*
* Copyright 2016 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.resources.impl;
import io.atomix.catalyst.serializer.SerializableTypeResolver;
import io.atomix.copycat.client.CopycatClient;
import io.atomix.resource.ResourceFactory;
import io.atomix.resource.ResourceStateMachine;
import java.util.Properties;
/**
* {@link AsyncConsistentSetMultimap} resource factory.
*/
public class AsyncConsistentSetMultimapFactory implements
ResourceFactory<AsyncConsistentSetMultimap> {
@Override
public SerializableTypeResolver createSerializableTypeResolver() {
return new AsyncConsistentMultimapCommands.TypeResolver();
}
@Override
public ResourceStateMachine createStateMachine(Properties config) {
return new AsyncConsistentSetMultimapState(config);
}
@Override
public AsyncConsistentSetMultimap createInstance(CopycatClient client,
Properties properties) {
return new AsyncConsistentSetMultimap(client, properties);
}
}
/*
* Copyright 2016 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.resources.impl;
import com.google.common.base.Preconditions;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.HashMultiset;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multiset;
import com.google.common.collect.Sets;
import io.atomix.copycat.server.Commit;
import io.atomix.copycat.server.Snapshottable;
import io.atomix.copycat.server.StateMachineExecutor;
import io.atomix.copycat.server.session.ServerSession;
import io.atomix.copycat.server.session.SessionListener;
import io.atomix.copycat.server.storage.snapshot.SnapshotReader;
import io.atomix.copycat.server.storage.snapshot.SnapshotWriter;
import io.atomix.resource.ResourceStateMachine;
import org.onlab.util.CountDownCompleter;
import org.onlab.util.Match;
import org.onosproject.store.service.Versioned;
import org.slf4j.Logger;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import static org.onosproject.store.primitives.resources.impl.AsyncConsistentMultimapCommands.Clear;
import static org.onosproject.store.primitives.resources.impl.AsyncConsistentMultimapCommands.ContainsEntry;
import static org.onosproject.store.primitives.resources.impl.AsyncConsistentMultimapCommands.ContainsKey;
import static org.onosproject.store.primitives.resources.impl.AsyncConsistentMultimapCommands.ContainsValue;
import static org.onosproject.store.primitives.resources.impl.AsyncConsistentMultimapCommands.Entries;
import static org.onosproject.store.primitives.resources.impl.AsyncConsistentMultimapCommands.Get;
import static org.onosproject.store.primitives.resources.impl.AsyncConsistentMultimapCommands.IsEmpty;
import static org.onosproject.store.primitives.resources.impl.AsyncConsistentMultimapCommands.KeySet;
import static org.onosproject.store.primitives.resources.impl.AsyncConsistentMultimapCommands.Keys;
import static org.onosproject.store.primitives.resources.impl.AsyncConsistentMultimapCommands.MultiRemove;
import static org.onosproject.store.primitives.resources.impl.AsyncConsistentMultimapCommands.MultimapCommand;
import static org.onosproject.store.primitives.resources.impl.AsyncConsistentMultimapCommands.Put;
import static org.onosproject.store.primitives.resources.impl.AsyncConsistentMultimapCommands.RemoveAll;
import static org.onosproject.store.primitives.resources.impl.AsyncConsistentMultimapCommands.Replace;
import static org.onosproject.store.primitives.resources.impl.AsyncConsistentMultimapCommands.Size;
import static org.onosproject.store.primitives.resources.impl.AsyncConsistentMultimapCommands.Values;
import static org.slf4j.LoggerFactory.getLogger;
/**
* State Machine for {@link AsyncConsistentSetMultimap} resource.
*/
public class AsyncConsistentSetMultimapState extends ResourceStateMachine
implements SessionListener, Snapshottable {
private final Logger log = getLogger(getClass());
private final AtomicLong globalVersion = new AtomicLong(1);
//TODO Add listener map here
private final Map<String, MapEntryValue> backingMap = Maps.newHashMap();
public AsyncConsistentSetMultimapState(Properties properties) {
super(properties);
}
@Override
public void snapshot(SnapshotWriter writer) {
}
@Override
public void install(SnapshotReader reader) {
}
@Override
protected void configure(StateMachineExecutor executor) {
executor.register(Size.class, this::size);
executor.register(IsEmpty.class, this::isEmpty);
executor.register(ContainsKey.class, this::containsKey);
executor.register(ContainsValue.class, this::containsValue);
executor.register(ContainsEntry.class, this::containsEntry);
executor.register(Clear.class, this::clear);
executor.register(KeySet.class, this::keySet);
executor.register(Keys.class, this::keys);
executor.register(Values.class, this::values);
executor.register(Entries.class, this::entries);
executor.register(Get.class, this::get);
executor.register(RemoveAll.class, this::removeAll);
executor.register(MultiRemove.class, this::multiRemove);
executor.register(Put.class, this::put);
executor.register(Replace.class, this::replace);
}
@Override
public void delete() {
super.delete();
}
/**
* Handles a Size commit.
*
* @param commit Size commit
* @return number of unique key value pairs in the multimap
*/
protected int size(Commit<? extends Size> commit) {
try {
return backingMap.values()
.stream()
.map(valueCollection -> valueCollection.values().size())
.collect(Collectors.summingInt(size -> size));
} finally {
commit.close();
}
}
/**
* Handles an IsEmpty commit.
*
* @param commit IsEmpty commit
* @return true if the multimap contains no key-value pairs, else false
*/
protected boolean isEmpty(Commit<? extends IsEmpty> commit) {
try {
return backingMap.isEmpty();
} finally {
commit.close();
}
}
/**
* Handles a contains key commit.
*
* @param commit ContainsKey commit
* @return returns true if the key is in the multimap, else false
*/
protected boolean containsKey(Commit<? extends ContainsKey> commit) {
try {
return backingMap.containsKey(commit.operation().key());
} finally {
commit.close();
}
}
/**
* Handles a ContainsValue commit.
*
* @param commit ContainsValue commit
* @return true if the value is in the multimap, else false
*/
protected boolean containsValue(Commit<? extends ContainsValue> commit) {
try {
Match<byte[]> match = Match.ifValue(commit.operation().value());
return backingMap
.values()
.stream()
.anyMatch(valueList ->
valueList
.values()
.stream()
.anyMatch(byteValue ->
match.matches(byteValue)));
} finally {
commit.close();
}
}
/**
* Handles a ContainsEntry commit.
*
* @param commit ContainsEntry commit
* @return true if the key-value pair exists, else false
*/
protected boolean containsEntry(Commit<? extends ContainsEntry> commit) {
try {
MapEntryValue entryValue =
backingMap.get(commit.operation().key());
if (entryValue == null) {
return false;
} else {
Match valueMatch = Match.ifValue(commit.operation().value());
return entryValue
.values()
.stream()
.anyMatch(byteValue -> valueMatch.matches(byteValue));
}
} finally {
commit.close();
}
}
/**
* Handles a Clear commit.
*
* @param commit Clear commit
*/
protected void clear(Commit<? extends Clear> commit) {
try {
backingMap.clear();
} finally {
commit.close();
}
}
/**
* Handles a KeySet commit.
*
* @param commit KeySet commit
* @return a set of all keys in the multimap
*/
protected Set<String> keySet(Commit<? extends KeySet> commit) {
try {
return backingMap.keySet();
} finally {
commit.close();
}
}
/**
* Handles a Keys commit.
*
* @param commit Keys commit
* @return a multiset of keys with each key included an equal number of
* times to the total key-value pairs in which that key participates
*/
protected Multiset<String> keys(Commit<? extends Keys> commit) {
try {
Multiset keys = HashMultiset.create();
backingMap.forEach((key, mapEntryValue) -> {
keys.add(key, mapEntryValue.values().size());
});
return keys;
} finally {
commit.close();
}
}
/**
* Handles a Values commit.
*
* @param commit Values commit
* @return the set of values in the multimap with duplicates included
*/
protected Multiset<byte[]> values(Commit<? extends Values> commit) {
try {
return backingMap
.values()
.stream()
.collect(new HashMultisetValueCollector());
} finally {
commit.close();
}
}
/**
* Handles an Entries commit.
*
* @param commit Entries commit
* @return a set of all key-value pairs in the multimap
*/
protected Collection<Map.Entry<String, byte[]>> entries(
Commit<? extends Entries> commit) {
try {
return backingMap
.entrySet()
.stream()
.collect(new EntrySetCollector());
} finally {
commit.close();
}
}
/**
* Handles a Get commit.
*
* @param commit Get commit
* @return the collection of values associated with the key or an empty
* list if none exist
*/
protected Versioned<Collection<? extends byte[]>> get(
Commit<? extends Get> commit) {
try {
MapEntryValue mapEntryValue = backingMap.get(commit.operation().key());
return toVersioned(backingMap.get(commit.operation().key()));
} finally {
commit.close();
}
}
/**
* Handles a removeAll commit, and returns the previous mapping.
*
* @param commit removeAll commit
* @return collection of removed values
*/
protected Versioned<Collection<? extends byte[]>> removeAll(
Commit<? extends RemoveAll> commit) {
if (!backingMap.containsKey(commit.operation().key())) {
commit.close();
return new Versioned<>(Sets.newHashSet(), -1);
} else {
return backingMap.get(commit.operation().key()).addCommit(commit);
}
}
/**
* Handles a multiRemove commit, returns true if the remove results in any
* change.
* @param commit multiRemove commit
* @return true if any change results, else false
*/
protected boolean multiRemove(Commit<? extends MultiRemove> commit) {
if (!backingMap.containsKey(commit.operation().key())) {
commit.close();
return false;
} else {
return (backingMap
.get(commit.operation().key())
.addCommit(commit)) != null;
}
}
/**
* Handles a put commit, returns true if any change results from this
* commit.
* @param commit a put commit
* @return true if this commit results in a change, else false
*/
protected boolean put(Commit<? extends Put> commit) {
if (commit.operation().values().isEmpty()) {
return false;
}
if (!backingMap.containsKey(commit.operation().key())) {
backingMap.put(commit.operation().key(),
new NonTransactionalCommit(1));
}
return backingMap
.get(commit.operation().key())
.addCommit(commit) != null;
}
protected Versioned<Collection<? extends byte[]>> replace(
Commit<? extends Replace> commit) {
if (!backingMap.containsKey(commit.operation().key())) {
backingMap.put(commit.operation().key(),
new NonTransactionalCommit(1));
}
return backingMap.get(commit.operation().key()).addCommit(commit);
}
@Override
public void register(ServerSession session) {
super.register(session);
}
@Override
public void unregister(ServerSession session) {
super.unregister(session);
}
@Override
public void expire(ServerSession session) {
super.expire(session);
}
@Override
public void close(ServerSession session) {
super.close(session);
}
private interface MapEntryValue {
/**
* Returns the list of raw {@code byte[]'s}.
*
* @return list of raw values
*/
Collection<? extends byte[]> values();
/**
* Returns the version of the value.
*
* @return version
*/
long version();
/**
* Discards the value by invoke appropriate clean up actions.
*/
void discard();
/**
* Add a new commit and modifies the set of values accordingly.
* In the case of a replace or removeAll it returns the set of removed
* values. In the case of put or multiRemove it returns null for no
* change and a set of the added or removed values respectively if a
* change resulted.
*
* @param commit the commit to be added
*/
Versioned<Collection<? extends byte[]>> addCommit(
Commit<? extends MultimapCommand> commit);
}
private class NonTransactionalCommit implements MapEntryValue {
private long version;
private final TreeMap<byte[], CountDownCompleter<Commit>>
valueCountdownMap = Maps.newTreeMap(new ByteArrayComparator());
/*This is a mapping of commits that added values to the commits
* removing those values, they will not be circular because keys will
* be exclusively Put and Replace commits and values will be exclusively
* Multiremove commits, each time a Put or replace is removed it should
* as part of closing go through and countdown each of the remove
* commits depending on it.*/
private final HashMultimap<Commit, CountDownCompleter<Commit>>
additiveToRemovalCommits = HashMultimap.create();
public NonTransactionalCommit(
long version) {
//Set the version to current it will only be updated once this is
// populated
this.version = globalVersion.get();
}
@Override
public Collection<? extends byte[]> values() {
return valueCountdownMap.keySet();
}
@Override
public long version() {
return version;
}
@Override
public void discard() {
valueCountdownMap.values().forEach(completer ->
completer.object().close());
}
@Override
public Versioned<Collection<? extends byte[]>> addCommit(
Commit<? extends MultimapCommand> commit) {
Preconditions.checkNotNull(commit);
Preconditions.checkNotNull(commit.operation());
Versioned<Collection<? extends byte[]>> retVersion;
if (commit.operation() instanceof Put) {
//Using a treeset here sanitizes the input, removing duplicates
Set<byte[]> valuesToAdd =
Sets.newTreeSet(new ByteArrayComparator());
((Put) commit.operation()).values().forEach(value -> {
if (!valueCountdownMap.containsKey(value)) {
valuesToAdd.add(value);
}
});
if (valuesToAdd.isEmpty()) {
//Do not increment or add the commit if no change resulted
// TODO fairly sure the below case is unreachable but
// TODO need to make sure
// if (valueCountdownMap.isEmpty()) {
// backingMap.remove(((Put) commit.operation()).key());
// }
commit.close();
return null;
}
//When all values from a commit have been removed decrement all
//removal commits relying on it and remove itself from the
//mapping of additive commits to the commits removing the
//values it added. (Only multiremoves will be dependent)
CountDownCompleter<Commit> completer =
new CountDownCompleter<>(commit, valuesToAdd.size(),
c -> {
if (additiveToRemovalCommits.containsKey(c)) {
additiveToRemovalCommits.
get(c).
forEach(countdown ->
countdown.countDown());
additiveToRemovalCommits.removeAll(c);
}
c.close();
});
retVersion = new Versioned<>(valuesToAdd, version);
valuesToAdd.forEach(value -> valueCountdownMap.put(value,
completer));
version++;
return retVersion;
} else if (commit.operation() instanceof Replace) {
//Will this work?? Need to check before check-in!
Set<byte[]> removedValues = Sets.newHashSet();
removedValues.addAll(valueCountdownMap.keySet());
retVersion = new Versioned<>(removedValues, version);
valueCountdownMap.values().forEach(countdown ->
countdown.countDown());
valueCountdownMap.clear();
Set<byte[]> valuesToAdd =
Sets.newTreeSet(new ByteArrayComparator());
((Replace) commit.operation()).values().forEach(value -> {
valuesToAdd.add(value);
});
if (valuesToAdd.isEmpty()) {
version = globalVersion.incrementAndGet();
backingMap.remove(((Replace) commit.operation()).key());
//Order is important here, the commit must be closed last
//(or minimally after all uses)
commit.close();
return retVersion;
}
CountDownCompleter<Commit> completer =
new CountDownCompleter<>(commit, valuesToAdd.size(),
c -> {
if (additiveToRemovalCommits
.containsKey(c)) {
additiveToRemovalCommits.
get(c).
forEach(countdown ->
countdown.countDown());
additiveToRemovalCommits.
removeAll(c);
}
c.close();
});
valuesToAdd.forEach(value ->
valueCountdownMap.put(value, completer));
version = globalVersion.incrementAndGet();
return retVersion;
} else if (commit.operation() instanceof RemoveAll) {
Set<byte[]> removed = Sets.newHashSet();
//We can assume here that values only appear once and so we
//do not need to sanitize the return for duplicates.
removed.addAll(valueCountdownMap.keySet());
retVersion = new Versioned<>(removed, version);
valueCountdownMap.values().forEach(countdown ->
countdown.countDown());
valueCountdownMap.clear();
//In the case of a removeAll all commits will be removed and
//unlike the multiRemove case we do not need to consider
//dependencies among additive and removal commits.
//Save the key for use after the commit is closed
String key = ((RemoveAll) commit.operation()).key();
commit.close();
version = globalVersion.incrementAndGet();
backingMap.remove(key);
return retVersion;
} else if (commit.operation() instanceof MultiRemove) {
//Must first calculate how many commits the removal depends on.
//At this time we also sanitize the removal set by adding to a
//set with proper handling of byte[] equality.
Set<byte[]> removed = Sets.newHashSet();
Set<Commit> commitsRemovedFrom = Sets.newHashSet();
((MultiRemove) commit.operation()).values().forEach(value -> {
if (valueCountdownMap.containsKey(value)) {
removed.add(value);
commitsRemovedFrom
.add(valueCountdownMap.get(value).object());
}
});
//If there is nothing to be removed no action should be taken.
if (removed.isEmpty()) {
//Do not increment or add the commit if no change resulted
commit.close();
return null;
}
//When all additive commits this depends on are closed this can
//be closed as well.
CountDownCompleter<Commit> completer =
new CountDownCompleter<>(commit,
commitsRemovedFrom.size(),
c -> c.close());
commitsRemovedFrom.forEach(commitRemovedFrom -> {
additiveToRemovalCommits.put(commitRemovedFrom, completer);
});
//Save key in case countdown results in closing the commit.
String removedKey = ((MultiRemove) commit.operation()).key();
removed.forEach(removedValue -> {
valueCountdownMap.remove(removedValue).countDown();
});
//The version is updated locally as well as globally even if
//this object will be removed from the map in case any other
//party still holds a reference to this object.
retVersion = new Versioned<>(removed, version);
version = globalVersion.incrementAndGet();
if (valueCountdownMap.isEmpty()) {
backingMap
.remove(removedKey);
}
return retVersion;
} else {
throw new IllegalArgumentException();
}
}
}
/**
* A collector that creates MapEntryValues and creates a multiset of all
* values in the map an equal number of times to the number of sets in
* which they participate.
*/
private class HashMultisetValueCollector implements
Collector<MapEntryValue,
HashMultiset<byte[]>,
HashMultiset<byte[]>> {
private HashMultiset<byte[]> multiset = null;
@Override
public Supplier<HashMultiset<byte[]>> supplier() {
return new Supplier<HashMultiset<byte[]>>() {
@Override
public HashMultiset<byte[]> get() {
if (multiset == null) {
multiset = HashMultiset.create();
}
return multiset;
}
};
}
@Override
public BiConsumer<HashMultiset<byte[]>, MapEntryValue> accumulator() {
return (multiset, mapEntryValue) ->
multiset.addAll(mapEntryValue.values());
}
@Override
public BinaryOperator<HashMultiset<byte[]>> combiner() {
return (setOne, setTwo) -> {
setOne.addAll(setTwo);
return setOne;
};
}
@Override
public Function<HashMultiset<byte[]>,
HashMultiset<byte[]>> finisher() {
return (unused) -> multiset;
}
@Override
public Set<Characteristics> characteristics() {
return EnumSet.of(Characteristics.UNORDERED);
}
}
/**
* A collector that creates Entries of {@code <String, MapEntryValue>} and
* creates a set of entries all key value pairs in the map.
*/
private class EntrySetCollector implements
Collector<Map.Entry<String, MapEntryValue>,
Set<Map.Entry<String, byte[]>>,
Set<Map.Entry<String, byte[]>>> {
private Set<Map.Entry<String, byte[]>> set = null;
@Override
public Supplier<Set<Map.Entry<String, byte[]>>> supplier() {
return new Supplier<Set<Map.Entry<String, byte[]>>>() {
@Override
public Set<Map.Entry<String, byte[]>> get() {
if (set == null) {
set = Sets.newHashSet();
}
return set;
}
};
}
@Override
public BiConsumer<Set<Map.Entry<String, byte[]>>,
Map.Entry<String, MapEntryValue>> accumulator() {
return (set, entry) -> {
entry
.getValue()
.values()
.forEach(byteValue ->
set.add(Maps.immutableEntry(entry.getKey(),
byteValue)));
};
}
@Override
public BinaryOperator<Set<Map.Entry<String, byte[]>>> combiner() {
return (setOne, setTwo) -> {
setOne.addAll(setTwo);
return setOne;
};
}
@Override
public Function<Set<Map.Entry<String, byte[]>>,
Set<Map.Entry<String, byte[]>>> finisher() {
return (unused) -> set;
}
@Override
public Set<Characteristics> characteristics() {
return EnumSet.of(Characteristics.UNORDERED);
}
}
/**
* Utility for turning a {@code MapEntryValue} to {@code Versioned}.
* @param value map entry value
* @return versioned instance or an empty list versioned -1 if argument is
* null
*/
private Versioned<Collection<? extends byte[]>> toVersioned(
MapEntryValue value) {
return value == null ? new Versioned<>(Lists.newArrayList(), -1) :
new Versioned<>(value.values(),
value.version());
}
private class ByteArrayComparator implements Comparator<byte[]> {
@Override
public int compare(byte[] o1, byte[] o2) {
if (Arrays.equals(o1, o2)) {
return 0;
} else {
for (int i = 0; i < o1.length && i < o2.length; i++) {
if (o1[i] < o2[i]) {
return -1;
} else if (o1[i] > o2[i]) {
return 1;
}
}
return o1.length > o2.length ? 1 : -1;
}
}
}
}
/*
* Copyright 2016 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.resources.impl;
import com.google.common.collect.Lists;
import com.google.common.collect.Multiset;
import com.google.common.collect.TreeMultiset;
import com.google.common.io.Files;
import io.atomix.catalyst.transport.Address;
import io.atomix.catalyst.transport.LocalTransport;
import io.atomix.copycat.server.CopycatServer;
import io.atomix.copycat.server.storage.Storage;
import io.atomix.copycat.server.storage.StorageLevel;
import io.atomix.manager.state.ResourceManagerState;
import io.atomix.resource.ResourceType;
import org.apache.commons.collections.keyvalue.DefaultMapEntry;
import org.junit.Ignore;
import org.junit.Test;
import org.onlab.util.Tools;
import java.io.File;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
/**
* Tests the {@link AsyncConsistentSetMultimap}.
*/
public class AsyncConsistentSetMultimapTest extends AtomixTestBase {
private final File testDir = Files.createTempDir();
private final String keyOne = "hello";
private final String keyTwo = "goodbye";
private final String keyThree = "foo";
private final String keyFour = "bar";
private final byte[] valueOne = Tools.getBytesUtf8(keyOne);
private final byte[] valueTwo = Tools.getBytesUtf8(keyTwo);
private final byte[] valueThree = Tools.getBytesUtf8(keyThree);
private final byte[] valueFour = Tools.getBytesUtf8(keyFour);
private final List<String> allKeys = Lists.newArrayList(keyOne, keyTwo,
keyThree, keyFour);
private final List<byte[]> allValues = Lists.newArrayList(valueOne,
valueTwo,
valueThree,
valueFour);
@Override
protected ResourceType resourceType() {
return new ResourceType(AsyncConsistentSetMultimap.class);
}
/**
* Test that size behaves correctly (This includes testing of the empty
* check).
*/
@Ignore
@Test
public void testSize() throws Throwable {
clearTests();
AsyncConsistentSetMultimap map = createResource(3);
//Simplest operation case
map.isEmpty().thenAccept(result -> assertTrue(result));
map.put(keyOne, valueOne).
thenAccept(result -> assertTrue(result)).join();
map.isEmpty().thenAccept(result -> assertFalse(result));
map.size().thenAccept(result -> assertEquals(1, (int) result))
.join();
//Make sure sizing is dependent on values not keys
map.put(keyOne, valueTwo).
thenAccept(result -> assertTrue(result)).join();
map.size().thenAccept(result -> assertEquals(2, (int) result))
.join();
//Ensure that double adding has no effect
map.put(keyOne, valueOne).
thenAccept(result -> assertFalse(result)).join();
map.size().thenAccept(result -> assertEquals(2, (int) result))
.join();
//Check handling for multiple keys
map.put(keyTwo, valueOne)
.thenAccept(result -> assertTrue(result)).join();
map.put(keyTwo, valueTwo)
.thenAccept(result -> assertTrue(result)).join();
map.size().thenAccept(result -> assertEquals(4, (int) result))
.join();
//Check size with removal
map.remove(keyOne, valueOne).
thenAccept(result -> assertTrue(result)).join();
map.size().thenAccept(result -> assertEquals(3, (int) result))
.join();
//Check behavior under remove of non-existant key
map.remove(keyOne, valueOne).
thenAccept(result -> assertFalse(result)).join();
map.size().thenAccept(result -> assertEquals(3, (int) result))
.join();
//Check clearing the entirety of the map
map.clear().join();
map.size().thenAccept(result -> assertEquals(0, (int) result))
.join();
map.isEmpty().thenAccept(result -> assertTrue(result));
map.destroy().join();
clearTests();
}
/**
* Contains tests for value, key and entry.
*/
@Ignore
@Test
public void containsTest() throws Throwable {
clearTests();
AsyncConsistentSetMultimap map = createResource(3);
//Populate the maps
allKeys.forEach(key -> {
map.putAll(key, allValues)
.thenAccept(result -> assertTrue(result)).join();
});
map.size().thenAccept(result -> assertEquals(16, (int) result)).join();
//Test key contains positive results
allKeys.forEach(key -> {
map.containsKey(key)
.thenAccept(result -> assertTrue(result)).join();
});
//Test value contains positive results
allValues.forEach(value -> {
map.containsValue(value)
.thenAccept(result -> assertTrue(result)).join();
});
//Test contains entry for all possible entries
allKeys.forEach(key -> {
allValues.forEach(value -> {
map.containsEntry(key, value)
.thenAccept(result -> assertTrue(result)).join();
});
});
//Test behavior after removals
allValues.forEach(value -> {
final String[] removedKey = new String[1];
allKeys.forEach(key -> {
map.remove(key, value)
.thenAccept(result -> assertTrue(result)).join();
map.containsEntry(key, value)
.thenAccept(result -> assertFalse(result)).join();
removedKey[0] = key;
});
//Check that contains key works properly for removed keys
map.containsKey(removedKey[0])
.thenAccept(result -> assertFalse(result));
});
//Check that contains value works correctly for removed values
allValues.forEach(value -> {
map.containsValue(value)
.thenAccept(result -> assertFalse(result)).join();
});
map.destroy().join();
clearTests();
}
/**
* Contains tests for put, putAll, remove, removeAll and replace.
* @throws Exception
*/
@Ignore
@Test
public void addAndRemoveTest() throws Exception {
clearTests();
AsyncConsistentSetMultimap map = createResource(3);
//Test single put
allKeys.forEach(key -> {
//Value should actually be added here
allValues.forEach(value -> {
map.put(key, value)
.thenAccept(result -> assertTrue(result)).join();
//Duplicate values should be ignored here
map.put(key, value)
.thenAccept(result -> assertFalse(result)).join();
});
});
//Test single remove
allKeys.forEach(key -> {
//Value should actually be added here
allValues.forEach(value -> {
map.remove(key, value)
.thenAccept(result -> assertTrue(result)).join();
//Duplicate values should be ignored here
map.remove(key, value)
.thenAccept(result -> assertFalse(result)).join();
});
});
map.isEmpty().thenAccept(result -> assertTrue(result)).join();
//Test multi put
allKeys.forEach(key -> {
map.putAll(key, Lists.newArrayList(allValues.subList(0, 2)))
.thenAccept(result -> assertTrue(result)).join();
map.putAll(key, Lists.newArrayList(allValues.subList(0, 2)))
.thenAccept(result -> assertFalse(result)).join();
map.putAll(key, Lists.newArrayList(allValues.subList(2, 4)))
.thenAccept(result -> assertTrue(result)).join();
map.putAll(key, Lists.newArrayList(allValues.subList(2, 4)))
.thenAccept(result -> assertFalse(result)).join();
});
//Test multi remove
allKeys.forEach(key -> {
//Split the lists to test how multiRemove can work piecewise
map.removeAll(key, Lists.newArrayList(allValues.subList(0, 2)))
.thenAccept(result -> assertTrue(result)).join();
map.removeAll(key, Lists.newArrayList(allValues.subList(0, 2)))
.thenAccept(result -> assertFalse(result)).join();
map.removeAll(key, Lists.newArrayList(allValues.subList(2, 4)))
.thenAccept(result -> assertTrue(result)).join();
map.removeAll(key, Lists.newArrayList(allValues.subList(2, 4)))
.thenAccept(result -> assertFalse(result)).join();
});
map.isEmpty().thenAccept(result -> assertTrue(result)).join();
//Repopulate for next test
allKeys.forEach(key -> {
map.putAll(key, allValues)
.thenAccept(result -> assertTrue(result)).join();
});
map.size().thenAccept(result -> assertEquals(16, (int) result)).join();
//Test removeAll of entire entry
allKeys.forEach(key -> {
map.removeAll(key).thenAccept(result -> {
assertTrue(
byteArrayCollectionIsEqual(allValues, result.value()));
}).join();
map.removeAll(key).thenAccept(result -> {
assertFalse(
byteArrayCollectionIsEqual(allValues, result.value()));
}).join();
});
map.isEmpty().thenAccept(result -> assertTrue(result)).join();
//Repopulate for next test
allKeys.forEach(key -> {
map.putAll(key, allValues)
.thenAccept(result -> assertTrue(result)).join();
});
map.size().thenAccept(result -> assertEquals(16, (int) result)).join();
allKeys.forEach(key -> {
map.replaceValues(key, allValues)
.thenAccept(result ->
assertTrue(byteArrayCollectionIsEqual(allValues,
result.value())))
.join();
map.replaceValues(key, Lists.newArrayList())
.thenAccept(result ->
assertTrue(byteArrayCollectionIsEqual(allValues,
result.value())))
.join();
map.replaceValues(key, allValues)
.thenAccept(result ->
assertTrue(result.value().isEmpty()))
.join();
});
//Test replacements of partial sets
map.size().thenAccept(result -> assertEquals(16, (int) result)).join();
allKeys.forEach(key -> {
map.remove(key, valueOne)
.thenAccept(result ->
assertTrue(result)).join();
map.replaceValues(key, Lists.newArrayList())
.thenAccept(result ->
assertTrue(byteArrayCollectionIsEqual(
Lists.newArrayList(valueTwo, valueThree,
valueFour),
result.value())))
.join();
map.replaceValues(key, allValues)
.thenAccept(result ->
assertTrue(result.value().isEmpty()))
.join();
});
map.destroy().join();
clearTests();
}
/**
* Tests the get, keySet, keys, values, and entries implementations as well
* as a trivial test of the asMap functionality (throws error)
* @throws Exception
*/
@Ignore
@Test
public void testAccessors() throws Exception {
clearTests();
AsyncConsistentSetMultimap map = createResource(3);
//Populate for full map behavior tests
allKeys.forEach(key -> {
map.putAll(key, allValues)
.thenAccept(result -> assertTrue(result)).join();
});
map.size().thenAccept(result -> assertEquals(16, (int) result)).join();
allKeys.forEach(key -> {
map.get(key).thenAccept(result -> {
assertTrue(byteArrayCollectionIsEqual(allValues,
result.value()));
}).join();
});
//Test that the key set is correct
map.keySet()
.thenAccept(result ->
assertTrue(stringArrayCollectionIsEqual(allKeys,
result)))
.join();
//Test that the correct set and occurrence of values are found in the
//values result
map.values().thenAccept(result -> {
final Multiset<byte[]> set = TreeMultiset.create(
new ByteArrayComparator());
for (int i = 0; i < 4; i++) {
set.addAll(allValues);
}
assertEquals(16, result.size());
result.forEach(value -> assertTrue(set.remove(value)));
assertTrue(set.isEmpty());
}).join();
//Test that keys returns the right result including the correct number
//of each item
map.keys().thenAccept(result -> {
final Multiset<String> set = TreeMultiset.create();
for (int i = 0; i < 4; i++) {
set.addAll(allKeys);
}
assertEquals(16, result.size());
result.forEach(value -> assertTrue(set.remove(value)));
assertTrue(set.isEmpty());
}).join();
//Test that the right combination of key, value pairs are present
map.entries().thenAccept(result -> {
final Multiset<Map.Entry<String, byte[]>> set =
TreeMultiset.create(new EntryComparator());
allKeys.forEach(key -> {
allValues.forEach(value -> {
set.add(new DefaultMapEntry(key, value));
});
});
assertEquals(16, result.size());
result.forEach(entry -> assertTrue(set.remove(entry)));
assertTrue(set.isEmpty());
}).join();
//Testing for empty map behavior
map.clear().join();
allKeys.forEach(key -> {
map.get(key).thenAccept(result -> {
assertTrue(result.value().isEmpty());
}).join();
});
map.keySet().thenAccept(result -> assertTrue(result.isEmpty())).join();
map.values().thenAccept(result -> assertTrue(result.isEmpty())).join();
map.keys().thenAccept(result -> assertTrue(result.isEmpty())).join();
map.entries()
.thenAccept(result -> assertTrue(result.isEmpty())).join();
map.destroy();
clearTests();
}
private AsyncConsistentSetMultimap createResource(int clusterSize) {
try {
createCopycatServers(clusterSize);
AsyncConsistentSetMultimap map = createAtomixClient().
getResource("testMap", AsyncConsistentSetMultimap.class)
.join();
return map;
} catch (Throwable e) {
throw new RuntimeException(e.toString());
}
}
@Override
protected CopycatServer createCopycatServer(Address address) {
CopycatServer server = CopycatServer.builder(address, members)
.withTransport(new LocalTransport(registry))
.withStorage(Storage.builder()
.withStorageLevel(StorageLevel.MEMORY)
.withDirectory(testDir + "/" + address.port())
.build())
.withStateMachine(ResourceManagerState::new)
.withSerializer(serializer.clone())
.withHeartbeatInterval(Duration.ofMillis(25))
.withElectionTimeout(Duration.ofMillis(50))
.withSessionTimeout(Duration.ofMillis(100))
.build();
copycatServers.add(server);
return server; }
/**
* Returns two arrays contain the same set of elements,
* regardless of order.
* @param o1 first collection
* @param o2 second collection
* @return true if they contain the same elements
*/
private boolean byteArrayCollectionIsEqual(
Collection<? extends byte[]> o1, Collection<? extends byte[]> o2) {
if (o1 == null || o2 == null || o1.size() != o2.size()) {
return false;
}
for (byte[] array1 : o1) {
boolean matched = false;
for (byte[] array2 : o2) {
if (Arrays.equals(array1, array2)) {
matched = true;
break;
}
}
if (!matched) {
return false;
}
}
return true;
}
/**
* Compares two collections of strings returns true if they contain the
* same strings, false otherwise.
* @param s1 string collection one
* @param s2 string collection two
* @return true if the two sets contain the same strings
*/
private boolean stringArrayCollectionIsEqual(
Collection<? extends String> s1, Collection<? extends String> s2) {
if (s1 == null || s2 == null || s1.size() != s2.size()) {
return false;
}
for (String string1 : s1) {
boolean matched = false;
for (String string2 : s2) {
if (string1.equals(string2)) {
matched = true;
break;
}
}
if (!matched) {
return false;
}
}
return true;
}
/**
* Byte array comparator implementation.
*/
private class ByteArrayComparator implements Comparator<byte[]> {
@Override
public int compare(byte[] o1, byte[] o2) {
if (Arrays.equals(o1, o2)) {
return 0;
} else {
for (int i = 0; i < o1.length && i < o2.length; i++) {
if (o1[i] < o2[i]) {
return -1;
} else if (o1[i] > o2[i]) {
return 1;
}
}
return o1.length > o2.length ? 1 : -1;
}
}
}
/**
* Entry comparator, uses both key and value to determine equality,
* for comparison falls back to the default string comparator.
*/
private class EntryComparator
implements Comparator<Map.Entry<String, byte[]>> {
@Override
public int compare(Map.Entry<String, byte[]> o1,
Map.Entry<String, byte[]> o2) {
if (o1 == null || o1.getKey() == null || o2 == null ||
o2.getKey() == null) {
throw new IllegalArgumentException();
}
if (o1.getKey().equals(o2.getKey()) &&
Arrays.equals(o1.getValue(), o2.getValue())) {
return 0;
} else {
return o1.getKey().compareTo(o2.getKey());
}
}
}
}