Madan Jampani
Committed by Gerrit Code Review

Moving to Atomix version 1.0.0-rc3

Change-Id: I75572a52b530741f482455d59922327121a03999
Showing 15 changed files with 170 additions and 118 deletions
......@@ -20,6 +20,8 @@ import java.util.Arrays;
import io.atomix.catalyst.serializer.Serializer;
import io.atomix.catalyst.serializer.TypeSerializerFactory;
import io.atomix.copycat.Query;
import io.atomix.manager.util.ResourceManagerTypeResolver;
import io.atomix.variables.state.LongCommands;
import org.onlab.util.Match;
import org.onosproject.cluster.Leader;
......@@ -28,6 +30,10 @@ import org.onosproject.cluster.NodeId;
import org.onosproject.event.Change;
import org.onosproject.store.primitives.MapUpdate;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapFactory;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorFactory;
import org.onosproject.store.primitives.resources.impl.CommitResult;
import org.onosproject.store.primitives.resources.impl.MapEntryUpdateResult;
import org.onosproject.store.primitives.resources.impl.PrepareResult;
......@@ -79,6 +85,14 @@ public final class CatalystSerializers {
serializer.register(MapEvent.class, factory);
serializer.register(Maps.immutableEntry("a", "b").getClass(), factory);
serializer.resolve(new LongCommands.TypeResolver());
serializer.resolve(new AtomixConsistentMapCommands.TypeResolver());
serializer.resolve(new AtomixLeaderElectorCommands.TypeResolver());
serializer.resolve(new ResourceManagerTypeResolver());
serializer.registerClassLoader(AtomixConsistentMapFactory.class)
.registerClassLoader(AtomixLeaderElectorFactory.class);
return serializer;
}
}
......
......@@ -41,10 +41,4 @@ public interface Managed<T> {
* @return {@code true} if open
*/
boolean isOpen();
/**
* Return {@code true} if the managed object is closed.
* @return {@code true} if closed
*/
boolean isClosed();
}
......
......@@ -47,21 +47,20 @@ import com.google.common.collect.ImmutableSet;
public class StoragePartition implements Managed<StoragePartition> {
private final AtomicBoolean isOpened = new AtomicBoolean(false);
private final AtomicBoolean isClosed = new AtomicBoolean(false);
private final Serializer serializer;
private final MessagingService messagingService;
private final ClusterService clusterService;
private final File logFolder;
private Partition partition;
private static final Collection<ResourceType> RESOURCE_TYPES = ImmutableSet.of(
new ResourceType(DistributedLong.class),
new ResourceType(AtomixLeaderElector.class),
new ResourceType(AtomixConsistentMap.class));
private NodeId localNodeId;
private StoragePartitionServer server;
private StoragePartitionClient client;
public static final Collection<ResourceType> RESOURCE_TYPES = ImmutableSet.of(
new ResourceType(DistributedLong.class),
new ResourceType(AtomixLeaderElector.class),
new ResourceType(AtomixConsistentMap.class));
public StoragePartition(Partition partition,
MessagingService messagingService,
ClusterService clusterService,
......@@ -96,8 +95,7 @@ public class StoragePartition implements Managed<StoragePartition> {
public CompletableFuture<Void> close() {
// We do not explicitly close the server and instead let the cluster
// deal with this as an unclean exit.
return closeClient().thenAccept(v -> isClosed.set(true))
.thenApply(v -> null);
return closeClient();
}
/**
......@@ -138,7 +136,6 @@ public class StoragePartition implements Managed<StoragePartition> {
() -> new CopycatTransport(CopycatTransport.Mode.SERVER,
partition.getId(),
messagingService),
RESOURCE_TYPES,
logFolder);
return server.open().thenRun(() -> this.server = server);
}
......@@ -158,7 +155,6 @@ public class StoragePartition implements Managed<StoragePartition> {
() -> new CopycatTransport(CopycatTransport.Mode.SERVER,
partition.getId(),
messagingService),
RESOURCE_TYPES,
logFolder);
return server.join(Collections2.transform(otherMembers, this::toAddress)).thenRun(() -> this.server = server);
}
......@@ -168,8 +164,7 @@ public class StoragePartition implements Managed<StoragePartition> {
serializer,
new CopycatTransport(CopycatTransport.Mode.CLIENT,
partition.getId(),
messagingService),
RESOURCE_TYPES);
messagingService));
return client.open().thenApply(v -> client);
}
......@@ -183,12 +178,7 @@ public class StoragePartition implements Managed<StoragePartition> {
@Override
public boolean isOpen() {
return isOpened.get() && !isClosed.get();
}
@Override
public boolean isClosed() {
return isClosed.get();
return isOpened.get();
}
private CompletableFuture<Void> closeClient() {
......@@ -209,7 +199,7 @@ public class StoragePartition implements Managed<StoragePartition> {
* @return partition info
*/
public Optional<PartitionInfo> info() {
return server != null && !server.isClosed() ? Optional.of(server.info()) : Optional.empty();
return server != null && server.isOpen() ? Optional.of(server.info()) : Optional.empty();
}
public void onUpdate(Partition newValue) {
......
......@@ -19,10 +19,8 @@ import static org.slf4j.LoggerFactory.getLogger;
import io.atomix.Atomix;
import io.atomix.AtomixClient;
import io.atomix.catalyst.transport.Transport;
import io.atomix.resource.ResourceType;
import io.atomix.variables.DistributedLong;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
......@@ -43,7 +41,6 @@ import org.slf4j.Logger;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableSet;
/**
* StoragePartition client.
......@@ -55,7 +52,6 @@ public class StoragePartitionClient implements DistributedPrimitiveCreator, Mana
private final StoragePartition partition;
private final Transport transport;
private final io.atomix.catalyst.serializer.Serializer serializer;
private final Collection<ResourceType> resourceTypes;
private Atomix client;
private static final String ATOMIC_VALUES_CONSISTENT_MAP_NAME = "onos-atomic-values";
private final Supplier<AsyncConsistentMap<String, byte[]>> onosAtomicValuesMap =
......@@ -64,12 +60,10 @@ public class StoragePartitionClient implements DistributedPrimitiveCreator, Mana
public StoragePartitionClient(StoragePartition partition,
io.atomix.catalyst.serializer.Serializer serializer,
Transport transport,
Collection<ResourceType> resourceTypes) {
Transport transport) {
this.partition = partition;
this.serializer = serializer;
this.transport = transport;
this.resourceTypes = ImmutableSet.copyOf(resourceTypes);
}
@Override
......@@ -79,10 +73,8 @@ public class StoragePartitionClient implements DistributedPrimitiveCreator, Mana
}
synchronized (StoragePartitionClient.this) {
client = AtomixClient.builder(partition.getMemberAddresses())
.withResourceTypes(StoragePartition.RESOURCE_TYPES)
.withSerializer(serializer.clone())
.withResourceResolver(r -> {
resourceTypes.forEach(r::register);
})
.withTransport(transport)
.build();
}
......@@ -103,7 +95,8 @@ public class StoragePartitionClient implements DistributedPrimitiveCreator, Mana
@Override
public <K, V> AsyncConsistentMap<K, V> newAsyncConsistentMap(String name, Serializer serializer) {
AsyncConsistentMap<String, byte[]> rawMap =
new DelegatingAsyncConsistentMap<String, byte[]>(client.get(name, AtomixConsistentMap.class).join()) {
new DelegatingAsyncConsistentMap<String, byte[]>(client.getResource(name, AtomixConsistentMap.class)
.join()) {
@Override
public String name() {
return name;
......@@ -125,7 +118,7 @@ public class StoragePartitionClient implements DistributedPrimitiveCreator, Mana
@Override
public AsyncAtomicCounter newAsyncCounter(String name) {
DistributedLong distributedLong = client.get(name, DistributedLong.class).join();
DistributedLong distributedLong = client.getLong(name).join();
return new AtomixCounter(name, distributedLong);
}
......@@ -144,7 +137,7 @@ public class StoragePartitionClient implements DistributedPrimitiveCreator, Mana
@Override
public AsyncLeaderElector newAsyncLeaderElector(String name) {
return client.get(name, AtomixLeaderElector.class).join();
return client.getResource(name, AtomixLeaderElector.class).join();
}
@Override
......@@ -161,9 +154,4 @@ public class StoragePartitionClient implements DistributedPrimitiveCreator, Mana
public boolean isOpen() {
return client.isOpen();
}
@Override
public boolean isClosed() {
return client.isClosed();
}
}
......
......@@ -22,12 +22,8 @@ import io.atomix.catalyst.transport.Transport;
import io.atomix.copycat.server.CopycatServer;
import io.atomix.copycat.server.storage.Storage;
import io.atomix.copycat.server.storage.StorageLevel;
import io.atomix.manager.ResourceManagerTypeResolver;
import io.atomix.manager.state.ResourceManagerState;
import io.atomix.resource.ResourceRegistry;
import io.atomix.resource.ResourceType;
import io.atomix.resource.ResourceTypeResolver;
import io.atomix.resource.ServiceLoaderResourceResolver;
import io.atomix.manager.util.ResourceManagerTypeResolver;
import java.io.File;
import java.util.Collection;
......@@ -37,8 +33,6 @@ import java.util.function.Supplier;
import org.onosproject.store.service.PartitionInfo;
import org.slf4j.Logger;
import com.google.common.collect.ImmutableSet;
/**
* {@link StoragePartition} server.
*/
......@@ -52,20 +46,17 @@ public class StoragePartitionServer implements Managed<StoragePartitionServer> {
private final Supplier<Transport> transport;
private final Serializer serializer;
private final File dataFolder;
private final Collection<ResourceType> resourceTypes;
private CopycatServer server;
public StoragePartitionServer(Address localAddress,
StoragePartition partition,
Serializer serializer,
Supplier<Transport> transport,
Collection<ResourceType> resourceTypes,
File dataFolder) {
this.partition = partition;
this.localAddress = localAddress;
this.serializer = serializer;
this.transport = transport;
this.resourceTypes = ImmutableSet.copyOf(resourceTypes);
this.dataFolder = dataFolder;
}
......@@ -73,13 +64,13 @@ public class StoragePartitionServer implements Managed<StoragePartitionServer> {
public CompletableFuture<Void> open() {
CompletableFuture<CopycatServer> serverOpenFuture;
if (partition.getMemberAddresses().contains(localAddress)) {
if (server != null && server.isOpen()) {
if (server != null && server.isRunning()) {
return CompletableFuture.completedFuture(null);
}
synchronized (this) {
server = buildServer(partition.getMemberAddresses());
}
serverOpenFuture = server.open();
serverOpenFuture = server.start();
} else {
serverOpenFuture = CompletableFuture.completedFuture(null);
}
......@@ -106,19 +97,15 @@ public class StoragePartitionServer implements Managed<StoragePartitionServer> {
* @return future that is completed when the operation is complete
*/
public CompletableFuture<Void> closeAndExit() {
return server.close();
return server.stop();
}
private CopycatServer buildServer(Collection<Address> clusterMembers) {
ResourceTypeResolver resourceResolver = new ServiceLoaderResourceResolver();
ResourceRegistry registry = new ResourceRegistry();
resourceTypes.forEach(registry::register);
resourceResolver.resolve(registry);
CopycatServer server = CopycatServer.builder(localAddress, clusterMembers)
.withName("partition-" + partition.getId())
.withSerializer(serializer.clone())
.withTransport(transport.get())
.withStateMachine(() -> new ResourceManagerState(registry))
.withStateMachine(ResourceManagerState::new)
.withStorage(Storage.builder()
.withStorageLevel(StorageLevel.DISK)
.withCompactionThreads(1)
......@@ -126,14 +113,14 @@ public class StoragePartitionServer implements Managed<StoragePartitionServer> {
.withMaxEntriesPerSegment(MAX_ENTRIES_PER_LOG_SEGMENT)
.build())
.build();
server.serializer().resolve(new ResourceManagerTypeResolver(registry));
server.serializer().resolve(new ResourceManagerTypeResolver());
return server;
}
public CompletableFuture<Void> join(Collection<Address> otherMembers) {
server = buildServer(otherMembers);
return server.open().whenComplete((r, e) -> {
return server.start().whenComplete((r, e) -> {
if (e == null) {
log.info("Successfully joined partition {}", partition.getId());
} else {
......@@ -144,12 +131,7 @@ public class StoragePartitionServer implements Managed<StoragePartitionServer> {
@Override
public boolean isOpen() {
return server.isOpen();
}
@Override
public boolean isClosed() {
return server.isClosed();
return server.isRunning();
}
/**
......
......@@ -16,13 +16,14 @@
package org.onosproject.store.primitives.resources.impl;
import io.atomix.copycat.client.CopycatClient;
import io.atomix.resource.Resource;
import io.atomix.resource.AbstractResource;
import io.atomix.resource.ResourceTypeInfo;
import java.util.Collection;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
......@@ -57,18 +58,16 @@ import com.google.common.collect.Sets;
/**
* Distributed resource providing the {@link AsyncConsistentMap} primitive.
*/
@ResourceTypeInfo(id = -151,
stateMachine = AtomixConsistentMapState.class,
typeResolver = AtomixConsistentMapCommands.TypeResolver.class)
public class AtomixConsistentMap extends Resource<AtomixConsistentMap>
@ResourceTypeInfo(id = -151, factory = AtomixConsistentMapFactory.class)
public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap>
implements AsyncConsistentMap<String, byte[]> {
private final Set<MapEventListener<String, byte[]>> mapEventListeners = Sets.newCopyOnWriteArraySet();
public static final String CHANGE_SUBJECT = "changeEvents";
public AtomixConsistentMap(CopycatClient client, Resource.Options options) {
super(client, options);
public AtomixConsistentMap(CopycatClient client, Properties properties) {
super(client, properties);
}
@Override
......
/*
* Copyright 2016 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.resources.impl;
import io.atomix.catalyst.serializer.SerializableTypeResolver;
import io.atomix.copycat.client.CopycatClient;
import io.atomix.resource.ResourceFactory;
import io.atomix.resource.ResourceStateMachine;
import java.util.Properties;
/**
* {@link AtomixConsistentMap} resource factory.
*
*/
public class AtomixConsistentMapFactory implements ResourceFactory<AtomixConsistentMap> {
@Override
public SerializableTypeResolver createSerializableTypeResolver() {
return new AtomixConsistentMapCommands.TypeResolver();
}
@Override
public ResourceStateMachine createStateMachine(Properties config) {
return new AtomixConsistentMapState(config);
}
@Override
public AtomixConsistentMap createInstance(CopycatClient client, Properties options) {
return new AtomixConsistentMap(client, options);
}
}
\ No newline at end of file
......@@ -26,13 +26,13 @@ import io.atomix.copycat.server.session.SessionListener;
import io.atomix.copycat.server.storage.snapshot.SnapshotReader;
import io.atomix.copycat.server.storage.snapshot.SnapshotWriter;
import io.atomix.resource.ResourceStateMachine;
import io.atomix.resource.ResourceType;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
......@@ -77,8 +77,8 @@ public class AtomixConsistentMapState extends ResourceStateMachine implements Se
private final Map<TransactionId, Commit<? extends TransactionPrepare>> pendingTransactions = Maps.newHashMap();
private AtomicLong versionCounter = new AtomicLong(0);
public AtomixConsistentMapState() {
super(new ResourceType(AtomixConsistentMap.class));
public AtomixConsistentMapState(Properties properties) {
super(properties);
}
@Override
......
......@@ -17,11 +17,12 @@ package org.onosproject.store.primitives.resources.impl;
import io.atomix.catalyst.util.Listener;
import io.atomix.copycat.client.CopycatClient;
import io.atomix.resource.Resource;
import io.atomix.resource.AbstractResource;
import io.atomix.resource.ResourceTypeInfo;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
......@@ -45,10 +46,8 @@ import com.google.common.collect.Sets;
/**
* Distributed resource providing the {@link AsyncLeaderElector} primitive.
*/
@ResourceTypeInfo(id = -152,
stateMachine = AtomixLeaderElectorState.class,
typeResolver = AtomixLeaderElectorCommands.TypeResolver.class)
public class AtomixLeaderElector extends Resource<AtomixLeaderElector>
@ResourceTypeInfo(id = -151, factory = AtomixLeaderElectorFactory.class)
public class AtomixLeaderElector extends AbstractResource<AtomixLeaderElector>
implements AsyncLeaderElector {
private final Set<Consumer<Change<Leadership>>> leadershipChangeListeners =
Sets.newConcurrentHashSet();
......@@ -56,8 +55,8 @@ public class AtomixLeaderElector extends Resource<AtomixLeaderElector>
public static final String CHANGE_SUBJECT = "leadershipChangeEvents";
private Listener<Change<Leadership>> listener;
public AtomixLeaderElector(CopycatClient client, Resource.Options options) {
super(client, options);
public AtomixLeaderElector(CopycatClient client, Properties properties) {
super(client, properties);
}
@Override
......
/*
* Copyright 2016 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.resources.impl;
import io.atomix.catalyst.serializer.SerializableTypeResolver;
import io.atomix.copycat.client.CopycatClient;
import io.atomix.resource.ResourceFactory;
import io.atomix.resource.ResourceStateMachine;
import java.util.Properties;
/**
* {@link AtomixLeaderElector} resource factory.
*
*/
public class AtomixLeaderElectorFactory implements ResourceFactory<AtomixLeaderElector> {
@Override
public SerializableTypeResolver createSerializableTypeResolver() {
return new AtomixLeaderElectorCommands.TypeResolver();
}
@Override
public ResourceStateMachine createStateMachine(Properties config) {
return new AtomixLeaderElectorState(config);
}
@Override
public AtomixLeaderElector createInstance(CopycatClient client, Properties options) {
return new AtomixLeaderElector(client, options);
}
}
\ No newline at end of file
......@@ -24,7 +24,6 @@ import io.atomix.copycat.server.session.SessionListener;
import io.atomix.copycat.server.storage.snapshot.SnapshotReader;
import io.atomix.copycat.server.storage.snapshot.SnapshotWriter;
import io.atomix.resource.ResourceStateMachine;
import io.atomix.resource.ResourceType;
import java.util.Arrays;
import java.util.HashMap;
......@@ -33,6 +32,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
......@@ -75,8 +75,8 @@ public class AtomixLeaderElectorState extends ResourceStateMachine
ElectionState.class,
Registration.class);
public AtomixLeaderElectorState() {
super(new ResourceType(AtomixLeaderElector.class));
public AtomixLeaderElectorState(Properties properties) {
super(properties);
}
@Override
......
......@@ -116,7 +116,7 @@ public class AtomixConsistentMapTest extends AtomixTestBase {
final byte[] rawFooValue = Tools.getBytesUtf8("Hello foo!");
final byte[] rawBarValue = Tools.getBytesUtf8("Hello bar!");
AtomixConsistentMap map = createAtomixClient().get("test", AtomixConsistentMap.class).join();
AtomixConsistentMap map = createAtomixClient().getResource("test", AtomixConsistentMap.class).join();
map.isEmpty().thenAccept(result -> {
assertTrue(result);
......@@ -246,7 +246,7 @@ public class AtomixConsistentMapTest extends AtomixTestBase {
final byte[] value2 = Tools.getBytesUtf8("value2");
final byte[] value3 = Tools.getBytesUtf8("value3");
AtomixConsistentMap map = createAtomixClient().get("test", AtomixConsistentMap.class).join();
AtomixConsistentMap map = createAtomixClient().getResource("test", AtomixConsistentMap.class).join();
map.computeIfAbsent("foo", k -> value1).thenAccept(result -> {
assertTrue(Arrays.equals(Versioned.valueOrElse(result, null), value1));
......@@ -284,7 +284,7 @@ public class AtomixConsistentMapTest extends AtomixTestBase {
final byte[] value2 = Tools.getBytesUtf8("value2");
final byte[] value3 = Tools.getBytesUtf8("value3");
AtomixConsistentMap map = createAtomixClient().get("test", AtomixConsistentMap.class).join();
AtomixConsistentMap map = createAtomixClient().getResource("test", AtomixConsistentMap.class).join();
TestMapEventListener listener = new TestMapEventListener();
// add listener; insert new value into map and verify an INSERT event is received.
......@@ -343,7 +343,7 @@ public class AtomixConsistentMapTest extends AtomixTestBase {
final byte[] value1 = Tools.getBytesUtf8("value1");
final byte[] value2 = Tools.getBytesUtf8("value2");
AtomixConsistentMap map = createAtomixClient().get("test", AtomixConsistentMap.class).join();
AtomixConsistentMap map = createAtomixClient().getResource("test", AtomixConsistentMap.class).join();
TestMapEventListener listener = new TestMapEventListener();
map.addListener(listener).join();
......@@ -398,7 +398,7 @@ public class AtomixConsistentMapTest extends AtomixTestBase {
final byte[] value1 = Tools.getBytesUtf8("value1");
final byte[] value2 = Tools.getBytesUtf8("value2");
AtomixConsistentMap map = createAtomixClient().get("test", AtomixConsistentMap.class).join();
AtomixConsistentMap map = createAtomixClient().getResource("test", AtomixConsistentMap.class).join();
TestMapEventListener listener = new TestMapEventListener();
map.addListener(listener).join();
......
......@@ -60,7 +60,7 @@ public class AtomixLeaderElectorTest extends AtomixTestBase {
private void leaderElectorRunTests(int numServers) throws Throwable {
createCopycatServers(numServers);
Atomix client1 = createAtomixClient();
AtomixLeaderElector elector1 = client1.get("test-elector", AtomixLeaderElector.class).join();
AtomixLeaderElector elector1 = client1.getResource("test-elector", AtomixLeaderElector.class).join();
elector1.run("foo", node1).thenAccept(result -> {
assertEquals(node1, result.leaderNodeId());
assertEquals(1, result.leader().term());
......@@ -68,7 +68,7 @@ public class AtomixLeaderElectorTest extends AtomixTestBase {
assertEquals(node1, result.candidates().get(0));
}).join();
Atomix client2 = createAtomixClient();
AtomixLeaderElector elector2 = client2.get("test-elector", AtomixLeaderElector.class).join();
AtomixLeaderElector elector2 = client2.getResource("test-elector", AtomixLeaderElector.class).join();
elector2.run("foo", node2).thenAccept(result -> {
assertEquals(node1, result.leaderNodeId());
assertEquals(1, result.leader().term());
......@@ -91,10 +91,10 @@ public class AtomixLeaderElectorTest extends AtomixTestBase {
private void leaderElectorWithdrawTests(int numServers) throws Throwable {
createCopycatServers(numServers);
Atomix client1 = createAtomixClient();
AtomixLeaderElector elector1 = client1.get("test-elector", AtomixLeaderElector.class).join();
AtomixLeaderElector elector1 = client1.getResource("test-elector", AtomixLeaderElector.class).join();
elector1.run("foo", node1).join();
Atomix client2 = createAtomixClient();
AtomixLeaderElector elector2 = client2.get("test-elector", AtomixLeaderElector.class).join();
AtomixLeaderElector elector2 = client2.getResource("test-elector", AtomixLeaderElector.class).join();
elector2.run("foo", node2).join();
LeaderEventListener listener1 = new LeaderEventListener();
......@@ -133,11 +133,11 @@ public class AtomixLeaderElectorTest extends AtomixTestBase {
private void leaderElectorAnointTests(int numServers) throws Throwable {
createCopycatServers(numServers);
Atomix client1 = createAtomixClient();
AtomixLeaderElector elector1 = client1.get("test-elector", AtomixLeaderElector.class).join();
AtomixLeaderElector elector1 = client1.getResource("test-elector", AtomixLeaderElector.class).join();
Atomix client2 = createAtomixClient();
AtomixLeaderElector elector2 = client2.get("test-elector", AtomixLeaderElector.class).join();
AtomixLeaderElector elector2 = client2.getResource("test-elector", AtomixLeaderElector.class).join();
Atomix client3 = createAtomixClient();
AtomixLeaderElector elector3 = client3.get("test-elector", AtomixLeaderElector.class).join();
AtomixLeaderElector elector3 = client3.getResource("test-elector", AtomixLeaderElector.class).join();
elector1.run("foo", node1).join();
elector2.run("foo", node2).join();
......@@ -195,11 +195,11 @@ public class AtomixLeaderElectorTest extends AtomixTestBase {
private void leaderElectorPromoteTests(int numServers) throws Throwable {
createCopycatServers(numServers);
Atomix client1 = createAtomixClient();
AtomixLeaderElector elector1 = client1.get("test-elector", AtomixLeaderElector.class).join();
AtomixLeaderElector elector1 = client1.getResource("test-elector", AtomixLeaderElector.class).join();
Atomix client2 = createAtomixClient();
AtomixLeaderElector elector2 = client2.get("test-elector", AtomixLeaderElector.class).join();
AtomixLeaderElector elector2 = client2.getResource("test-elector", AtomixLeaderElector.class).join();
Atomix client3 = createAtomixClient();
AtomixLeaderElector elector3 = client3.get("test-elector", AtomixLeaderElector.class).join();
AtomixLeaderElector elector3 = client3.getResource("test-elector", AtomixLeaderElector.class).join();
elector1.run("foo", node1).join();
elector2.run("foo", node2).join();
......@@ -252,10 +252,10 @@ public class AtomixLeaderElectorTest extends AtomixTestBase {
private void leaderElectorLeaderSessionCloseTests(int numServers) throws Throwable {
createCopycatServers(numServers);
Atomix client1 = createAtomixClient();
AtomixLeaderElector elector1 = client1.get("test-elector", AtomixLeaderElector.class).join();
AtomixLeaderElector elector1 = client1.getResource("test-elector", AtomixLeaderElector.class).join();
elector1.run("foo", node1).join();
Atomix client2 = createAtomixClient();
AtomixLeaderElector elector2 = client2.get("test-elector", AtomixLeaderElector.class).join();
AtomixLeaderElector elector2 = client2.getResource("test-elector", AtomixLeaderElector.class).join();
LeaderEventListener listener = new LeaderEventListener();
elector2.run("foo", node2).join();
elector2.addChangeListener(listener).join();
......@@ -280,10 +280,10 @@ public class AtomixLeaderElectorTest extends AtomixTestBase {
private void leaderElectorNonLeaderSessionCloseTests(int numServers) throws Throwable {
createCopycatServers(numServers);
Atomix client1 = createAtomixClient();
AtomixLeaderElector elector1 = client1.get("test-elector", AtomixLeaderElector.class).join();
AtomixLeaderElector elector1 = client1.getResource("test-elector", AtomixLeaderElector.class).join();
elector1.run("foo", node1).join();
Atomix client2 = createAtomixClient();
AtomixLeaderElector elector2 = client2.get("test-elector", AtomixLeaderElector.class).join();
AtomixLeaderElector elector2 = client2.getResource("test-elector", AtomixLeaderElector.class).join();
LeaderEventListener listener = new LeaderEventListener();
elector2.run("foo", node2).join();
elector1.addChangeListener(listener).join();
......@@ -309,8 +309,8 @@ public class AtomixLeaderElectorTest extends AtomixTestBase {
createCopycatServers(numServers);
Atomix client1 = createAtomixClient();
Atomix client2 = createAtomixClient();
AtomixLeaderElector elector1 = client1.get("test-elector", AtomixLeaderElector.class).join();
AtomixLeaderElector elector2 = client2.get("test-elector", AtomixLeaderElector.class).join();
AtomixLeaderElector elector1 = client1.getResource("test-elector", AtomixLeaderElector.class).join();
AtomixLeaderElector elector2 = client2.getResource("test-elector", AtomixLeaderElector.class).join();
elector1.run("foo", node1).join();
elector2.run("foo", node2).join();
elector2.run("bar", node2).join();
......
......@@ -26,7 +26,6 @@ import io.atomix.copycat.server.CopycatServer;
import io.atomix.copycat.server.storage.Storage;
import io.atomix.copycat.server.storage.StorageLevel;
import io.atomix.manager.state.ResourceManagerState;
import io.atomix.resource.ResourceRegistry;
import io.atomix.resource.ResourceType;
import java.io.File;
......@@ -90,7 +89,7 @@ public abstract class AtomixTestBase {
for (int i = 0; i < nodes; i++) {
CopycatServer server = createCopycatServer(members.get(i));
server.open().thenRun(latch::countDown);
server.start().thenRun(latch::countDown);
servers.add(server);
}
......@@ -103,15 +102,13 @@ public abstract class AtomixTestBase {
* Creates a Copycat server.
*/
protected CopycatServer createCopycatServer(Address address) {
ResourceRegistry resourceRegistry = new ResourceRegistry();
resourceRegistry.register(resourceType());
CopycatServer server = CopycatServer.builder(address, members)
.withTransport(new LocalTransport(registry))
.withStorage(Storage.builder()
.withStorageLevel(StorageLevel.DISK)
.withDirectory(TEST_DIR + "/" + address.port())
.build())
.withStateMachine(() -> new ResourceManagerState(resourceRegistry))
.withStateMachine(ResourceManagerState::new)
.withSerializer(serializer.clone())
.withHeartbeatInterval(Duration.ofMillis(25))
.withElectionTimeout(Duration.ofMillis(50))
......@@ -134,7 +131,7 @@ public abstract class AtomixTestBase {
.toArray(CompletableFuture[]::new));
closeClients.thenCompose(v -> CompletableFuture.allOf(copycatServers.stream()
.map(CopycatServer::close)
.map(CopycatServer::stop)
.toArray(CompletableFuture[]::new))).join();
deleteDirectory(TEST_DIR);
......@@ -171,7 +168,6 @@ public abstract class AtomixTestBase {
Atomix client = AtomixClient.builder(members)
.withTransport(new LocalTransport(registry))
.withSerializer(serializer.clone())
.withResourceResolver(r -> r.register(resourceType()))
.build();
client.open().thenRun(latch::countDown);
atomixClients.add(client);
......
......@@ -77,7 +77,7 @@
<onos-build-conf.version>1.2</onos-build-conf.version>
<netty4.version>4.0.33.Final</netty4.version>
<!-- TODO: replace with final release version when it is out -->
<atomix.version>1.0.0-rc2</atomix.version>
<atomix.version>1.0.0-rc3</atomix.version>
<copycat.version>0.5.1.onos</copycat.version>
<openflowj.version>0.9.2.onos</openflowj.version>
<onos-maven-plugin.version>1.9</onos-maven-plugin.version>
......