Madan Jampani

Using latest atomix release candidate + Updates to CopycatTransport

Change-Id: I960af428ff733ee7467024811e3b3470e951ecb7
Showing 15 changed files with 99 additions and 69 deletions
......@@ -86,7 +86,7 @@ public class CopycatTransport implements Transport {
/**
* Maps {@link Address address} to {@link Endpoint endpoint}.
* @param address
* @param address address
* @return end point
*/
public static Endpoint toEndpoint(Address address) {
......
......@@ -16,6 +16,7 @@
package org.onosproject.store.primitives.impl;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
......@@ -23,8 +24,10 @@ import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang.math.RandomUtils;
import org.onosproject.cluster.PartitionId;
import org.onosproject.store.cluster.messaging.MessagingService;
import org.slf4j.Logger;
import com.google.common.collect.Sets;
import com.google.common.primitives.Longs;
import io.atomix.catalyst.transport.Address;
import io.atomix.catalyst.transport.Client;
......@@ -36,26 +39,30 @@ import io.atomix.catalyst.util.concurrent.ThreadContext;
*/
public class CopycatTransportClient implements Client {
private final Logger log = getLogger(getClass());
private final PartitionId partitionId;
private final MessagingService messagingService;
private final CopycatTransport.Mode mode;
private final String newConnectionMessageSubject;
private final Set<CopycatTransportConnection> connections = Sets.newConcurrentHashSet();
CopycatTransportClient(PartitionId partitionId, MessagingService messagingService, CopycatTransport.Mode mode) {
this.partitionId = checkNotNull(partitionId);
this.messagingService = checkNotNull(messagingService);
this.mode = checkNotNull(mode);
this.newConnectionMessageSubject = String.format("onos-copycat-server-connection-%s", partitionId);
}
@Override
public CompletableFuture<Connection> connect(Address remoteAddress) {
ThreadContext context = ThreadContext.currentContextOrThrow();
return messagingService.sendAndReceive(CopycatTransport.toEndpoint(remoteAddress),
PartitionManager.HELLO_MESSAGE_SUBJECT,
"hello".getBytes())
.thenApplyAsync(r -> {
newConnectionMessageSubject,
Longs.toByteArray(nextConnectionId()))
.thenApplyAsync(bytes -> {
long connectionId = Longs.fromByteArray(bytes);
CopycatTransportConnection connection = new CopycatTransportConnection(
nextConnectionId(),
connectionId,
CopycatTransport.Mode.CLIENT,
partitionId,
remoteAddress,
......@@ -64,6 +71,7 @@ public class CopycatTransportClient implements Client {
if (mode == CopycatTransport.Mode.CLIENT) {
connection.setBidirectional();
}
log.debug("Created new outgoing connection[id={}] to {}", connectionId, remoteAddress);
connections.add(connection);
return connection;
}, context.executor());
......
......@@ -31,12 +31,14 @@ import org.apache.commons.io.IOUtils;
import org.onlab.util.Tools;
import org.onosproject.cluster.PartitionId;
import org.onosproject.store.cluster.messaging.MessagingService;
import org.slf4j.Logger;
import com.google.common.base.MoreObjects;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.slf4j.LoggerFactory.getLogger;
import io.atomix.catalyst.transport.Address;
import io.atomix.catalyst.transport.Connection;
import io.atomix.catalyst.transport.MessageHandler;
......@@ -52,6 +54,7 @@ import io.atomix.catalyst.util.concurrent.ThreadContext;
*/
public class CopycatTransportConnection implements Connection {
private final Logger log = getLogger(getClass());
private final Listeners<Throwable> exceptionListeners = new Listeners<>();
private final Listeners<Connection> closeListeners = new Listeners<>();
......@@ -82,11 +85,11 @@ public class CopycatTransportConnection implements Connection {
this.remoteAddress = checkNotNull(address);
this.messagingService = checkNotNull(messagingService);
if (mode == CopycatTransport.Mode.CLIENT) {
this.outboundMessageSubject = String.format("onos-copycat-%s", partitionId);
this.inboundMessageSubject = String.format("onos-copycat-%s-%d", partitionId, connectionId);
this.outboundMessageSubject = String.format("onos-copycat-server-%s", partitionId);
this.inboundMessageSubject = String.format("onos-copycat-client-%s-%d", partitionId, connectionId);
} else {
this.outboundMessageSubject = String.format("onos-copycat-%s-%d", partitionId, connectionId);
this.inboundMessageSubject = String.format("onos-copycat-%s", partitionId);
this.outboundMessageSubject = String.format("onos-copycat-client-%s-%d", partitionId, connectionId);
this.inboundMessageSubject = String.format("onos-copycat-server-%s", partitionId);
}
this.context = checkNotNull(context);
}
......@@ -203,6 +206,7 @@ public class CopycatTransportConnection implements Connection {
@Override
public CompletableFuture<Void> close() {
log.debug("Closing connection[id={}, mode={}] to {}", connectionId, mode, remoteAddress);
closeListeners.forEach(listener -> listener.accept(this));
if (mode == CopycatTransport.Mode.CLIENT) {
messagingService.unregisterHandler(inboundMessageSubject);
......
......@@ -35,6 +35,7 @@ import org.onosproject.store.cluster.messaging.MessagingService;
import org.slf4j.Logger;
import com.google.common.collect.Maps;
import com.google.common.primitives.Longs;
import io.atomix.catalyst.transport.Address;
import io.atomix.catalyst.transport.Connection;
......@@ -54,13 +55,15 @@ public class CopycatTransportServer implements Server {
private final ScheduledExecutorService executorService;
private final PartitionId partitionId;
private final MessagingService messagingService;
private final String messageSubject;
private final String protocolMessageSubject;
private final String newConnectionMessageSubject;
private final Map<Long, CopycatTransportConnection> connections = Maps.newConcurrentMap();
CopycatTransportServer(PartitionId partitionId, MessagingService messagingService) {
this.partitionId = checkNotNull(partitionId);
this.messagingService = checkNotNull(messagingService);
this.messageSubject = String.format("onos-copycat-%s", partitionId);
this.protocolMessageSubject = String.format("onos-copycat-server-%s", partitionId);
this.newConnectionMessageSubject = String.format("onos-copycat-server-connection-%s", partitionId);
this.executorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(),
new CatalystThreadFactory("copycat-server-p" + partitionId + "-%d"));
}
......@@ -68,49 +71,49 @@ public class CopycatTransportServer implements Server {
@Override
public CompletableFuture<Void> listen(Address address, Consumer<Connection> listener) {
if (listening.compareAndSet(false, true)) {
// message handler for all non-connection-establishment messages.
messagingService.registerHandler(protocolMessageSubject, (sender, payload) -> {
try (DataInputStream input = new DataInputStream(new ByteArrayInputStream(payload))) {
long connectionId = input.readLong();
CopycatTransportConnection connection = connections.get(connectionId);
if (connection == null) {
throw new IOException("Closed connection");
}
byte[] messagePayload = IOUtils.toByteArray(input);
return connection.handle(messagePayload);
} catch (IOException e) {
return Tools.exceptionalFuture(e);
}
});
// message handler for new connection attempts.
ThreadContext context = ThreadContext.currentContextOrThrow();
listen(address, listener, context);
messagingService.registerHandler(newConnectionMessageSubject, (sender, payload) -> {
long connectionId = Longs.fromByteArray(payload);
CopycatTransportConnection connection = new CopycatTransportConnection(connectionId,
CopycatTransport.Mode.SERVER,
partitionId,
CopycatTransport.toAddress(sender),
messagingService,
getOrCreateContext(context));
connections.put(connectionId, connection);
connection.closeListener(c -> connections.remove(connectionId, c));
log.debug("Created new incoming connection[id={}] from {}", connectionId, sender);
return CompletableFuture.supplyAsync(() -> {
listener.accept(connection);
// echo the connectionId back to indicate successful completion.
return payload;
}, context.executor());
});
context.execute(() -> listenFuture.complete(null));
}
return listenFuture;
}
private void listen(Address address, Consumer<Connection> listener, ThreadContext context) {
messagingService.registerHandler(messageSubject, (sender, payload) -> {
try (DataInputStream input = new DataInputStream(new ByteArrayInputStream(payload))) {
long connectionId = input.readLong();
AtomicBoolean newConnectionCreated = new AtomicBoolean(false);
CopycatTransportConnection connection = connections.computeIfAbsent(connectionId, k -> {
newConnectionCreated.set(true);
CopycatTransportConnection newConnection = new CopycatTransportConnection(connectionId,
CopycatTransport.Mode.SERVER,
partitionId,
CopycatTransport.toAddress(sender),
messagingService,
getOrCreateContext(context));
log.debug("Created new incoming connection {}", connectionId);
newConnection.closeListener(c -> connections.remove(connectionId, c));
return newConnection;
});
byte[] request = IOUtils.toByteArray(input);
return CompletableFuture.supplyAsync(
() -> {
if (newConnectionCreated.get()) {
listener.accept(connection);
}
return connection;
}, context.executor()).thenCompose(c -> c.handle(request));
} catch (IOException e) {
return Tools.exceptionalFuture(e);
}
});
context.execute(() -> {
listenFuture.complete(null);
});
}
@Override
public CompletableFuture<Void> close() {
messagingService.unregisterHandler(messageSubject);
messagingService.unregisterHandler(newConnectionMessageSubject);
messagingService.unregisterHandler(protocolMessageSubject);
executorService.shutdown();
return CompletableFuture.completedFuture(null);
}
......
......@@ -54,7 +54,7 @@ public class DefaultCatalystTypeSerializerFactory implements TypeSerializerFacto
}
@Override
public void write(T object, BufferOutput<?> buffer,
public void write(T object, BufferOutput buffer,
io.atomix.catalyst.serializer.Serializer serializer) {
try {
byte[] payload = this.serializer.encode(object);
......@@ -66,7 +66,7 @@ public class DefaultCatalystTypeSerializerFactory implements TypeSerializerFacto
}
@Override
public T read(Class<T> type, BufferInput<?> buffer,
public T read(Class<T> type, BufferInput buffer,
io.atomix.catalyst.serializer.Serializer serializer) {
int size = buffer.readInt();
try {
......
......@@ -27,6 +27,7 @@ import java.util.stream.Collectors;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
......@@ -56,7 +57,6 @@ import com.google.common.collect.Maps;
public class PartitionManager extends AbstractListenerManager<PartitionEvent, PartitionEventListener>
implements PartitionService, PartitionAdminService {
public static final String HELLO_MESSAGE_SUBJECT = "partition-manager-hello";
private final Logger log = getLogger(getClass());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
......@@ -72,8 +72,6 @@ public class PartitionManager extends AbstractListenerManager<PartitionEvent, Pa
@Activate
public void activate() {
messagingService.registerHandler(HELLO_MESSAGE_SUBJECT,
(ep, input) -> CompletableFuture.completedFuture(input));
eventDispatcher.addSink(PartitionEvent.class, listenerRegistry);
metadataService.getClusterMetadata()
......@@ -93,8 +91,8 @@ public class PartitionManager extends AbstractListenerManager<PartitionEvent, Pa
log.info("Started");
}
@Deactivate
public void deactivate() {
messagingService.unregisterHandler(HELLO_MESSAGE_SUBJECT);
eventDispatcher.removeSink(PartitionEvent.class);
CompletableFuture<Void> closeFuture = CompletableFuture.allOf(partitions.values()
......
......@@ -90,7 +90,7 @@ public class StorageManager implements StorageService, StorageAdminService {
private TransactionCoordinator transactionCoordinator;
@Activate
public void actiavte() {
public void activate() {
basePrimitiveCreator = partitionService.getDistributedPrimitiveCreator(PartitionId.from(0));
Map<PartitionId, DistributedPrimitiveCreator> partitionMap = Maps.newHashMap();
partitionService.getAllPartitionIds().stream()
......
......@@ -19,14 +19,14 @@ import io.atomix.copycat.server.cluster.Member;
import java.util.Collection;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.onosproject.cluster.PartitionId;
import org.onosproject.store.service.PartitionInfo;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
/**
* Operational details for a {@code StoragePartition}.
......@@ -98,9 +98,11 @@ public class StoragePartitionDetails {
* @return partition info
*/
public PartitionInfo toPartitionInfo() {
Function<Member, String> memberToString =
m -> m == null ? "none" : String.format("%s:%d", m.address().host(), m.address().port());
return new PartitionInfo(partitionId.toString(),
leaderTerm,
Lists.transform(ImmutableList.copyOf(activeMembers), m -> m.address().toString()),
leader == null ? "none" : leader.address().toString());
activeMembers.stream().map(memberToString).collect(Collectors.toList()),
memberToString.apply(leader));
}
}
......
......@@ -22,6 +22,7 @@ import io.atomix.catalyst.transport.Transport;
import io.atomix.copycat.server.CopycatServer;
import io.atomix.copycat.server.storage.Storage;
import io.atomix.copycat.server.storage.StorageLevel;
import io.atomix.manager.ResourceManagerTypeResolver;
import io.atomix.manager.state.ResourceManagerState;
import io.atomix.resource.ResourceRegistry;
import io.atomix.resource.ResourceType;
......@@ -107,7 +108,7 @@ public class StoragePartitionServer implements Managed<StoragePartitionServer> {
ResourceRegistry registry = new ResourceRegistry();
resourceTypes.forEach(registry::register);
resourceResolver.resolve(registry);
return CopycatServer.builder(localAddress, partition.getMemberAddresses())
CopycatServer server = CopycatServer.builder(localAddress, partition.getMemberAddresses())
.withName("partition-" + partition.getId())
.withSerializer(serializer.clone())
.withTransport(transport.get())
......@@ -119,6 +120,8 @@ public class StoragePartitionServer implements Managed<StoragePartitionServer> {
.withMaxEntriesPerSegment(MAX_ENTRIES_PER_LOG_SEGMENT)
.build())
.build();
server.serializer().resolve(new ResourceManagerTypeResolver(registry));
return server;
}
public Set<NodeId> configuredMembers() {
......
......@@ -43,8 +43,10 @@ import com.google.common.collect.Sets;
/**
* Distributed resource providing the {@link AsyncConsistentMap} primitive.
*/
@ResourceTypeInfo(id = -151, stateMachine = AtomixConsistentMapState.class)
public class AtomixConsistentMap extends Resource<AtomixConsistentMap, Resource.Options>
@ResourceTypeInfo(id = -151,
stateMachine = AtomixConsistentMapState.class,
typeResolver = AtomixConsistentMapCommands.TypeResolver.class)
public class AtomixConsistentMap extends Resource<AtomixConsistentMap>
implements AsyncConsistentMap<String, byte[]> {
private final Set<MapEventListener<String, byte[]>> mapEventListeners = Sets.newCopyOnWriteArraySet();
......
......@@ -26,6 +26,7 @@ import io.atomix.copycat.server.session.SessionListener;
import io.atomix.copycat.server.storage.snapshot.SnapshotReader;
import io.atomix.copycat.server.storage.snapshot.SnapshotWriter;
import io.atomix.resource.ResourceStateMachine;
import io.atomix.resource.ResourceType;
import java.util.Collection;
import java.util.HashMap;
......@@ -69,12 +70,17 @@ import static com.google.common.base.Preconditions.checkState;
* State Machine for {@link AtomixConsistentMap} resource.
*/
public class AtomixConsistentMapState extends ResourceStateMachine implements SessionListener, Snapshottable {
private final Map<Long, Commit<? extends AtomixConsistentMapCommands.Listen>> listeners = new HashMap<>();
private final Map<String, MapEntryValue> mapEntries = new HashMap<>();
private final Set<String> preparedKeys = Sets.newHashSet();
private final Map<TransactionId, Commit<? extends TransactionPrepare>> pendingTransactions = Maps.newHashMap();
private AtomicLong versionCounter = new AtomicLong(0);
public AtomixConsistentMapState() {
super(new ResourceType(AtomixConsistentMap.class));
}
@Override
public void snapshot(SnapshotWriter writer) {
writer.writeLong(versionCounter.get());
......
......@@ -25,7 +25,6 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import org.onlab.util.SharedExecutors;
import org.onosproject.cluster.Leadership;
import org.onosproject.cluster.NodeId;
import org.onosproject.event.Change;
......@@ -36,8 +35,10 @@ import com.google.common.collect.Sets;
/**
* Distributed resource providing the {@link AsyncLeaderElector} primitive.
*/
@ResourceTypeInfo(id = -152, stateMachine = AtomixLeaderElectorState.class)
public class AtomixLeaderElector extends Resource<AtomixLeaderElector, Resource.Options>
@ResourceTypeInfo(id = -152,
stateMachine = AtomixLeaderElectorState.class,
typeResolver = AtomixLeaderElectorCommands.TypeResolver.class)
public class AtomixLeaderElector extends Resource<AtomixLeaderElector>
implements AsyncLeaderElector {
private final Set<Consumer<Change<Leadership>>> leadershipChangeListeners =
Sets.newConcurrentHashSet();
......@@ -62,8 +63,7 @@ public class AtomixLeaderElector extends Resource<AtomixLeaderElector, Resource.
}
private void handleEvent(Change<Leadership> change) {
SharedExecutors.getSingleThreadExecutor().execute(() ->
leadershipChangeListeners.forEach(l -> l.accept(change)));
leadershipChangeListeners.forEach(l -> l.accept(change));
}
@Override
......
......@@ -24,6 +24,7 @@ import io.atomix.copycat.server.session.SessionListener;
import io.atomix.copycat.server.storage.snapshot.SnapshotReader;
import io.atomix.copycat.server.storage.snapshot.SnapshotWriter;
import io.atomix.resource.ResourceStateMachine;
import io.atomix.resource.ResourceType;
import java.util.Arrays;
import java.util.HashMap;
......@@ -72,6 +73,10 @@ public class AtomixLeaderElectorState extends ResourceStateMachine
ElectionState.class,
Registration.class);
public AtomixLeaderElectorState() {
super(new ResourceType(AtomixLeaderElector.class));
}
@Override
protected void configure(StateMachineExecutor executor) {
// Notification
......@@ -261,7 +266,7 @@ public class AtomixLeaderElectorState extends ResourceStateMachine
}
private void onSessionEnd(Session session) {
Commit<? extends AtomixLeaderElectorCommands.Listen> listener = listeners.remove(session);
Commit<? extends AtomixLeaderElectorCommands.Listen> listener = listeners.remove(session.id());
if (listener != null) {
listener.close();
}
......
......@@ -110,7 +110,6 @@ public abstract class AtomixTestBase {
.withStorage(Storage.builder()
.withStorageLevel(StorageLevel.DISK)
.withDirectory(TEST_DIR + "/" + address.port())
.withSerializer(serializer.clone())
.build())
.withStateMachine(() -> new ResourceManagerState(resourceRegistry))
.withSerializer(serializer.clone())
......
......@@ -79,7 +79,7 @@
<onos-build-conf.version>1.2-SNAPSHOT</onos-build-conf.version>
<netty4.version>4.0.33.Final</netty4.version>
<!-- TODO: replace with final release version when it is out -->
<atomix.version>0.1.0-beta5</atomix.version>
<atomix.version>1.0.0-rc1</atomix.version>
<copycat.version>0.5.1.onos</copycat.version>
<openflowj.version>0.9.1.onos</openflowj.version>
<onos-maven-plugin.version>1.8-SNAPSHOT</onos-maven-plugin.version>
......