Madan Jampani
Committed by Gerrit Code Review

Revert "Migrating to latest Atomix"

This reverts commit aa3d598b

Change-Id: Icd58278b1bbc8ca31887450f58220fba40cd309c
Showing 31 changed files with 676 additions and 399 deletions
......@@ -39,6 +39,9 @@ import org.onosproject.store.service.LeaderElector;
import org.onosproject.store.service.StorageService;
import org.slf4j.Logger;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
/**
* Implementation of {@code LeadershipStore} that makes use of a {@link LeaderElector}
* primitive.
......@@ -59,11 +62,13 @@ public class NewDistributedLeadershipStore
private NodeId localNodeId;
private LeaderElector leaderElector;
private final Map<String, Leadership> leaderBoard = Maps.newConcurrentMap();
private final Consumer<Change<Leadership>> leadershipChangeListener =
change -> {
Leadership oldValue = change.oldValue();
Leadership newValue = change.newValue();
leaderBoard.put(newValue.topic(), newValue);
boolean leaderChanged = !Objects.equals(oldValue.leader(), newValue.leader());
boolean candidatesChanged = !Objects.equals(oldValue.candidates(), newValue.candidates());
LeadershipEvent.Type eventType = null;
......@@ -87,6 +92,7 @@ public class NewDistributedLeadershipStore
.build()
.asLeaderElector();
leaderElector.addChangeListener(leadershipChangeListener);
leaderBoard.putAll(getLeaderships());
log.info("Started");
}
......@@ -123,11 +129,11 @@ public class NewDistributedLeadershipStore
@Override
public Leadership getLeadership(String topic) {
return leaderElector.getLeadership(topic);
return leaderBoard.get(topic);
}
@Override
public Map<String, Leadership> getLeaderships() {
return leaderElector.getLeaderships();
return ImmutableMap.copyOf(leaderBoard);
}
}
......
COMPILE_DEPS = [
'//lib:CORE_DEPS',
'//lib:catalyst-serializer',
'//lib:mapdb',
]
......
......@@ -3,12 +3,29 @@ COMPILE_DEPS = [
'//core/common:onos-core-common',
'//incubator/api:onos-incubator-api',
'//core/store/serializers:onos-core-serializers',
'//lib:copycat-client',
'//lib:copycat-server',
'//lib:copycat-protocol',
'//lib:copycat-core',
'//lib:typesafe-config',
'//lib:copycat-api',
'//lib:copycat-state-machine',
'//lib:copycat-state-log',
'//lib:catalyst-transport',
'//lib:catalyst-buffer',
'//lib:catalyst-common',
'//lib:catalyst-local',
'//lib:catalyst-serializer',
'//lib:atomix',
'//lib:atomix-resource',
'//lib:atomix-variables',
'//lib:atomix-resource-manager',
]
TEST_DEPS = [
'//lib:TEST',
'//core/api:onos-api-tests',
'//lib:onos-atomix',
]
osgi_jar_with_tests (
......
......@@ -70,9 +70,9 @@
</dependency>
<dependency>
<groupId>io.atomix</groupId>
<groupId>org.onosproject</groupId>
<artifactId>atomix</artifactId>
<version>1.0.0-rc7</version>
<version>1.0.onos-SNAPSHOT</version>
</dependency>
</dependencies>
</project>
......
......@@ -19,8 +19,9 @@ import java.util.Arrays;
import io.atomix.catalyst.serializer.Serializer;
import io.atomix.catalyst.serializer.TypeSerializerFactory;
import io.atomix.copycat.Query;
import io.atomix.manager.util.ResourceManagerTypeResolver;
import io.atomix.variables.internal.LongCommands;
import io.atomix.variables.state.LongCommands;
import org.onlab.util.Match;
import org.onosproject.cluster.Leader;
......@@ -62,7 +63,8 @@ public final class CatalystSerializers {
Transaction.State.class,
PrepareResult.class,
CommitResult.class,
RollbackResult.class));
RollbackResult.class,
Query.ConsistencyLevel.class));
// ONOS classes
serializer.register(Change.class, factory);
serializer.register(Leader.class, factory);
......
......@@ -29,7 +29,7 @@ import com.google.common.collect.Sets;
import io.atomix.catalyst.transport.Address;
import io.atomix.catalyst.transport.Client;
import io.atomix.catalyst.transport.Connection;
import io.atomix.catalyst.concurrent.ThreadContext;
import io.atomix.catalyst.util.concurrent.ThreadContext;
/**
* {@link Client} implementation for {@link CopycatTransport}.
......
......@@ -15,18 +15,6 @@
*/
package org.onosproject.store.primitives.impl;
import static com.google.common.base.Preconditions.checkNotNull;
import io.atomix.catalyst.concurrent.Listener;
import io.atomix.catalyst.concurrent.Listeners;
import io.atomix.catalyst.concurrent.ThreadContext;
import io.atomix.catalyst.serializer.SerializationException;
import io.atomix.catalyst.transport.Address;
import io.atomix.catalyst.transport.Connection;
import io.atomix.catalyst.transport.MessageHandler;
import io.atomix.catalyst.transport.TransportException;
import io.atomix.catalyst.util.Assert;
import io.atomix.catalyst.util.reference.ReferenceCounted;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
......@@ -38,6 +26,8 @@ import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import org.apache.commons.io.IOUtils;
import org.onlab.util.Tools;
import org.onosproject.cluster.PartitionId;
......@@ -48,6 +38,18 @@ import com.google.common.base.MoreObjects;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import static com.google.common.base.Preconditions.checkNotNull;
import io.atomix.catalyst.serializer.SerializationException;
import io.atomix.catalyst.transport.Address;
import io.atomix.catalyst.transport.Connection;
import io.atomix.catalyst.transport.MessageHandler;
import io.atomix.catalyst.transport.TransportException;
import io.atomix.catalyst.util.Assert;
import io.atomix.catalyst.util.Listener;
import io.atomix.catalyst.util.Listeners;
import io.atomix.catalyst.util.ReferenceCounted;
import io.atomix.catalyst.util.concurrent.ThreadContext;
/**
* {@link Connection} implementation for CopycatTransport.
*/
......
......@@ -17,12 +17,6 @@ package org.onosproject.store.primitives.impl;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.slf4j.LoggerFactory.getLogger;
import io.atomix.catalyst.concurrent.CatalystThreadFactory;
import io.atomix.catalyst.concurrent.SingleThreadContext;
import io.atomix.catalyst.concurrent.ThreadContext;
import io.atomix.catalyst.transport.Address;
import io.atomix.catalyst.transport.Connection;
import io.atomix.catalyst.transport.Server;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
......@@ -42,6 +36,13 @@ import org.slf4j.Logger;
import com.google.common.collect.Maps;
import io.atomix.catalyst.transport.Address;
import io.atomix.catalyst.transport.Connection;
import io.atomix.catalyst.transport.Server;
import io.atomix.catalyst.util.concurrent.CatalystThreadFactory;
import io.atomix.catalyst.util.concurrent.SingleThreadContext;
import io.atomix.catalyst.util.concurrent.ThreadContext;
/**
* {@link Server} implementation for {@link CopycatTransport}.
*/
......
/*
* Copyright 2016-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.impl;
import io.atomix.catalyst.concurrent.Listener;
import io.atomix.catalyst.concurrent.ThreadContext;
import io.atomix.catalyst.serializer.Serializer;
import io.atomix.catalyst.transport.Address;
import io.atomix.catalyst.transport.Transport;
import io.atomix.copycat.Command;
import io.atomix.copycat.Query;
import io.atomix.copycat.client.CopycatClient;
import io.atomix.copycat.session.Session;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
/**
* {@code CopycatClient} that merely delegates control to
* another CopycatClient.
*/
public class DelegatingCopycatClient implements CopycatClient {
protected final CopycatClient client;
DelegatingCopycatClient(CopycatClient client) {
this.client = client;
}
@Override
public State state() {
return client.state();
}
@Override
public Listener<State> onStateChange(Consumer<State> callback) {
return client.onStateChange(callback);
}
@Override
public ThreadContext context() {
return client.context();
}
@Override
public Transport transport() {
return client.transport();
}
@Override
public Serializer serializer() {
return client.serializer();
}
@Override
public Session session() {
return client.session();
}
@Override
public <T> CompletableFuture<T> submit(Command<T> command) {
return client.submit(command);
}
@Override
public <T> CompletableFuture<T> submit(Query<T> query) {
return client.submit(query);
}
@Override
public Listener<Void> onEvent(String event, Runnable callback) {
return client.onEvent(event, callback);
}
@Override
public <T> Listener<T> onEvent(String event, Consumer<T> callback) {
return client.onEvent(event, callback);
}
@Override
public CompletableFuture<CopycatClient> connect(Collection<Address> members) {
return client.connect(members);
}
@Override
public CompletableFuture<CopycatClient> recover() {
return client.recover();
}
@Override
public CompletableFuture<Void> close() {
return client.close();
}
}
\ No newline at end of file
/*
* Copyright 2016-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.impl;
import static org.slf4j.LoggerFactory.getLogger;
import java.net.ConnectException;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import org.slf4j.Logger;
import com.google.common.base.Throwables;
import io.atomix.catalyst.transport.TransportException;
import io.atomix.copycat.Query;
import io.atomix.copycat.client.CopycatClient;
import io.atomix.copycat.error.QueryException;
import io.atomix.copycat.error.UnknownSessionException;
import io.atomix.copycat.session.ClosedSessionException;
/**
* {@code CopycatClient} that can retry when certain recoverable errors are encoutered.
*/
public class QueryRetryingCopycatClient extends DelegatingCopycatClient {
private final int maxRetries;
private final long delayBetweenRetriesMillis;
private final ScheduledExecutorService executor;
private final Logger log = getLogger(getClass());
private final Predicate<Throwable> retryableCheck = e -> e instanceof ConnectException
|| e instanceof TimeoutException
|| e instanceof TransportException
|| e instanceof ClosedChannelException
|| e instanceof QueryException
|| e instanceof UnknownSessionException
|| e instanceof ClosedSessionException;
QueryRetryingCopycatClient(CopycatClient client, int maxRetries, long delayBetweenRetriesMillis) {
super(client);
this.maxRetries = maxRetries;
this.delayBetweenRetriesMillis = delayBetweenRetriesMillis;
this.executor = Executors.newSingleThreadScheduledExecutor();
}
@Override
public CompletableFuture<Void> close() {
executor.shutdown();
return super.close();
}
@Override
public <T> CompletableFuture<T> submit(Query<T> query) {
CompletableFuture<T> future = new CompletableFuture<>();
executor.submit(() -> submit(query, 1, future));
return future;
}
private <T> void submit(Query<T> query, int attemptIndex, CompletableFuture<T> future) {
client.submit(query).whenComplete((r, e) -> {
if (e != null) {
if (attemptIndex < maxRetries + 1 && retryableCheck.test(Throwables.getRootCause(e))) {
log.debug("Retry attempt ({} of {}). Failure due to {}",
attemptIndex, maxRetries, Throwables.getRootCause(e).getClass());
executor.schedule(() ->
submit(query, attemptIndex + 1, future), delayBetweenRetriesMillis, TimeUnit.MILLISECONDS);
} else {
future.completeExceptionally(e);
}
} else {
future.complete(r);
}
});
}
}
......@@ -16,18 +16,22 @@
package org.onosproject.store.primitives.impl;
import static org.slf4j.LoggerFactory.getLogger;
import io.atomix.Atomix;
import io.atomix.AtomixClient;
import io.atomix.catalyst.transport.Address;
import io.atomix.catalyst.transport.Transport;
import io.atomix.catalyst.util.concurrent.CatalystThreadFactory;
import io.atomix.copycat.client.ConnectionStrategies;
import io.atomix.copycat.client.CopycatClient;
import io.atomix.copycat.client.CopycatClient.State;
import io.atomix.copycat.client.RecoveryStrategies;
import io.atomix.copycat.client.RetryStrategies;
import io.atomix.copycat.client.ServerSelectionStrategies;
import io.atomix.manager.ResourceClient;
import io.atomix.manager.ResourceManagerException;
import io.atomix.manager.state.ResourceManagerException;
import io.atomix.manager.util.ResourceManagerTypeResolver;
import io.atomix.resource.ResourceRegistry;
import io.atomix.resource.ResourceType;
import io.atomix.resource.util.ResourceRegistry;
import io.atomix.variables.DistributedLong;
import java.util.Collection;
......@@ -66,8 +70,8 @@ public class StoragePartitionClient implements DistributedPrimitiveCreator, Mana
private final StoragePartition partition;
private final Transport transport;
private final io.atomix.catalyst.serializer.Serializer serializer;
private AtomixClient client;
private ResourceClient resourceClient;
private Atomix client;
private CopycatClient copycatClient;
private static final String ATOMIC_VALUES_CONSISTENT_MAP_NAME = "onos-atomic-values";
private final Supplier<AsyncConsistentMap<String, byte[]>> onosAtomicValuesMap =
Suppliers.memoize(() -> newAsyncConsistentMap(ATOMIC_VALUES_CONSISTENT_MAP_NAME,
......@@ -95,15 +99,19 @@ public class StoragePartitionClient implements DistributedPrimitiveCreator, Mana
@Override
public CompletableFuture<Void> open() {
if (client != null && client.isOpen()) {
return CompletableFuture.completedFuture(null);
}
synchronized (StoragePartitionClient.this) {
resourceClient = newResourceClient(transport,
copycatClient = newCopycatClient(partition.getMemberAddresses(),
transport,
serializer.clone(),
StoragePartition.RESOURCE_TYPES);
resourceClient.client().onStateChange(state -> log.debug("Partition {} client state"
copycatClient.onStateChange(state -> log.debug("Partition {} client state"
+ " changed to {}", partition.getId(), state));
client = new AtomixClient(resourceClient);
client = new AtomixClient(new ResourceClient(copycatClient));
}
return client.connect(partition.getMemberAddresses()).whenComplete((r, e) -> {
return client.open().whenComplete((r, e) -> {
if (e == null) {
log.info("Successfully started client for partition {}", partition.getId());
} else {
......@@ -124,7 +132,7 @@ public class StoragePartitionClient implements DistributedPrimitiveCreator, Mana
atomixConsistentMap.statusChangeListeners()
.forEach(listener -> listener.accept(mapper.apply(state)));
};
resourceClient.client().onStateChange(statusListener);
copycatClient.onStateChange(statusListener);
AsyncConsistentMap<String, byte[]> rawMap =
new DelegatingAsyncConsistentMap<String, byte[]>(atomixConsistentMap) {
@Override
......@@ -165,15 +173,7 @@ public class StoragePartitionClient implements DistributedPrimitiveCreator, Mana
@Override
public AsyncLeaderElector newAsyncLeaderElector(String name) {
AtomixLeaderElector leaderElector = client.getResource(name, AtomixLeaderElector.class)
.thenCompose(AtomixLeaderElector::setupCache)
.join();
Consumer<State> statusListener = state -> {
leaderElector.statusChangeListeners()
.forEach(listener -> listener.accept(mapper.apply(state)));
};
resourceClient.client().onStateChange(statusListener);
return leaderElector;
return client.getResource(name, AtomixLeaderElector.class).join();
}
@Override
......@@ -188,7 +188,7 @@ public class StoragePartitionClient implements DistributedPrimitiveCreator, Mana
@Override
public boolean isOpen() {
return resourceClient.client().state() != State.CLOSED;
return client.isOpen();
}
/**
......@@ -198,33 +198,33 @@ public class StoragePartitionClient implements DistributedPrimitiveCreator, Mana
public PartitionClientInfo clientInfo() {
return new PartitionClientInfo(partition.getId(),
partition.getMembers(),
resourceClient.client().session().id(),
mapper.apply(resourceClient.client().state()));
copycatClient.session().id(),
mapper.apply(copycatClient.state()));
}
private ResourceClient newResourceClient(Transport transport,
private CopycatClient newCopycatClient(Collection<Address> members,
Transport transport,
io.atomix.catalyst.serializer.Serializer serializer,
Collection<ResourceType> resourceTypes) {
ResourceRegistry registry = new ResourceRegistry();
resourceTypes.forEach(registry::register);
CopycatClient copycatClient = CopycatClient.builder()
CopycatClient client = CopycatClient.builder(members)
.withServerSelectionStrategy(ServerSelectionStrategies.ANY)
.withConnectionStrategy(ConnectionStrategies.FIBONACCI_BACKOFF)
.withRecoveryStrategy(RecoveryStrategies.RECOVER)
.withRetryStrategy(RetryStrategies.FIBONACCI_BACKOFF)
.withTransport(transport)
.withSerializer(serializer)
.withThreadFactory(new CatalystThreadFactory(String.format("copycat-client-%s", partition.getId())))
.build();
copycatClient.serializer().resolve(new ResourceManagerTypeResolver());
client.serializer().resolve(new ResourceManagerTypeResolver());
for (ResourceType type : registry.types()) {
try {
type.factory()
.newInstance()
.createSerializableTypeResolver()
.resolve(copycatClient.serializer().registry());
type.factory().newInstance().createSerializableTypeResolver().resolve(client.serializer().registry());
} catch (InstantiationException | IllegalAccessException e) {
throw new ResourceManagerException(e);
}
}
return new ResourceClient(new QueryRetryingCopycatClient(copycatClient, 2, 100));
return client;
}
}
......
......@@ -22,7 +22,7 @@ import io.atomix.catalyst.transport.Transport;
import io.atomix.copycat.server.CopycatServer;
import io.atomix.copycat.server.storage.Storage;
import io.atomix.copycat.server.storage.StorageLevel;
import io.atomix.manager.internal.ResourceManagerState;
import io.atomix.manager.state.ResourceManagerState;
import io.atomix.manager.util.ResourceManagerTypeResolver;
import java.io.File;
......@@ -68,9 +68,9 @@ public class StoragePartitionServer implements Managed<StoragePartitionServer> {
return CompletableFuture.completedFuture(null);
}
synchronized (this) {
server = buildServer();
server = buildServer(partition.getMemberAddresses());
}
serverOpenFuture = server.bootstrap(partition.getMemberAddresses());
serverOpenFuture = server.start();
} else {
serverOpenFuture = CompletableFuture.completedFuture(null);
}
......@@ -85,7 +85,11 @@ public class StoragePartitionServer implements Managed<StoragePartitionServer> {
@Override
public CompletableFuture<Void> close() {
return server.shutdown();
/**
* CopycatServer#kill just shuts down the server and does not result
* in any cluster membership changes.
*/
return server.kill();
}
/**
......@@ -93,11 +97,11 @@ public class StoragePartitionServer implements Managed<StoragePartitionServer> {
* @return future that is completed when the operation is complete
*/
public CompletableFuture<Void> closeAndExit() {
return server.leave();
return server.stop();
}
private CopycatServer buildServer() {
CopycatServer server = CopycatServer.builder(localAddress)
private CopycatServer buildServer(Collection<Address> clusterMembers) {
CopycatServer server = CopycatServer.builder(localAddress, clusterMembers)
.withName("partition-" + partition.getId())
.withSerializer(serializer.clone())
.withTransport(transport.get())
......@@ -114,8 +118,9 @@ public class StoragePartitionServer implements Managed<StoragePartitionServer> {
}
public CompletableFuture<Void> join(Collection<Address> otherMembers) {
server = buildServer();
return server.join(otherMembers).whenComplete((r, e) -> {
server = buildServer(otherMembers);
return server.start().whenComplete((r, e) -> {
if (e == null) {
log.info("Successfully joined partition {}", partition.getId());
} else {
......
......@@ -48,6 +48,10 @@ public final class AsyncConsistentMultimapCommands {
@SuppressWarnings("serial")
public abstract static class MultimapCommand<V> implements Command<V>,
CatalystSerializable {
@Override
public ConsistencyLevel consistency() {
return ConsistencyLevel.SEQUENTIAL;
}
@Override
public String toString() {
......
......@@ -70,90 +70,95 @@ public class AsyncConsistentSetMultimap
@Override
public CompletableFuture<Integer> size() {
return client.submit(new Size());
return submit(new Size());
}
@Override
public CompletableFuture<Boolean> isEmpty() {
return client.submit(new IsEmpty());
return submit(new IsEmpty());
}
@Override
public CompletableFuture<Boolean> containsKey(String key) {
return client.submit(new ContainsKey(key));
return submit(new ContainsKey(key));
}
@Override
public CompletableFuture<Boolean> containsValue(byte[] value) {
return client.submit(new ContainsValue(value));
return submit(new ContainsValue(value));
}
@Override
public CompletableFuture<Boolean> containsEntry(String key, byte[] value) {
return client.submit(new ContainsEntry(key, value));
return submit(new ContainsEntry(key, value));
}
@Override
public CompletableFuture<Boolean> put(String key, byte[] value) {
return client.submit(new Put(key, Lists.newArrayList(value), null));
return submit(new Put(key, Lists.newArrayList(value), null));
}
@Override
public CompletableFuture<Boolean> remove(String key, byte[] value) {
return client.submit(new MultiRemove(key,
Lists.newArrayList(value),
null));
return submit(new MultiRemove(key,
Lists.newArrayList(value),
null));
}
@Override
public CompletableFuture<Boolean> removeAll(String key, Collection<? extends byte[]> values) {
return client.submit(new MultiRemove(key, (Collection<byte[]>) values, null));
public CompletableFuture<Boolean> removeAll(
String key, Collection<? extends byte[]> values) {
return submit(new MultiRemove(key, (Collection<byte[]>) values, null));
}
@Override
public CompletableFuture<Versioned<Collection<? extends byte[]>>> removeAll(String key) {
return client.submit(new RemoveAll(key, null));
public CompletableFuture<
Versioned<Collection<? extends byte[]>>> removeAll(String key) {
return submit(new RemoveAll(key, null));
}
@Override
public CompletableFuture<Boolean> putAll(
String key, Collection<? extends byte[]> values) {
return client.submit(new Put(key, values, null));
return submit(new Put(key, values, null));
}
@Override
public CompletableFuture<Versioned<Collection<? extends byte[]>>> replaceValues(
public CompletableFuture<
Versioned<Collection<? extends byte[]>>> replaceValues(
String key, Collection<byte[]> values) {
return client.submit(new Replace(key, values, null));
return submit(new Replace(key, values, null));
}
@Override
public CompletableFuture<Void> clear() {
return client.submit(new Clear());
return submit(new Clear());
}
@Override
public CompletableFuture<Versioned<Collection<? extends byte[]>>> get(String key) {
return client.submit(new Get(key));
public CompletableFuture<
Versioned<Collection<? extends byte[]>>> get(String key) {
return submit(new Get(key));
}
@Override
public CompletableFuture<Set<String>> keySet() {
return client.submit(new KeySet());
return submit(new KeySet());
}
@Override
public CompletableFuture<Multiset<String>> keys() {
return client.submit(new Keys());
return submit(new Keys());
}
@Override
public CompletableFuture<Multiset<byte[]>> values() {
return client.submit(new Values());
return submit(new Values());
}
@Override
public CompletableFuture<Collection<Map.Entry<String, byte[]>>> entries() {
return client.submit(new Entries());
return submit(new Entries());
}
@Override
......
......@@ -97,48 +97,48 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap>
@Override
public CompletableFuture<Boolean> isEmpty() {
return client.submit(new IsEmpty());
return submit(new IsEmpty());
}
@Override
public CompletableFuture<Integer> size() {
return client.submit(new Size());
return submit(new Size());
}
@Override
public CompletableFuture<Boolean> containsKey(String key) {
return client.submit(new ContainsKey(key));
return submit(new ContainsKey(key));
}
@Override
public CompletableFuture<Boolean> containsValue(byte[] value) {
return client.submit(new ContainsValue(value));
return submit(new ContainsValue(value));
}
@Override
public CompletableFuture<Versioned<byte[]>> get(String key) {
return client.submit(new Get(key));
return submit(new Get(key));
}
@Override
public CompletableFuture<Set<String>> keySet() {
return client.submit(new KeySet());
return submit(new KeySet());
}
@Override
public CompletableFuture<Collection<Versioned<byte[]>>> values() {
return client.submit(new Values());
return submit(new Values());
}
@Override
public CompletableFuture<Set<Entry<String, Versioned<byte[]>>>> entrySet() {
return client.submit(new EntrySet());
return submit(new EntrySet());
}
@Override
@SuppressWarnings("unchecked")
public CompletableFuture<Versioned<byte[]>> put(String key, byte[] value) {
return client.submit(new UpdateAndGet(key, value, Match.ANY, Match.ANY))
return submit(new UpdateAndGet(key, value, Match.ANY, Match.ANY))
.whenComplete((r, e) -> throwIfLocked(r.status()))
.thenApply(v -> v.oldValue());
}
......@@ -146,7 +146,7 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap>
@Override
@SuppressWarnings("unchecked")
public CompletableFuture<Versioned<byte[]>> putAndGet(String key, byte[] value) {
return client.submit(new UpdateAndGet(key, value, Match.ANY, Match.ANY))
return submit(new UpdateAndGet(key, value, Match.ANY, Match.ANY))
.whenComplete((r, e) -> throwIfLocked(r.status()))
.thenApply(v -> v.newValue());
}
......@@ -154,14 +154,14 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap>
@Override
@SuppressWarnings("unchecked")
public CompletableFuture<Versioned<byte[]>> putIfAbsent(String key, byte[] value) {
return client.submit(new UpdateAndGet(key, value, Match.NULL, Match.ANY))
return submit(new UpdateAndGet(key, value, Match.NULL, Match.ANY))
.whenComplete((r, e) -> throwIfLocked(r.status()))
.thenApply(v -> v.oldValue());
}
@Override
@SuppressWarnings("unchecked")
public CompletableFuture<Versioned<byte[]>> remove(String key) {
return client.submit(new UpdateAndGet(key, null, Match.ANY, Match.ANY))
return submit(new UpdateAndGet(key, null, Match.ANY, Match.ANY))
.whenComplete((r, e) -> throwIfLocked(r.status()))
.thenApply(v -> v.oldValue());
}
......@@ -169,7 +169,7 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap>
@Override
@SuppressWarnings("unchecked")
public CompletableFuture<Boolean> remove(String key, byte[] value) {
return client.submit(new UpdateAndGet(key, null, Match.ifValue(value), Match.ANY))
return submit(new UpdateAndGet(key, null, Match.ifValue(value), Match.ANY))
.whenComplete((r, e) -> throwIfLocked(r.status()))
.thenApply(v -> v.updated());
}
......@@ -177,7 +177,7 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap>
@Override
@SuppressWarnings("unchecked")
public CompletableFuture<Boolean> remove(String key, long version) {
return client.submit(new UpdateAndGet(key, null, Match.ANY, Match.ifValue(version)))
return submit(new UpdateAndGet(key, null, Match.ANY, Match.ifValue(version)))
.whenComplete((r, e) -> throwIfLocked(r.status()))
.thenApply(v -> v.updated());
}
......@@ -185,7 +185,7 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap>
@Override
@SuppressWarnings("unchecked")
public CompletableFuture<Versioned<byte[]>> replace(String key, byte[] value) {
return client.submit(new UpdateAndGet(key, value, Match.NOT_NULL, Match.ANY))
return submit(new UpdateAndGet(key, value, Match.NOT_NULL, Match.ANY))
.whenComplete((r, e) -> throwIfLocked(r.status()))
.thenApply(v -> v.oldValue());
}
......@@ -193,7 +193,10 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap>
@Override
@SuppressWarnings("unchecked")
public CompletableFuture<Boolean> replace(String key, byte[] oldValue, byte[] newValue) {
return client.submit(new UpdateAndGet(key, newValue, Match.ifValue(oldValue), Match.ANY))
return submit(new UpdateAndGet(key,
newValue,
Match.ifValue(oldValue),
Match.ANY))
.whenComplete((r, e) -> throwIfLocked(r.status()))
.thenApply(v -> v.updated());
}
......@@ -201,14 +204,17 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap>
@Override
@SuppressWarnings("unchecked")
public CompletableFuture<Boolean> replace(String key, long oldVersion, byte[] newValue) {
return client.submit(new UpdateAndGet(key, newValue, Match.ANY, Match.ifValue(oldVersion)))
return submit(new UpdateAndGet(key,
newValue,
Match.ANY,
Match.ifValue(oldVersion)))
.whenComplete((r, e) -> throwIfLocked(r.status()))
.thenApply(v -> v.updated());
}
@Override
public CompletableFuture<Void> clear() {
return client.submit(new Clear())
return submit(new Clear())
.whenComplete((r, e) -> throwIfLocked(r))
.thenApply(v -> null);
}
......@@ -239,7 +245,7 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap>
}
Match<byte[]> valueMatch = r1 == null ? Match.NULL : Match.ANY;
Match<Long> versionMatch = r1 == null ? Match.ANY : Match.ifValue(r1.version());
return client.submit(new UpdateAndGet(key,
return submit(new UpdateAndGet(key,
computedValue.get(),
valueMatch,
versionMatch))
......@@ -252,7 +258,7 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap>
public synchronized CompletableFuture<Void> addListener(MapEventListener<String, byte[]> listener,
Executor executor) {
if (mapEventListeners.isEmpty()) {
return client.submit(new Listen()).thenRun(() -> mapEventListeners.putIfAbsent(listener, executor));
return submit(new Listen()).thenRun(() -> mapEventListeners.putIfAbsent(listener, executor));
} else {
mapEventListeners.put(listener, executor);
return CompletableFuture.completedFuture(null);
......@@ -262,7 +268,7 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap>
@Override
public synchronized CompletableFuture<Void> removeListener(MapEventListener<String, byte[]> listener) {
if (mapEventListeners.remove(listener) != null && mapEventListeners.isEmpty()) {
return client.submit(new Unlisten()).thenApply(v -> null);
return submit(new Unlisten()).thenApply(v -> null);
}
return CompletableFuture.completedFuture(null);
}
......@@ -275,23 +281,23 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap>
@Override
public CompletableFuture<Boolean> prepare(MapTransaction<String, byte[]> transaction) {
return client.submit(new TransactionPrepare(transaction)).thenApply(v -> v == PrepareResult.OK);
return submit(new TransactionPrepare(transaction)).thenApply(v -> v == PrepareResult.OK);
}
@Override
public CompletableFuture<Void> commit(TransactionId transactionId) {
return client.submit(new TransactionCommit(transactionId)).thenApply(v -> null);
return submit(new TransactionCommit(transactionId)).thenApply(v -> null);
}
@Override
public CompletableFuture<Void> rollback(TransactionId transactionId) {
return client.submit(new TransactionRollback(transactionId))
return submit(new TransactionRollback(transactionId))
.thenApply(v -> null);
}
@Override
public CompletableFuture<Boolean> prepareAndCommit(MapTransaction<String, byte[]> transaction) {
return client.submit(new TransactionPrepareAndCommit(transaction)).thenApply(v -> v == PrepareResult.OK);
return submit(new TransactionPrepareAndCommit(transaction)).thenApply(v -> v == PrepareResult.OK);
}
@Override
......
......@@ -51,6 +51,11 @@ public final class AtomixConsistentMapCommands {
public abstract static class MapCommand<V> implements Command<V>, CatalystSerializable {
@Override
public ConsistencyLevel consistency() {
return ConsistencyLevel.SEQUENTIAL;
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.toString();
......
......@@ -42,9 +42,6 @@ import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorComman
import org.onosproject.store.service.AsyncLeaderElector;
import com.google.common.collect.ImmutableSet;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.Sets;
/**
......@@ -57,34 +54,11 @@ public class AtomixLeaderElector extends AbstractResource<AtomixLeaderElector>
Sets.newCopyOnWriteArraySet();
private final Set<Consumer<Change<Leadership>>> leadershipChangeListeners =
Sets.newCopyOnWriteArraySet();
private final Consumer<Change<Leadership>> cacheUpdater;
private final Consumer<Status> statusListener;
public static final String CHANGE_SUBJECT = "leadershipChangeEvents";
private final LoadingCache<String, CompletableFuture<Leadership>> cache;
public AtomixLeaderElector(CopycatClient client, Properties properties) {
super(client, properties);
cache = CacheBuilder.newBuilder()
.maximumSize(1000)
.build(CacheLoader.from(topic -> this.client.submit(new GetLeadership(topic))));
cacheUpdater = change -> {
Leadership leadership = change.newValue();
cache.put(leadership.topic(), CompletableFuture.completedFuture(leadership));
};
statusListener = status -> {
if (status == Status.SUSPENDED || status == Status.INACTIVE) {
cache.invalidateAll();
}
};
addStatusChangeListener(statusListener);
}
@Override
public CompletableFuture<Void> destroy() {
removeStatusChangeListener(statusListener);
return removeChangeListener(cacheUpdater);
}
@Override
......@@ -100,57 +74,53 @@ public class AtomixLeaderElector extends AbstractResource<AtomixLeaderElector>
});
}
public CompletableFuture<AtomixLeaderElector> setupCache() {
return addChangeListener(cacheUpdater).thenApply(v -> this);
}
private void handleEvent(List<Change<Leadership>> changes) {
changes.forEach(change -> leadershipChangeListeners.forEach(l -> l.accept(change)));
}
@Override
public CompletableFuture<Leadership> run(String topic, NodeId nodeId) {
return client.submit(new Run(topic, nodeId)).whenComplete((r, e) -> cache.invalidate(topic));
return submit(new Run(topic, nodeId));
}
@Override
public CompletableFuture<Void> withdraw(String topic) {
return client.submit(new Withdraw(topic)).whenComplete((r, e) -> cache.invalidate(topic));
return submit(new Withdraw(topic));
}
@Override
public CompletableFuture<Boolean> anoint(String topic, NodeId nodeId) {
return client.submit(new Anoint(topic, nodeId)).whenComplete((r, e) -> cache.invalidate(topic));
return submit(new Anoint(topic, nodeId));
}
@Override
public CompletableFuture<Boolean> promote(String topic, NodeId nodeId) {
return client.submit(new Promote(topic, nodeId)).whenComplete((r, e) -> cache.invalidate(topic));
return submit(new Promote(topic, nodeId));
}
@Override
public CompletableFuture<Void> evict(NodeId nodeId) {
return client.submit(new AtomixLeaderElectorCommands.Evict(nodeId));
return submit(new AtomixLeaderElectorCommands.Evict(nodeId));
}
@Override
public CompletableFuture<Leadership> getLeadership(String topic) {
return cache.getUnchecked(topic);
return submit(new GetLeadership(topic));
}
@Override
public CompletableFuture<Map<String, Leadership>> getLeaderships() {
return client.submit(new GetAllLeaderships());
return submit(new GetAllLeaderships());
}
public CompletableFuture<Set<String>> getElectedTopics(NodeId nodeId) {
return client.submit(new GetElectedTopics(nodeId));
return submit(new GetElectedTopics(nodeId));
}
@Override
public synchronized CompletableFuture<Void> addChangeListener(Consumer<Change<Leadership>> consumer) {
if (leadershipChangeListeners.isEmpty()) {
return client.submit(new Listen()).thenRun(() -> leadershipChangeListeners.add(consumer));
return submit(new Listen()).thenRun(() -> leadershipChangeListeners.add(consumer));
} else {
leadershipChangeListeners.add(consumer);
return CompletableFuture.completedFuture(null);
......@@ -160,7 +130,7 @@ public class AtomixLeaderElector extends AbstractResource<AtomixLeaderElector>
@Override
public synchronized CompletableFuture<Void> removeChangeListener(Consumer<Change<Leadership>> consumer) {
if (leadershipChangeListeners.remove(consumer) && leadershipChangeListeners.isEmpty()) {
return client.submit(new Unlisten()).thenApply(v -> null);
return submit(new Unlisten()).thenApply(v -> null);
}
return CompletableFuture.completedFuture(null);
}
......
......@@ -50,6 +50,11 @@ public final class AtomixLeaderElectorCommands {
public abstract static class ElectionQuery<V> implements Query<V>, CatalystSerializable {
@Override
public ConsistencyLevel consistency() {
return ConsistencyLevel.BOUNDED_LINEARIZABLE;
}
@Override
public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
}
......@@ -98,6 +103,11 @@ public final class AtomixLeaderElectorCommands {
public abstract static class ElectionCommand<V> implements Command<V>, CatalystSerializable {
@Override
public ConsistencyLevel consistency() {
return ConsistencyLevel.LINEARIZABLE;
}
@Override
public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
}
......
......@@ -302,10 +302,8 @@ public class AtomixLeaderElectorState extends ResourceStateMachine
* @return topic to leader mapping
*/
public Map<String, Leadership> allLeaderships(Commit<? extends GetAllLeaderships> commit) {
Map<String, Leadership> result = new HashMap<>();
try {
result.putAll(Maps.transformEntries(elections, (k, v) -> leadership(k)));
return result;
return Maps.transformEntries(elections, (k, v) -> leadership(k));
} finally {
commit.close();
}
......@@ -541,7 +539,7 @@ public class AtomixLeaderElectorState extends ResourceStateMachine
byte[] encodedElections = serializer.encode(elections);
writer.writeInt(encodedElections.length);
writer.write(encodedElections);
log.debug("Took state machine snapshot");
log.info("Took state machine snapshot");
}
@Override
......@@ -554,7 +552,7 @@ public class AtomixLeaderElectorState extends ResourceStateMachine
byte[] encodedElections = new byte[encodedElectionsSize];
reader.read(encodedElections);
elections = serializer.decode(encodedElections);
log.debug("Reinstated state machine from snapshot");
log.info("Reinstated state machine from snapshot");
}
private AtomicLong termCounter(String topic) {
......
......@@ -20,15 +20,13 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Multiset;
import com.google.common.collect.TreeMultiset;
import com.google.common.io.Files;
import io.atomix.catalyst.transport.Address;
import io.atomix.catalyst.transport.local.LocalTransport;
import io.atomix.catalyst.transport.LocalTransport;
import io.atomix.copycat.server.CopycatServer;
import io.atomix.copycat.server.storage.Storage;
import io.atomix.copycat.server.storage.StorageLevel;
import io.atomix.manager.internal.ResourceManagerState;
import io.atomix.manager.state.ResourceManagerState;
import io.atomix.resource.ResourceType;
import org.apache.commons.collections.keyvalue.DefaultMapEntry;
import org.junit.Ignore;
import org.junit.Test;
......@@ -429,7 +427,7 @@ public class AsyncConsistentSetMultimapTest extends AtomixTestBase {
@Override
protected CopycatServer createCopycatServer(Address address) {
CopycatServer server = CopycatServer.builder(address)
CopycatServer server = CopycatServer.builder(address, members)
.withTransport(new LocalTransport(registry))
.withStorage(Storage.builder()
.withStorageLevel(StorageLevel.MEMORY)
......@@ -442,8 +440,7 @@ public class AsyncConsistentSetMultimapTest extends AtomixTestBase {
.withSessionTimeout(Duration.ofMillis(100))
.build();
copycatServers.add(server);
return server;
}
return server; }
/**
* Returns two arrays contain the same set of elements,
......
......@@ -16,6 +16,7 @@
package org.onosproject.store.primitives.resources.impl;
import io.atomix.resource.ResourceType;
import static org.hamcrest.Matchers.*;
import static org.junit.Assert.*;
......@@ -27,6 +28,7 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletionException;
import java.util.stream.Collectors;
import org.junit.Ignore;
import org.junit.Test;
import org.onlab.util.Tools;
import org.onosproject.store.primitives.MapUpdate;
......@@ -42,6 +44,7 @@ import com.google.common.collect.Sets;
/**
* Unit tests for {@link AtomixConsistentMap}.
*/
@Ignore
public class AtomixConsistentMapTest extends AtomixTestBase {
@Override
......@@ -54,6 +57,10 @@ public class AtomixConsistentMapTest extends AtomixTestBase {
*/
@Test
public void testBasicMapOperations() throws Throwable {
basicMapOperationTests(1);
clearTests();
basicMapOperationTests(2);
clearTests();
basicMapOperationTests(3);
}
......@@ -62,6 +69,10 @@ public class AtomixConsistentMapTest extends AtomixTestBase {
*/
@Test
public void testMapComputeOperations() throws Throwable {
mapComputeOperationTests(1);
clearTests();
mapComputeOperationTests(2);
clearTests();
mapComputeOperationTests(3);
}
......@@ -70,6 +81,10 @@ public class AtomixConsistentMapTest extends AtomixTestBase {
*/
@Test
public void testMapListeners() throws Throwable {
mapListenerTests(1);
clearTests();
mapListenerTests(2);
clearTests();
mapListenerTests(3);
}
......@@ -78,6 +93,10 @@ public class AtomixConsistentMapTest extends AtomixTestBase {
*/
@Test
public void testTransactionCommit() throws Throwable {
transactionCommitTests(1);
clearTests();
transactionCommitTests(2);
clearTests();
transactionCommitTests(3);
}
......@@ -86,6 +105,10 @@ public class AtomixConsistentMapTest extends AtomixTestBase {
*/
@Test
public void testTransactionRollback() throws Throwable {
transactionRollbackTests(1);
clearTests();
transactionRollbackTests(2);
clearTests();
transactionRollbackTests(3);
}
......
......@@ -20,6 +20,7 @@ import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import org.junit.Ignore;
import org.junit.Test;
import static org.junit.Assert.*;
......@@ -29,12 +30,12 @@ import org.onosproject.cluster.NodeId;
import org.onosproject.event.Change;
import io.atomix.Atomix;
import io.atomix.AtomixClient;
import io.atomix.resource.ResourceType;
/**
* Unit tests for {@link AtomixLeaderElector}.
*/
@Ignore
public class AtomixLeaderElectorTest extends AtomixTestBase {
NodeId node1 = new NodeId("node1");
......@@ -48,7 +49,12 @@ public class AtomixLeaderElectorTest extends AtomixTestBase {
@Test
public void testRun() throws Throwable {
leaderElectorRunTests(1);
clearTests();
leaderElectorRunTests(2);
clearTests();
leaderElectorRunTests(3);
clearTests();
}
private void leaderElectorRunTests(int numServers) throws Throwable {
......@@ -74,7 +80,12 @@ public class AtomixLeaderElectorTest extends AtomixTestBase {
@Test
public void testWithdraw() throws Throwable {
leaderElectorWithdrawTests(1);
clearTests();
leaderElectorWithdrawTests(2);
clearTests();
leaderElectorWithdrawTests(3);
clearTests();
}
private void leaderElectorWithdrawTests(int numServers) throws Throwable {
......@@ -111,7 +122,12 @@ public class AtomixLeaderElectorTest extends AtomixTestBase {
@Test
public void testAnoint() throws Throwable {
leaderElectorAnointTests(1);
clearTests();
leaderElectorAnointTests(2);
clearTests();
leaderElectorAnointTests(3);
clearTests();
}
private void leaderElectorAnointTests(int numServers) throws Throwable {
......@@ -142,6 +158,9 @@ public class AtomixLeaderElectorTest extends AtomixTestBase {
elector3.anoint("foo", node2).thenAccept(result -> {
assertTrue(result);
}).join();
assertTrue(listener1.hasEvent());
assertTrue(listener2.hasEvent());
assertTrue(listener3.hasEvent());
listener1.nextEvent().thenAccept(result -> {
assertEquals(node2, result.newValue().leaderNodeId());
......@@ -165,16 +184,21 @@ public class AtomixLeaderElectorTest extends AtomixTestBase {
@Test
public void testPromote() throws Throwable {
leaderElectorPromoteTests(1);
clearTests();
leaderElectorPromoteTests(2);
clearTests();
leaderElectorPromoteTests(3);
clearTests();
}
private void leaderElectorPromoteTests(int numServers) throws Throwable {
createCopycatServers(numServers);
AtomixClient client1 = createAtomixClient();
Atomix client1 = createAtomixClient();
AtomixLeaderElector elector1 = client1.getResource("test-elector", AtomixLeaderElector.class).join();
AtomixClient client2 = createAtomixClient();
Atomix client2 = createAtomixClient();
AtomixLeaderElector elector2 = client2.getResource("test-elector", AtomixLeaderElector.class).join();
AtomixClient client3 = createAtomixClient();
Atomix client3 = createAtomixClient();
AtomixLeaderElector elector3 = client3.getResource("test-elector", AtomixLeaderElector.class).join();
elector1.run("foo", node1).join();
elector2.run("foo", node2).join();
......@@ -196,15 +220,9 @@ public class AtomixLeaderElectorTest extends AtomixTestBase {
elector3.run("foo", node3).join();
listener1.nextEvent().thenAccept(result -> {
assertEquals(node3, result.newValue().candidates().get(2));
}).join();
listener2.nextEvent().thenAccept(result -> {
assertEquals(node3, result.newValue().candidates().get(2));
}).join();
listener3.nextEvent().thenAccept(result -> {
assertEquals(node3, result.newValue().candidates().get(2));
}).join();
listener1.clearEvents();
listener2.clearEvents();
listener3.clearEvents();
elector3.promote("foo", node3).thenAccept(result -> {
assertTrue(result);
......@@ -223,12 +241,17 @@ public class AtomixLeaderElectorTest extends AtomixTestBase {
@Test
public void testLeaderSessionClose() throws Throwable {
leaderElectorLeaderSessionCloseTests(1);
clearTests();
leaderElectorLeaderSessionCloseTests(2);
clearTests();
leaderElectorLeaderSessionCloseTests(3);
clearTests();
}
private void leaderElectorLeaderSessionCloseTests(int numServers) throws Throwable {
createCopycatServers(numServers);
AtomixClient client1 = createAtomixClient();
Atomix client1 = createAtomixClient();
AtomixLeaderElector elector1 = client1.getResource("test-elector", AtomixLeaderElector.class).join();
elector1.run("foo", node1).join();
Atomix client2 = createAtomixClient();
......@@ -246,7 +269,12 @@ public class AtomixLeaderElectorTest extends AtomixTestBase {
@Test
public void testNonLeaderSessionClose() throws Throwable {
leaderElectorNonLeaderSessionCloseTests(1);
clearTests();
leaderElectorNonLeaderSessionCloseTests(2);
clearTests();
leaderElectorNonLeaderSessionCloseTests(3);
clearTests();
}
private void leaderElectorNonLeaderSessionCloseTests(int numServers) throws Throwable {
......@@ -254,7 +282,7 @@ public class AtomixLeaderElectorTest extends AtomixTestBase {
Atomix client1 = createAtomixClient();
AtomixLeaderElector elector1 = client1.getResource("test-elector", AtomixLeaderElector.class).join();
elector1.run("foo", node1).join();
AtomixClient client2 = createAtomixClient();
Atomix client2 = createAtomixClient();
AtomixLeaderElector elector2 = client2.getResource("test-elector", AtomixLeaderElector.class).join();
LeaderEventListener listener = new LeaderEventListener();
elector2.run("foo", node2).join();
......@@ -269,7 +297,12 @@ public class AtomixLeaderElectorTest extends AtomixTestBase {
@Test
public void testQueries() throws Throwable {
leaderElectorQueryTests(1);
clearTests();
leaderElectorQueryTests(2);
clearTests();
leaderElectorQueryTests(3);
clearTests();
}
private void leaderElectorQueryTests(int numServers) throws Throwable {
......
......@@ -17,6 +17,7 @@ package org.onosproject.store.primitives.resources.impl;
import static org.junit.Assert.*;
import org.junit.Ignore;
import org.junit.Test;
import io.atomix.Atomix;
......@@ -26,6 +27,7 @@ import io.atomix.variables.DistributedLong;
/**
* Unit tests for {@link AtomixCounter}.
*/
@Ignore
public class AtomixLongTest extends AtomixTestBase {
@Override
......@@ -35,7 +37,12 @@ public class AtomixLongTest extends AtomixTestBase {
@Test
public void testBasicOperations() throws Throwable {
basicOperationsTest(1);
clearTests();
basicOperationsTest(2);
clearTests();
basicOperationsTest(3);
clearTests();
}
protected void basicOperationsTest(int clusterSize) throws Throwable {
......
......@@ -15,16 +15,17 @@
*/
package org.onosproject.store.primitives.resources.impl;
import io.atomix.Atomix;
import io.atomix.AtomixClient;
import io.atomix.catalyst.serializer.Serializer;
import io.atomix.catalyst.transport.Address;
import io.atomix.catalyst.transport.local.LocalServerRegistry;
import io.atomix.catalyst.transport.local.LocalTransport;
import io.atomix.catalyst.transport.LocalServerRegistry;
import io.atomix.catalyst.transport.LocalTransport;
import io.atomix.copycat.client.CopycatClient;
import io.atomix.copycat.server.CopycatServer;
import io.atomix.copycat.server.storage.Storage;
import io.atomix.copycat.server.storage.StorageLevel;
import io.atomix.manager.internal.ResourceManagerState;
import io.atomix.manager.state.ResourceManagerState;
import io.atomix.resource.ResourceType;
import java.io.File;
......@@ -52,7 +53,7 @@ public abstract class AtomixTestBase {
protected List<Address> members;
protected List<CopycatClient> copycatClients = new ArrayList<>();
protected List<CopycatServer> copycatServers = new ArrayList<>();
protected List<AtomixClient> atomixClients = new ArrayList<>();
protected List<Atomix> atomixClients = new ArrayList<>();
protected List<CopycatServer> atomixServers = new ArrayList<>();
protected Serializer serializer = CatalystSerializers.getSerializer();
......@@ -88,7 +89,7 @@ public abstract class AtomixTestBase {
for (int i = 0; i < nodes; i++) {
CopycatServer server = createCopycatServer(members.get(i));
server.bootstrap(members).thenRun(latch::countDown);
server.start().thenRun(latch::countDown);
servers.add(server);
}
......@@ -101,7 +102,7 @@ public abstract class AtomixTestBase {
* Creates a Copycat server.
*/
protected CopycatServer createCopycatServer(Address address) {
CopycatServer server = CopycatServer.builder(address)
CopycatServer server = CopycatServer.builder(address, members)
.withTransport(new LocalTransport(registry))
.withStorage(Storage.builder()
.withStorageLevel(StorageLevel.DISK)
......@@ -126,11 +127,11 @@ public abstract class AtomixTestBase {
CompletableFuture<Void> closeClients =
CompletableFuture.allOf(atomixClients.stream()
.map(AtomixClient::close)
.map(Atomix::close)
.toArray(CompletableFuture[]::new));
closeClients.thenCompose(v -> CompletableFuture.allOf(copycatServers.stream()
.map(CopycatServer::shutdown)
.map(CopycatServer::stop)
.toArray(CompletableFuture[]::new))).join();
deleteDirectory(TEST_DIR);
......@@ -162,13 +163,13 @@ public abstract class AtomixTestBase {
/**
* Creates a Atomix client.
*/
protected AtomixClient createAtomixClient() {
protected Atomix createAtomixClient() {
CountDownLatch latch = new CountDownLatch(1);
AtomixClient client = AtomixClient.builder()
Atomix client = AtomixClient.builder(members)
.withTransport(new LocalTransport(registry))
.withSerializer(serializer.clone())
.build();
client.connect(members).thenRun(latch::countDown);
client.open().thenRun(latch::countDown);
atomixClients.add(client);
Uninterruptibles.awaitUninterruptibly(latch);
return client;
......
......@@ -3,7 +3,6 @@ osgi_feature (
title="ONOS 3rd party dependencies",
required_features = [],
included_bundles = [
'//lib:atomix',
'//lib:commons-lang',
'//lib:commons-lang3',
'//lib:commons-configuration',
......@@ -34,6 +33,7 @@ osgi_feature (
'//lib:typesafe-config',
'//lib:concurrent-trees',
'//lib:commons-io',
'//lib:onos-atomix',
'//lib:jersey-client',
'//lib:mapdb',
]
......
......@@ -56,7 +56,7 @@
<bundle>mvn:com.typesafe/config/1.2.1</bundle>
<bundle>wrap:mvn:com.googlecode.concurrent-trees/concurrent-trees/2.4.0$Bundle-SymbolicName=concurrent-trees&amp;Bundle-Version=2.4.0</bundle>
<bundle>mvn:commons-io/commons-io/2.4</bundle>
<bundle>mvn:io.atomix/atomix-all/1.0.0-rc7</bundle>
<bundle>mvn:org.onosproject/atomix/1.0.1.onos-SNAPSHOT</bundle>
<bundle>mvn:org.glassfish.jersey.core/jersey-client/2.22.2</bundle>
......
# ***** This file was auto-generated at Tue May 31 16:32:48 PDT 2016. Do not edit this file manually. *****
# ***** This file was auto-generated at Tue May 31 15:01:52 PDT 2016. Do not edit this file manually. *****
osgi_feature_group(
name = 'COMPILE',
visibility = ['PUBLIC'],
......@@ -133,10 +133,109 @@ remote_jar (
remote_jar (
name = 'atomix',
out = 'atomix-all-1.0.0-rc7.jar',
url = 'mvn:io.atomix:atomix-all:jar:1.0.0-rc7',
sha1 = 'ad103065adbf02971b6072719a02d6a93753125b',
maven_coords = 'io.atomix:atomix-all:1.0.0-rc7',
out = 'atomix-1.0.0-rc3.jar',
url = 'mvn:io.atomix:atomix:jar:1.0.0-rc3',
sha1 = 'a572aa9cd069b2d43481901dc901429d0b43332f',
maven_coords = 'io.atomix:atomix:1.0.0-rc3',
visibility = [ 'PUBLIC' ],
)
remote_jar (
name = 'atomix-collections',
out = 'atomix-collections-1.0.0-rc3.jar',
url = 'mvn:io.atomix:atomix-collections:jar:1.0.0-rc3',
sha1 = '161dbfd046cefabe7e6c972e70823c11f7abe65e',
maven_coords = 'io.atomix:atomix-collections:1.0.0-rc3',
visibility = [ 'PUBLIC' ],
)
remote_jar (
name = 'atomix-messaging',
out = 'atomix-messaging-1.0.0-rc3.jar',
url = 'mvn:io.atomix:atomix-messaging:jar:1.0.0-rc3',
sha1 = '58b570d8e3e76a0d0c649b97f3ee0a6e3885958a',
maven_coords = 'io.atomix:atomix-messaging:1.0.0-rc3',
visibility = [ 'PUBLIC' ],
)
remote_jar (
name = 'atomix-resource',
out = 'atomix-resource-1.0.0-rc3.jar',
url = 'mvn:io.atomix:atomix-resource:jar:1.0.0-rc3',
sha1 = 'e47a40d38e6241544ec75df0e6906c209190aebf',
maven_coords = 'io.atomix:atomix-resource:1.0.0-rc3',
visibility = [ 'PUBLIC' ],
)
remote_jar (
name = 'atomix-resource-manager',
out = 'atomix-resource-manager-1.0.0-rc3.jar',
url = 'mvn:io.atomix:atomix-resource-manager:jar:1.0.0-rc3',
sha1 = '41a4cf53c27df12efb04832e1314a81c09c857cb',
maven_coords = 'io.atomix:atomix-resource-manager:1.0.0-rc3',
visibility = [ 'PUBLIC' ],
)
remote_jar (
name = 'atomix-variables',
out = 'atomix-variables-1.0.0-rc3.jar',
url = 'mvn:io.atomix:atomix-variables:jar:1.0.0-rc3',
sha1 = 'dd0ca3c0d211b17b291877e21f0ef10f2aa4a9bd',
maven_coords = 'io.atomix:atomix-variables:1.0.0-rc3',
visibility = [ 'PUBLIC' ],
)
remote_jar (
name = 'onos-atomix',
out = 'atomix-1.0.0.onos.jar',
url = 'mvn:org.onosproject:atomix:jar:1.0.0.onos',
sha1 = '3d1a645b783a61b673aa71dbcc71a1bdd3afdaa2',
maven_coords = 'org.onosproject:atomix:1.0.0.onos',
visibility = [ 'PUBLIC' ],
)
remote_jar (
name = 'catalyst-buffer',
out = 'catalyst-buffer-1.0.4.jar',
url = 'mvn:io.atomix.catalyst:catalyst-buffer:jar:1.0.4',
sha1 = '00fb023ebd860d44385750790328aa26a529c75f',
maven_coords = 'io.atomix.catalyst:catalyst-buffer:1.0.4',
visibility = [ 'PUBLIC' ],
)
remote_jar (
name = 'catalyst-common',
out = 'catalyst-common-1.0.4.jar',
url = 'mvn:io.atomix.catalyst:catalyst-common:jar:1.0.4',
sha1 = '69d50a64ecf9f63de430aead9dc4b743d29f0195',
maven_coords = 'io.atomix.catalyst:catalyst-common:jar:NON-OSGI:1.0.4',
visibility = [ 'PUBLIC' ],
)
remote_jar (
name = 'catalyst-local',
out = 'catalyst-local-1.0.4.jar',
url = 'mvn:io.atomix.catalyst:catalyst-local:jar:1.0.4',
sha1 = 'cbee759c63ce9127c979f4f399d327551644270f',
maven_coords = 'io.atomix.catalyst:catalyst-local:1.0.4',
visibility = [ 'PUBLIC' ],
)
remote_jar (
name = 'catalyst-serializer',
out = 'catalyst-serializer-1.0.4.jar',
url = 'mvn:io.atomix.catalyst:catalyst-serializer:jar:1.0.4',
sha1 = 'e86352776cf4fa17eabf4e1d90fe0587ced4f788',
maven_coords = 'io.atomix.catalyst:catalyst-serializer:1.0.4',
visibility = [ 'PUBLIC' ],
)
remote_jar (
name = 'catalyst-transport',
out = 'catalyst-transport-1.0.4.jar',
url = 'mvn:io.atomix.catalyst:catalyst-transport:jar:1.0.4',
sha1 = 'f36600add086a8848290cad9d6d117634b5bf069',
maven_coords = 'io.atomix.catalyst:catalyst-transport:jar:NON-OSGI:1.0.4',
visibility = [ 'PUBLIC' ],
)
......@@ -240,6 +339,87 @@ remote_jar (
)
remote_jar (
name = 'copycat-api',
out = 'copycat-api-0.5.1.onos.jar',
url = 'mvn:org.onosproject:copycat-api:jar:0.5.1.onos',
sha1 = 'b947348875485814e2a175a0435cdae4138452fc',
maven_coords = 'org.onosproject:copycat-api:jar:NON-OSGI:0.5.1.onos',
visibility = [ 'PUBLIC' ],
)
remote_jar (
name = 'copycat-client',
out = 'copycat-client-1.0.0-rc4.jar',
url = 'mvn:io.atomix.copycat:copycat-client:jar:1.0.0-rc4',
sha1 = '9373c8920a57356b78896d791296a74a2eb868b4',
maven_coords = 'io.atomix.copycat:copycat-client:1.0.0-rc4',
visibility = [ 'PUBLIC' ],
)
remote_jar (
name = 'copycat-core',
out = 'copycat-core-0.5.1.onos.jar',
url = 'mvn:org.onosproject:copycat-core:jar:0.5.1.onos',
sha1 = 'b268f3cbdd57f28244b21b2b8fc08116f63d736d',
maven_coords = 'org.onosproject:copycat-core:jar:NON-OSGI:0.5.1.onos',
visibility = [ 'PUBLIC' ],
)
remote_jar (
name = 'copycat-event-log',
out = 'copycat-event-log-0.5.1.onos.jar',
url = 'mvn:org.onosproject:copycat-event-log:jar:0.5.1.onos',
sha1 = 'a9e32b13e6500c66113202e7d123e7184b726054',
maven_coords = 'org.onosproject:copycat-event-log:jar:NON-OSGI:0.5.1.onos',
visibility = [ 'PUBLIC' ],
)
remote_jar (
name = 'copycat-leader-election',
out = 'copycat-leader-election-0.5.1.onos.jar',
url = 'mvn:org.onosproject:copycat-leader-election:jar:0.5.1.onos',
sha1 = 'a33617e98caf4e909d7ac744e1f6cdd1ba4b1698',
maven_coords = 'org.onosproject:copycat-leader-election:jar:NON-OSGI:0.5.1.onos',
visibility = [ 'PUBLIC' ],
)
remote_jar (
name = 'copycat-protocol',
out = 'copycat-protocol-1.0.0-rc4.jar',
url = 'mvn:io.atomix.copycat:copycat-protocol:jar:1.0.0-rc4',
sha1 = 'cea774c2e4ce7021a6bfca64fd885e875f01f4dc',
maven_coords = 'io.atomix.copycat:copycat-protocol:1.0.0-rc4',
visibility = [ 'PUBLIC' ],
)
remote_jar (
name = 'copycat-server',
out = 'copycat-server-1.0.0-rc4.jar',
url = 'mvn:io.atomix.copycat:copycat-server:jar:1.0.0-rc4',
sha1 = 'e2b6603dbd299d7b21685211df509dd4fbd2f0e9',
maven_coords = 'io.atomix.copycat:copycat-server:1.0.0-rc4',
visibility = [ 'PUBLIC' ],
)
remote_jar (
name = 'copycat-state-log',
out = 'copycat-state-log-0.5.1.onos.jar',
url = 'mvn:org.onosproject:copycat-state-log:jar:0.5.1.onos',
sha1 = '1dfa2b4c6da1cdc453fd3740cd506b9570f118ea',
maven_coords = 'org.onosproject:copycat-state-log:jar:NON-OSGI:0.5.1.onos',
visibility = [ 'PUBLIC' ],
)
remote_jar (
name = 'copycat-state-machine',
out = 'copycat-state-machine-0.5.1.onos.jar',
url = 'mvn:org.onosproject:copycat-state-machine:jar:0.5.1.onos',
sha1 = '03f924b5c818c0684bdfa6c502e5fff8e07d6b77',
maven_coords = 'org.onosproject:copycat-state-machine:jar:NON-OSGI:0.5.1.onos',
visibility = [ 'PUBLIC' ],
)
remote_jar (
name = 'easymock',
out = 'easymock-3.4.jar',
url = 'mvn:org.easymock:easymock:jar:3.4',
......
......@@ -86,7 +86,19 @@
"gmetric4j": "mvn:info.ganglia.gmetric4j:gmetric4j:1.0.10",
"aopalliance-repackaged": "mvn:org.glassfish.hk2.external:aopalliance-repackaged:2.4.0-b34",
"asm": "mvn:org.ow2.asm:asm:5.0.3",
"atomix": "mvn:io.atomix:atomix-all:1.0.0-rc7",
"atomix": "mvn:io.atomix:atomix:1.0.0-rc3",
"atomix-collections": "mvn:io.atomix:atomix-collections:1.0.0-rc3",
"atomix-messaging": "mvn:io.atomix:atomix-messaging:1.0.0-rc3",
"atomix-resource": "mvn:io.atomix:atomix-resource:1.0.0-rc3",
"atomix-resource-manager": "mvn:io.atomix:atomix-resource-manager:1.0.0-rc3",
"atomix-variables": "mvn:io.atomix:atomix-variables:1.0.0-rc3",
"onos-atomix": "mvn:org.onosproject:atomix:1.0.0.onos",
"catalyst-buffer": "mvn:io.atomix.catalyst:catalyst-buffer:1.0.4",
"catalyst-common": "mvn:io.atomix.catalyst:catalyst-common:1.0.4",
"catalyst-local": "mvn:io.atomix.catalyst:catalyst-local:1.0.4",
"catalyst-serializer": "mvn:io.atomix.catalyst:catalyst-serializer:1.0.4",
"catalyst-transport": "mvn:io.atomix.catalyst:catalyst-transport:1.0.4",
"catalyst-transport": "mvn:io.atomix.catalyst:catalyst-transport:1.0.4",
"commons-codec": "mvn:commons-codec:commons-codec:1.10",
"commons-collections": "mvn:commons-collections:commons-collections:3.2.2",
"commons-configuration": "mvn:commons-configuration:commons-configuration:1.10",
......@@ -98,6 +110,15 @@
"commons-pool": "mvn:commons-pool:commons-pool:1.6",
"commons-beanutils": "mvn:commons-beanutils:commons-beanutils:1.9.2",
"concurrent-trees": "mvn:com.googlecode.concurrent-trees:concurrent-trees:2.4.0",
"copycat-api": "mvn:org.onosproject:copycat-api:0.5.1.onos",
"copycat-client": "mvn:io.atomix.copycat:copycat-client:1.0.0-rc4",
"copycat-core": "mvn:org.onosproject:copycat-core:0.5.1.onos",
"copycat-event-log": "mvn:org.onosproject:copycat-event-log:0.5.1.onos",
"copycat-leader-election": "mvn:org.onosproject:copycat-leader-election:0.5.1.onos",
"copycat-protocol": "mvn:io.atomix.copycat:copycat-protocol:1.0.0-rc4",
"copycat-server": "mvn:io.atomix.copycat:copycat-server:1.0.0-rc4",
"copycat-state-log": "mvn:org.onosproject:copycat-state-log:0.5.1.onos",
"copycat-state-machine": "mvn:org.onosproject:copycat-state-machine:0.5.1.onos",
"easymock": "mvn:org.easymock:easymock:3.4",
"antlr": "mvn:antlr:antlr:2.7.7",
"error_prone_annotations": "mvn:com.google.errorprone:error_prone_annotations:2.0.2",
......
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright 2016-present Open Networking Laboratory
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.onosproject</groupId>
<artifactId>onos-base</artifactId>
<version>1</version>
<relativePath/>
</parent>
<groupId>org.onosproject</groupId>
<artifactId>atomix</artifactId>
<packaging>bundle</packaging>
<version>1.0.1.onos-SNAPSHOT</version>
<description>Atomix shaded OSGi JAR</description>
<url>http://onosproject.org/</url>
<scm>
<connection>scm:git:https://gerrit.onosproject.org/onos</connection>
<developerConnection>scm:git:https://gerrit.onosproject.org/onos
</developerConnection>
<url>http://gerrit.onosproject.org/</url>
</scm>
<licenses>
<license>
<name>Apache License, Version 2.0</name>
<url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
</license>
</licenses>
<properties>
<!-- TODO: replace with final release version when it is out -->
<catalyst.version>1.0.6</catalyst.version>
<atomix.version>1.0.0-rc3</atomix.version>
<copycat.version>1.0.0-rc6</copycat.version>
</properties>
<dependencies>
<dependency>
<groupId>io.atomix.catalyst</groupId>
<artifactId>catalyst-transport</artifactId>
<version>${catalyst.version}</version>
</dependency>
<dependency>
<groupId>io.atomix.catalyst</groupId>
<artifactId>catalyst-serializer</artifactId>
<version>${catalyst.version}</version>
</dependency>
<dependency>
<groupId>io.atomix</groupId>
<artifactId>atomix</artifactId>
<version>${atomix.version}</version>
</dependency>
<dependency>
<groupId>io.atomix.copycat</groupId>
<artifactId>copycat-client</artifactId>
<version>${copycat.version}</version>
</dependency>
<dependency>
<groupId>io.atomix.copycat</groupId>
<artifactId>copycat-server</artifactId>
<version>${copycat.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<!-- TODO: update once following issue is fixed. -->
<!-- https://jira.codehaus.org/browse/MCOMPILER-205 -->
<version>2.5.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.2</version>
<configuration>
<createSourcesJar>true</createSourcesJar>
<artifactSet>
<excludes>
<!-- exclude OSGi-ready transitive dependencies -->
<exclude>com.google.guava:guava</exclude>
<exclude>com.esotericsoftware:*</exclude>
<exclude>org.ow2.asm:asm</exclude>
<exclude>org.objenesis:objenesis</exclude>
<exclude>io.netty:*</exclude>
<exclude>commons-io:commons-io</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<artifact>io.atomix:atomix-all</artifact>
<includes>
<include>**</include>
</includes>
</filter>
</filters>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-bundle-plugin</artifactId>
<version>3.0.1</version>
<extensions>true</extensions>
<configuration>
<instructions>
<Export-Package>
io.atomix.*
</Export-Package>
<Import-Package>
!sun.nio.ch,!sun.misc,*
</Import-Package>
</instructions>
</configuration>
</plugin>
</plugins>
</build>
</project>
/*
* Copyright 2016-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onlab.atomix;
/**
* Empty class required to get the atomix module to build properly.
*
* NOTE Required for shade plugin to operate.
*/
public class AtomixShaded {
}
......@@ -32,6 +32,7 @@
<description>Domain agnostic ON.Lab utilities</description>
<modules>
<module>atomix</module>
<module>junit</module>
<module>misc</module>
<module>yangutils</module>
......