Madan Jampani
Committed by Gerrit Code Review

Copycat transport enhancements

Change-Id: I50e9eb0f419b2aa10deff6d54f58649688788faa
......@@ -15,29 +15,19 @@
*/
package org.onosproject.store.primitives.impl;
import io.atomix.catalyst.serializer.CatalystSerializable;
import java.util.Arrays;
import io.atomix.catalyst.serializer.Serializer;
import io.atomix.catalyst.serializer.TypeSerializerFactory;
import io.atomix.copycat.client.Query;
import io.atomix.manager.state.GetResource;
import io.atomix.manager.state.GetResourceKeys;
import io.atomix.resource.ResourceQuery;
import io.atomix.variables.state.ValueCommands;
import java.io.IOException;
import java.net.URL;
import java.util.Arrays;
import java.util.Enumeration;
import java.util.Scanner;
import org.onlab.util.Match;
import org.onosproject.cluster.Leader;
import org.onosproject.cluster.Leadership;
import org.onosproject.cluster.NodeId;
import org.onosproject.event.Change;
import org.onosproject.store.primitives.MapUpdate;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapState;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands;
import org.onosproject.store.primitives.resources.impl.CommitResult;
import org.onosproject.store.primitives.resources.impl.MapEntryUpdateResult;
import org.onosproject.store.primitives.resources.impl.PrepareResult;
......@@ -47,7 +37,6 @@ import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapTransaction;
import org.onosproject.store.service.Versioned;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
/**
......@@ -65,113 +54,31 @@ public final class CatalystSerializers {
org.onosproject.store.service.Serializer.using(Arrays.asList((KryoNamespaces.API)),
MapEntryUpdateResult.class,
MapEntryUpdateResult.Status.class,
MapUpdate.class,
MapUpdate.Type.class,
MapTransaction.class,
Transaction.State.class,
TransactionId.class,
PrepareResult.class,
CommitResult.class,
RollbackResult.class,
AtomixConsistentMapCommands.Get.class,
AtomixConsistentMapCommands.ContainsKey.class,
AtomixConsistentMapCommands.ContainsValue.class,
AtomixConsistentMapCommands.Size.class,
AtomixConsistentMapCommands.IsEmpty.class,
AtomixConsistentMapCommands.KeySet.class,
AtomixConsistentMapCommands.EntrySet.class,
AtomixConsistentMapCommands.Values.class,
AtomixConsistentMapCommands.UpdateAndGet.class,
AtomixConsistentMapCommands.TransactionPrepare.class,
AtomixConsistentMapCommands.TransactionCommit.class,
AtomixConsistentMapCommands.TransactionRollback.class,
AtomixLeaderElectorCommands.GetLeadership.class,
AtomixLeaderElectorCommands.GetAllLeaderships.class,
AtomixLeaderElectorCommands.GetElectedTopics.class,
AtomixLeaderElectorCommands.Run.class,
AtomixLeaderElectorCommands.Withdraw.class,
AtomixLeaderElectorCommands.Anoint.class,
GetResource.class,
GetResourceKeys.class,
ResourceQuery.class,
ValueCommands.Get.class,
ValueCommands.Set.class,
Query.ConsistencyLevel.class));
// ONOS classes
serializer.register(Change.class, factory);
serializer.register(Leader.class, factory);
serializer.register(Leadership.class, factory);
serializer.register(NodeId.class, factory);
serializer.register(Match.class, factory);
serializer.register(MapEntryUpdateResult.class, factory);
serializer.register(MapEntryUpdateResult.Status.class, factory);
serializer.register(MapTransaction.class, factory);
serializer.register(Transaction.State.class, factory);
serializer.register(PrepareResult.class, factory);
serializer.register(CommitResult.class, factory);
serializer.register(RollbackResult.class, factory);
serializer.register(TransactionId.class, factory);
serializer.register(MapUpdate.class, factory);
serializer.register(MapUpdate.Type.class, factory);
serializer.register(MapTransaction.class, factory);
serializer.register(Versioned.class, factory);
serializer.register(MapEvent.class, factory);
serializer.register(Maps.immutableEntry("a", "b").getClass(), factory);
serializer.register(AtomixConsistentMapState.class, factory);
serializer.register(ResourceQuery.class, factory);
serializer.register(GetResource.class, factory);
serializer.register(GetResourceKeys.class, factory);
serializer.register(ValueCommands.Get.class, factory);
serializer.register(ValueCommands.Set.class, factory);
// ConsistentMap
serializer.register(AtomixConsistentMapCommands.UpdateAndGet.class, factory);
serializer.register(AtomixConsistentMapCommands.Clear.class);
serializer.register(AtomixConsistentMapCommands.Listen.class);
serializer.register(AtomixConsistentMapCommands.Unlisten.class);
serializer.register(AtomixConsistentMapCommands.Get.class);
serializer.register(AtomixConsistentMapCommands.ContainsKey.class);
serializer.register(AtomixConsistentMapCommands.ContainsValue.class);
serializer.register(AtomixConsistentMapCommands.EntrySet.class);
serializer.register(AtomixConsistentMapCommands.IsEmpty.class);
serializer.register(AtomixConsistentMapCommands.KeySet.class);
serializer.register(AtomixConsistentMapCommands.Size.class);
serializer.register(AtomixConsistentMapCommands.Values.class);
serializer.register(AtomixConsistentMapCommands.TransactionPrepare.class);
serializer.register(AtomixConsistentMapCommands.TransactionCommit.class);
serializer.register(AtomixConsistentMapCommands.TransactionRollback.class);
// LeaderElector
serializer.register(AtomixLeaderElectorCommands.Run.class, factory);
serializer.register(AtomixLeaderElectorCommands.Withdraw.class, factory);
serializer.register(AtomixLeaderElectorCommands.Anoint.class, factory);
serializer.register(AtomixLeaderElectorCommands.GetElectedTopics.class, factory);
serializer.register(AtomixLeaderElectorCommands.GetElectedTopics.class, factory);
serializer.register(AtomixLeaderElectorCommands.GetLeadership.class, factory);
serializer.register(AtomixLeaderElectorCommands.GetAllLeaderships.class, factory);
serializer.register(AtomixLeaderElectorCommands.Listen.class);
serializer.register(AtomixLeaderElectorCommands.Unlisten.class);
// Atomix types
try {
ClassLoader cl = CatalystSerializable.class.getClassLoader();
Enumeration<URL> urls = cl.getResources(
String.format("META-INF/services/%s", CatalystSerializable.class.getName()));
while (urls.hasMoreElements()) {
URL url = urls.nextElement();
try (Scanner scanner = new Scanner(url.openStream(), "UTF-8")) {
scanner.useDelimiter("\n").forEachRemaining(line -> {
if (!line.trim().startsWith("#")) {
line = line.trim();
if (line.length() > 0) {
try {
serializer.register(cl.loadClass(line));
} catch (ClassNotFoundException e) {
Throwables.propagate(e);
}
}
}
});
}
}
} catch (IOException e) {
Throwables.propagate(e);
}
return serializer;
}
}
......
......@@ -17,9 +17,20 @@ package org.onosproject.store.primitives.impl;
import static com.google.common.base.Preconditions.checkNotNull;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.Map;
import org.onlab.packet.IpAddress;
import org.onosproject.cluster.PartitionId;
import org.onosproject.store.cluster.messaging.Endpoint;
import org.onosproject.store.cluster.messaging.MessagingService;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import io.atomix.catalyst.transport.Address;
import io.atomix.catalyst.transport.Client;
import io.atomix.catalyst.transport.Server;
import io.atomix.catalyst.transport.Transport;
......@@ -51,6 +62,8 @@ public class CopycatTransport implements Transport {
private final Mode mode;
private final PartitionId partitionId;
private final MessagingService messagingService;
private static final Map<Address, Endpoint> EP_LOOKUP_CACHE = Maps.newConcurrentMap();
private static final Map<Endpoint, Address> ADDRESS_LOOKUP_CACHE = Maps.newConcurrentMap();
public CopycatTransport(Mode mode, PartitionId partitionId, MessagingService messagingService) {
this.mode = checkNotNull(mode);
......@@ -70,4 +83,42 @@ public class CopycatTransport implements Transport {
return new CopycatTransportServer(partitionId,
messagingService);
}
/**
* Maps {@link Address address} to {@link Endpoint endpoint}.
* @param address
* @return end point
*/
public static Endpoint toEndpoint(Address address) {
return EP_LOOKUP_CACHE.computeIfAbsent(address, a -> {
try {
Endpoint endpoint = new Endpoint(IpAddress.valueOf(InetAddress.getByName(a.host())), a.port());
ADDRESS_LOOKUP_CACHE.putIfAbsent(endpoint, address);
return endpoint;
} catch (UnknownHostException e) {
Throwables.propagate(e);
return null;
}
});
}
/**
* Maps {@link Endpoint endpoint} to {@link Address address}.
* @param endpoint end point
* @return address
*/
public static Address toAddress(Endpoint endpoint) {
return ADDRESS_LOOKUP_CACHE.computeIfAbsent(endpoint, ep -> {
try {
InetAddress host = InetAddress.getByAddress(endpoint.host().toOctets());
int port = endpoint.port();
Address address = new Address(new InetSocketAddress(host, port));
EP_LOOKUP_CACHE.putIfAbsent(address, endpoint);
return address;
} catch (UnknownHostException e) {
Throwables.propagate(e);
return null;
}
});
}
}
......
......@@ -50,18 +50,23 @@ public class CopycatTransportClient implements Client {
@Override
public CompletableFuture<Connection> connect(Address remoteAddress) {
ThreadContext context = ThreadContext.currentContextOrThrow();
CopycatTransportConnection connection = new CopycatTransportConnection(
nextConnectionId(),
CopycatTransport.Mode.CLIENT,
partitionId,
remoteAddress,
messagingService,
context);
if (mode == CopycatTransport.Mode.CLIENT) {
connection.setBidirectional();
}
connections.add(connection);
return CompletableFuture.supplyAsync(() -> connection, context.executor());
return messagingService.sendAndReceive(CopycatTransport.toEndpoint(remoteAddress),
PartitionManager.HELLO_MESSAGE_SUBJECT,
"hello".getBytes())
.thenApplyAsync(r -> {
CopycatTransportConnection connection = new CopycatTransportConnection(
nextConnectionId(),
CopycatTransport.Mode.CLIENT,
partitionId,
remoteAddress,
messagingService,
context);
if (mode == CopycatTransport.Mode.CLIENT) {
connection.setBidirectional();
}
connections.add(connection);
return connection;
}, context.executor());
}
@Override
......
......@@ -21,8 +21,6 @@ import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
......@@ -30,10 +28,8 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.apache.commons.io.IOUtils;
import org.onlab.packet.IpAddress;
import org.onlab.util.Tools;
import org.onosproject.cluster.PartitionId;
import org.onosproject.store.cluster.messaging.Endpoint;
import org.onosproject.store.cluster.messaging.MessagingService;
import com.google.common.base.MoreObjects;
......@@ -74,7 +70,6 @@ public class CopycatTransportConnection implements Connection {
private final AtomicInteger sendFailures = new AtomicInteger(0);
private final AtomicInteger messagesReceived = new AtomicInteger(0);
private final AtomicInteger receiveFailures = new AtomicInteger(0);
private final Map<Address, Endpoint> endpointLookupCache = Maps.newConcurrentMap();
CopycatTransportConnection(long connectionId,
CopycatTransport.Mode mode,
......@@ -120,7 +115,7 @@ public class CopycatTransportConnection implements Connection {
if (message instanceof ReferenceCounted) {
((ReferenceCounted<?>) message).release();
}
messagingService.sendAndReceive(toEndpoint(remoteAddress),
messagingService.sendAndReceive(CopycatTransport.toEndpoint(remoteAddress),
outboundMessageSubject,
baos.toByteArray(),
context.executor())
......@@ -240,17 +235,6 @@ public class CopycatTransportConnection implements Connection {
.toString();
}
private Endpoint toEndpoint(Address address) {
return endpointLookupCache.computeIfAbsent(address, a -> {
try {
return new Endpoint(IpAddress.valueOf(InetAddress.getByName(a.host())), a.port());
} catch (UnknownHostException e) {
Throwables.propagate(e);
return null;
}
});
}
@SuppressWarnings("rawtypes")
private final class InternalHandler {
......
......@@ -16,13 +16,11 @@
package org.onosproject.store.primitives.impl;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.slf4j.LoggerFactory.getLogger;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
......@@ -32,8 +30,8 @@ import org.apache.commons.io.IOUtils;
import org.onlab.util.Tools;
import org.onosproject.cluster.PartitionId;
import org.onosproject.store.cluster.messaging.MessagingService;
import org.slf4j.Logger;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import io.atomix.catalyst.transport.Address;
......@@ -47,6 +45,7 @@ import io.atomix.catalyst.util.concurrent.ThreadContext;
*/
public class CopycatTransportServer implements Server {
private final Logger log = getLogger(getClass());
private final AtomicBoolean listening = new AtomicBoolean(false);
private CompletableFuture<Void> listenFuture = new CompletableFuture<>();
private final PartitionId partitionId;
......@@ -73,28 +72,23 @@ public class CopycatTransportServer implements Server {
messagingService.registerHandler(messageSubject, (sender, payload) -> {
try (DataInputStream input = new DataInputStream(new ByteArrayInputStream(payload))) {
long connectionId = input.readLong();
AtomicBoolean newConnection = new AtomicBoolean(false);
AtomicBoolean newConnectionCreated = new AtomicBoolean(false);
CopycatTransportConnection connection = connections.computeIfAbsent(connectionId, k -> {
newConnection.set(true);
try {
InetAddress senderHost = InetAddress.getByAddress(sender.host().toOctets());
int senderPort = sender.port();
Address senderAddress = new Address(new InetSocketAddress(senderHost, senderPort));
return new CopycatTransportConnection(connectionId,
CopycatTransport.Mode.SERVER,
partitionId,
senderAddress,
messagingService,
getOrCreateContext(context));
} catch (UnknownHostException e) {
Throwables.propagate(e);
return null;
}
newConnectionCreated.set(true);
CopycatTransportConnection newConnection = new CopycatTransportConnection(connectionId,
CopycatTransport.Mode.SERVER,
partitionId,
CopycatTransport.toAddress(sender),
messagingService,
getOrCreateContext(context));
log.debug("Created new incoming connection {}", connectionId);
newConnection.closeListener(c -> connections.remove(connectionId, c));
return newConnection;
});
byte[] request = IOUtils.toByteArray(input);
return CompletableFuture.supplyAsync(
() -> {
if (newConnection.get()) {
if (newConnectionCreated.get()) {
listener.accept(connection);
}
return connection;
......
......@@ -57,6 +57,7 @@ import com.google.common.collect.Maps;
public class PartitionManager extends AbstractListenerManager<PartitionEvent, PartitionEventListener>
implements PartitionService, PartitionAdminService {
public static final String HELLO_MESSAGE_SUBJECT = "partition-manager-hello";
private final Logger log = getLogger(getClass());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
......@@ -72,6 +73,8 @@ public class PartitionManager extends AbstractListenerManager<PartitionEvent, Pa
@Activate
public void activate() {
messagingService.registerHandler(HELLO_MESSAGE_SUBJECT,
(ep, input) -> CompletableFuture.completedFuture(input));
eventDispatcher.addSink(PartitionEvent.class, listenerRegistry);
metadataService.getClusterMetadata()
......@@ -92,6 +95,7 @@ public class PartitionManager extends AbstractListenerManager<PartitionEvent, Pa
}
public void deactivate() {
messagingService.unregisterHandler(HELLO_MESSAGE_SUBJECT);
eventDispatcher.removeSink(PartitionEvent.class);
CompletableFuture<Void> closeFuture = CompletableFuture.allOf(partitions.values()
......@@ -151,4 +155,4 @@ public class PartitionManager extends AbstractListenerManager<PartitionEvent, Pa
.map(Optional::get)
.collect(Collectors.toList());
}
}
\ No newline at end of file
}
......
......@@ -15,6 +15,7 @@
*/
package org.onosproject.store.primitives.impl;
import static org.slf4j.LoggerFactory.getLogger;
import io.atomix.Atomix;
import io.atomix.AtomixClient;
import io.atomix.catalyst.transport.Transport;
......@@ -38,6 +39,7 @@ import org.onosproject.store.service.AsyncDistributedSet;
import org.onosproject.store.service.AsyncLeaderElector;
import org.onosproject.store.service.DistributedQueue;
import org.onosproject.store.service.Serializer;
import org.slf4j.Logger;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
......@@ -48,6 +50,8 @@ import com.google.common.collect.ImmutableSet;
*/
public class StoragePartitionClient implements DistributedPrimitiveCreator, Managed<StoragePartitionClient> {
private final Logger log = getLogger(getClass());
private final StoragePartition partition;
private final Transport transport;
private final io.atomix.catalyst.serializer.Serializer serializer;
......@@ -82,7 +86,13 @@ public class StoragePartitionClient implements DistributedPrimitiveCreator, Mana
.withTransport(transport)
.build();
}
return client.open().thenApply(v -> null);
return client.open().whenComplete((r, e) -> {
if (e == null) {
log.info("Successfully started client for partition {}", partition.getId());
} else {
log.info("Failed to start client for partition {}", partition.getId(), e);
}
}).thenApply(v -> null);
}
@Override
......@@ -156,4 +166,4 @@ public class StoragePartitionClient implements DistributedPrimitiveCreator, Mana
public boolean isClosed() {
return client.isClosed();
}
}
\ No newline at end of file
}
......