Madan Jampani
Committed by Gerrit Code Review

Using 1.0.0.rc2 version of Atomix

CopycatTransport updates

Change-Id: If384ac2574f098c327f0e5749766268c8d7f1ecd
Showing 15 changed files with 96 additions and 103 deletions
......@@ -120,6 +120,6 @@ public interface AsyncLeaderElector extends DistributedPrimitive {
* @return new {@code LeaderElector} instance
*/
default LeaderElector asLeaderElector() {
return asLeaderElector(DEFAULT_OPERTATION_TIMEOUT_MILLIS);
return asLeaderElector(Long.MAX_VALUE);
}
}
......
......@@ -19,7 +19,7 @@ import java.util.Arrays;
import io.atomix.catalyst.serializer.Serializer;
import io.atomix.catalyst.serializer.TypeSerializerFactory;
import io.atomix.copycat.client.Query;
import io.atomix.copycat.Query;
import org.onlab.util.Match;
import org.onosproject.cluster.Leader;
......
......@@ -16,7 +16,6 @@
package org.onosproject.store.primitives.impl;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
......@@ -24,10 +23,8 @@ import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang.math.RandomUtils;
import org.onosproject.cluster.PartitionId;
import org.onosproject.store.cluster.messaging.MessagingService;
import org.slf4j.Logger;
import com.google.common.collect.Sets;
import com.google.common.primitives.Longs;
import io.atomix.catalyst.transport.Address;
import io.atomix.catalyst.transport.Client;
......@@ -39,42 +36,32 @@ import io.atomix.catalyst.util.concurrent.ThreadContext;
*/
public class CopycatTransportClient implements Client {
private final Logger log = getLogger(getClass());
private final PartitionId partitionId;
private final MessagingService messagingService;
private final CopycatTransport.Mode mode;
private final String newConnectionMessageSubject;
private final Set<CopycatTransportConnection> connections = Sets.newConcurrentHashSet();
CopycatTransportClient(PartitionId partitionId, MessagingService messagingService, CopycatTransport.Mode mode) {
this.partitionId = checkNotNull(partitionId);
this.messagingService = checkNotNull(messagingService);
this.mode = checkNotNull(mode);
this.newConnectionMessageSubject = String.format("onos-copycat-server-connection-%s", partitionId);
}
@Override
public CompletableFuture<Connection> connect(Address remoteAddress) {
ThreadContext context = ThreadContext.currentContextOrThrow();
return messagingService.sendAndReceive(CopycatTransport.toEndpoint(remoteAddress),
newConnectionMessageSubject,
Longs.toByteArray(nextConnectionId()))
.thenApplyAsync(bytes -> {
long connectionId = Longs.fromByteArray(bytes);
CopycatTransportConnection connection = new CopycatTransportConnection(
connectionId,
CopycatTransport.Mode.CLIENT,
partitionId,
remoteAddress,
messagingService,
context);
if (mode == CopycatTransport.Mode.CLIENT) {
connection.setBidirectional();
}
log.debug("Created new outgoing connection[id={}] to {}", connectionId, remoteAddress);
connections.add(connection);
return connection;
}, context.executor());
CopycatTransportConnection connection = new CopycatTransportConnection(
nextConnectionId(),
CopycatTransport.Mode.CLIENT,
partitionId,
remoteAddress,
messagingService,
context);
if (mode == CopycatTransport.Mode.CLIENT) {
connection.setBidirectional();
}
connections.add(connection);
return CompletableFuture.supplyAsync(() -> connection, context.executor());
}
@Override
......
......@@ -31,14 +31,12 @@ import org.apache.commons.io.IOUtils;
import org.onlab.util.Tools;
import org.onosproject.cluster.PartitionId;
import org.onosproject.store.cluster.messaging.MessagingService;
import org.slf4j.Logger;
import com.google.common.base.MoreObjects;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.slf4j.LoggerFactory.getLogger;
import io.atomix.catalyst.transport.Address;
import io.atomix.catalyst.transport.Connection;
import io.atomix.catalyst.transport.MessageHandler;
......@@ -54,7 +52,6 @@ import io.atomix.catalyst.util.concurrent.ThreadContext;
*/
public class CopycatTransportConnection implements Connection {
private final Logger log = getLogger(getClass());
private final Listeners<Throwable> exceptionListeners = new Listeners<>();
private final Listeners<Connection> closeListeners = new Listeners<>();
......@@ -85,11 +82,11 @@ public class CopycatTransportConnection implements Connection {
this.remoteAddress = checkNotNull(address);
this.messagingService = checkNotNull(messagingService);
if (mode == CopycatTransport.Mode.CLIENT) {
this.outboundMessageSubject = String.format("onos-copycat-server-%s", partitionId);
this.inboundMessageSubject = String.format("onos-copycat-client-%s-%d", partitionId, connectionId);
this.outboundMessageSubject = String.format("onos-copycat-%s", partitionId);
this.inboundMessageSubject = String.format("onos-copycat-%s-%d", partitionId, connectionId);
} else {
this.outboundMessageSubject = String.format("onos-copycat-client-%s-%d", partitionId, connectionId);
this.inboundMessageSubject = String.format("onos-copycat-server-%s", partitionId);
this.outboundMessageSubject = String.format("onos-copycat-%s-%d", partitionId, connectionId);
this.inboundMessageSubject = String.format("onos-copycat-%s", partitionId);
}
this.context = checkNotNull(context);
}
......@@ -206,7 +203,6 @@ public class CopycatTransportConnection implements Connection {
@Override
public CompletableFuture<Void> close() {
log.debug("Closing connection[id={}, mode={}] to {}", connectionId, mode, remoteAddress);
closeListeners.forEach(listener -> listener.accept(this));
if (mode == CopycatTransport.Mode.CLIENT) {
messagingService.unregisterHandler(inboundMessageSubject);
......
......@@ -35,7 +35,6 @@ import org.onosproject.store.cluster.messaging.MessagingService;
import org.slf4j.Logger;
import com.google.common.collect.Maps;
import com.google.common.primitives.Longs;
import io.atomix.catalyst.transport.Address;
import io.atomix.catalyst.transport.Connection;
......@@ -55,15 +54,13 @@ public class CopycatTransportServer implements Server {
private final ScheduledExecutorService executorService;
private final PartitionId partitionId;
private final MessagingService messagingService;
private final String protocolMessageSubject;
private final String newConnectionMessageSubject;
private final String messageSubject;
private final Map<Long, CopycatTransportConnection> connections = Maps.newConcurrentMap();
CopycatTransportServer(PartitionId partitionId, MessagingService messagingService) {
this.partitionId = checkNotNull(partitionId);
this.messagingService = checkNotNull(messagingService);
this.protocolMessageSubject = String.format("onos-copycat-server-%s", partitionId);
this.newConnectionMessageSubject = String.format("onos-copycat-server-connection-%s", partitionId);
this.messageSubject = String.format("onos-copycat-%s", partitionId);
this.executorService = Executors.newScheduledThreadPool(Math.min(4, Runtime.getRuntime().availableProcessors()),
new CatalystThreadFactory("copycat-server-p" + partitionId + "-%d"));
}
......@@ -71,49 +68,49 @@ public class CopycatTransportServer implements Server {
@Override
public CompletableFuture<Void> listen(Address address, Consumer<Connection> listener) {
if (listening.compareAndSet(false, true)) {
// message handler for all non-connection-establishment messages.
messagingService.registerHandler(protocolMessageSubject, (sender, payload) -> {
try (DataInputStream input = new DataInputStream(new ByteArrayInputStream(payload))) {
long connectionId = input.readLong();
CopycatTransportConnection connection = connections.get(connectionId);
if (connection == null) {
throw new IOException("Closed connection");
}
byte[] messagePayload = IOUtils.toByteArray(input);
return connection.handle(messagePayload);
} catch (IOException e) {
return Tools.exceptionalFuture(e);
}
});
// message handler for new connection attempts.
ThreadContext context = ThreadContext.currentContextOrThrow();
messagingService.registerHandler(newConnectionMessageSubject, (sender, payload) -> {
long connectionId = Longs.fromByteArray(payload);
CopycatTransportConnection connection = new CopycatTransportConnection(connectionId,
CopycatTransport.Mode.SERVER,
partitionId,
CopycatTransport.toAddress(sender),
messagingService,
getOrCreateContext(context));
connections.put(connectionId, connection);
connection.closeListener(c -> connections.remove(connectionId, c));
log.debug("Created new incoming connection[id={}] from {}", connectionId, sender);
return CompletableFuture.supplyAsync(() -> {
listener.accept(connection);
// echo the connectionId back to indicate successful completion.
return payload;
}, context.executor());
});
context.execute(() -> listenFuture.complete(null));
listen(address, listener, context);
}
return listenFuture;
}
private void listen(Address address, Consumer<Connection> listener, ThreadContext context) {
messagingService.registerHandler(messageSubject, (sender, payload) -> {
try (DataInputStream input = new DataInputStream(new ByteArrayInputStream(payload))) {
long connectionId = input.readLong();
AtomicBoolean newConnectionCreated = new AtomicBoolean(false);
CopycatTransportConnection connection = connections.computeIfAbsent(connectionId, k -> {
newConnectionCreated.set(true);
CopycatTransportConnection newConnection = new CopycatTransportConnection(connectionId,
CopycatTransport.Mode.SERVER,
partitionId,
CopycatTransport.toAddress(sender),
messagingService,
getOrCreateContext(context));
log.debug("Created new incoming connection {}", connectionId);
newConnection.closeListener(c -> connections.remove(connectionId, c));
return newConnection;
});
byte[] request = IOUtils.toByteArray(input);
return CompletableFuture.supplyAsync(
() -> {
if (newConnectionCreated.get()) {
listener.accept(connection);
}
return connection;
}, context.executor()).thenCompose(c -> c.handle(request));
} catch (IOException e) {
return Tools.exceptionalFuture(e);
}
});
context.execute(() -> {
listenFuture.complete(null);
});
}
@Override
public CompletableFuture<Void> close() {
messagingService.unregisterHandler(newConnectionMessageSubject);
messagingService.unregisterHandler(protocolMessageSubject);
messagingService.unregisterHandler(messageSubject);
executorService.shutdown();
return CompletableFuture.completedFuture(null);
}
......
......@@ -105,14 +105,16 @@ public class PartitionManager extends AbstractListenerManager<PartitionEvent, Pa
@Override
public CompletableFuture<Void> leave(PartitionId partitionId) {
// TODO: Implement
return Tools.exceptionalFuture(new UnsupportedOperationException());
return partitions.get(partitionId)
.server()
.map(server -> server.close())
.orElse(CompletableFuture.completedFuture(null));
}
@Override
public CompletableFuture<Void> join(PartitionId partitionId) {
// TODO: Implement
return Tools.exceptionalFuture(new UnsupportedOperationException());
return partitions.get(partitionId)
.open();
}
@Override
......
......@@ -72,10 +72,22 @@ public class StoragePartition extends DefaultPartition implements Managed<Storag
this.logFolder = logFolder;
}
/**
* Returns the partition client instance.
* @return client
*/
public StoragePartitionClient client() {
return client;
}
/**
* Returns the optional server instance.
* @return server
*/
public Optional<StoragePartitionServer> server() {
return server;
}
@Override
public CompletableFuture<Void> open() {
return openServer().thenAccept(s -> server = Optional.ofNullable(s))
......
......@@ -114,8 +114,7 @@ public class StoragePartitionServer implements Managed<StoragePartitionServer> {
.withTransport(transport.get())
.withStateMachine(() -> new ResourceManagerState(registry))
.withStorage(Storage.builder()
// FIXME: StorageLevel should be DISK
.withStorageLevel(StorageLevel.MEMORY)
.withStorageLevel(StorageLevel.DISK)
.withCompactionThreads(1)
.withDirectory(dataFolder)
.withMaxEntriesPerSegment(MAX_ENTRIES_PER_LOG_SEGMENT)
......
......@@ -65,7 +65,7 @@ public class AtomixConsistentMap extends Resource<AtomixConsistentMap>
@Override
public CompletableFuture<AtomixConsistentMap> open() {
return super.open().thenApply(result -> {
client.session().onEvent(CHANGE_SUBJECT, this::handleEvent);
client.onEvent(CHANGE_SUBJECT, this::handleEvent);
return result;
});
}
......
......@@ -22,8 +22,8 @@ import io.atomix.catalyst.serializer.SerializableTypeResolver;
import io.atomix.catalyst.serializer.Serializer;
import io.atomix.catalyst.serializer.SerializerRegistry;
import io.atomix.catalyst.util.Assert;
import io.atomix.copycat.client.Command;
import io.atomix.copycat.client.Query;
import io.atomix.copycat.Command;
import io.atomix.copycat.Query;
import java.util.Collection;
import java.util.Map;
......
......@@ -18,7 +18,7 @@ package org.onosproject.store.primitives.resources.impl;
import static org.onosproject.store.service.MapEvent.Type.INSERT;
import static org.onosproject.store.service.MapEvent.Type.REMOVE;
import static org.onosproject.store.service.MapEvent.Type.UPDATE;
import io.atomix.copycat.client.session.Session;
import io.atomix.copycat.server.session.ServerSession;
import io.atomix.copycat.server.Commit;
import io.atomix.copycat.server.Snapshottable;
import io.atomix.copycat.server.StateMachineExecutor;
......@@ -322,8 +322,8 @@ public class AtomixConsistentMapState extends ResourceStateMachine implements Se
commit.session()
.onStateChange(
state -> {
if (state == Session.State.CLOSED
|| state == Session.State.EXPIRED) {
if (state == ServerSession.State.CLOSED
|| state == ServerSession.State.EXPIRED) {
Commit<? extends Listen> listener = listeners.remove(sessionId);
if (listener != null) {
listener.close();
......@@ -503,21 +503,21 @@ public class AtomixConsistentMapState extends ResourceStateMachine implements Se
}
@Override
public void register(Session session) {
public void register(ServerSession session) {
}
@Override
public void unregister(Session session) {
public void unregister(ServerSession session) {
closeListener(session.id());
}
@Override
public void expire(Session session) {
public void expire(ServerSession session) {
closeListener(session.id());
}
@Override
public void close(Session session) {
public void close(ServerSession session) {
closeListener(session.id());
}
......
......@@ -57,7 +57,7 @@ public class AtomixLeaderElector extends Resource<AtomixLeaderElector>
@Override
public CompletableFuture<AtomixLeaderElector> open() {
return super.open().thenApply(result -> {
client.session().onEvent("change", this::handleEvent);
client.onEvent("change", this::handleEvent);
return result;
});
}
......
......@@ -32,8 +32,8 @@ import io.atomix.catalyst.serializer.SerializableTypeResolver;
import io.atomix.catalyst.serializer.Serializer;
import io.atomix.catalyst.serializer.SerializerRegistry;
import io.atomix.catalyst.util.Assert;
import io.atomix.copycat.client.Command;
import io.atomix.copycat.client.Query;
import io.atomix.copycat.Command;
import io.atomix.copycat.Query;
/**
* {@link AtomixLeaderElector} resource state machine operations.
......
......@@ -16,7 +16,7 @@
package org.onosproject.store.primitives.resources.impl;
import static org.slf4j.LoggerFactory.getLogger;
import io.atomix.copycat.client.session.Session;
import io.atomix.copycat.server.session.ServerSession;
import io.atomix.copycat.server.Commit;
import io.atomix.copycat.server.Snapshottable;
import io.atomix.copycat.server.StateMachineExecutor;
......@@ -265,7 +265,7 @@ public class AtomixLeaderElectorState extends ResourceStateMachine
return electionState == null ? new LinkedList<>() : electionState.candidates();
}
private void onSessionEnd(Session session) {
private void onSessionEnd(ServerSession session) {
Commit<? extends AtomixLeaderElectorCommands.Listen> listener = listeners.remove(session.id());
if (listener != null) {
listener.close();
......@@ -337,7 +337,7 @@ public class AtomixLeaderElectorState extends ResourceStateMachine
this.termStartTime = termStartTime;
}
public ElectionState cleanup(Session session, Supplier<Long> termCounter) {
public ElectionState cleanup(ServerSession session, Supplier<Long> termCounter) {
Optional<Registration> registration =
registrations.stream().filter(r -> r.sessionId() == session.id()).findFirst();
if (registration.isPresent()) {
......@@ -409,21 +409,21 @@ public class AtomixLeaderElectorState extends ResourceStateMachine
}
@Override
public void register(Session session) {
public void register(ServerSession session) {
}
@Override
public void unregister(Session session) {
public void unregister(ServerSession session) {
onSessionEnd(session);
}
@Override
public void expire(Session session) {
public void expire(ServerSession session) {
onSessionEnd(session);
}
@Override
public void close(Session session) {
public void close(ServerSession session) {
onSessionEnd(session);
}
......
......@@ -79,7 +79,7 @@
<onos-build-conf.version>1.2-SNAPSHOT</onos-build-conf.version>
<netty4.version>4.0.33.Final</netty4.version>
<!-- TODO: replace with final release version when it is out -->
<atomix.version>1.0.0-rc1</atomix.version>
<atomix.version>1.0.0-rc2</atomix.version>
<copycat.version>0.5.1.onos</copycat.version>
<openflowj.version>0.9.1.onos</openflowj.version>
<onos-maven-plugin.version>1.8-SNAPSHOT</onos-maven-plugin.version>
......