Madan Jampani
Committed by Madan Jampani

Added custom transport implementaion (for Catalyst Transport) for all copycat specific communication

Change-Id: I801d973b7c3412f6a8efcec77fe73fc480b2ce6e
......@@ -32,6 +32,7 @@
<description>ONOS Core Store subsystem</description>
<modules>
<module>primitives</module>
<module>dist</module>
<module>persistence</module>
<module>serializers</module>
......
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright 2016 Open Networking Laboratory
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.onosproject</groupId>
<artifactId>onos-core-store</artifactId>
<version>1.5.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<artifactId>onos-core-primitives</artifactId>
<packaging>bundle</packaging>
<description>ONOS distributed state management primitives</description>
<dependencies>
<dependency>
<groupId>org.onosproject</groupId>
<artifactId>onos-api</artifactId>
</dependency>
<!-- for shaded atomix/copycat -->
<dependency>
<groupId>org.onosproject</groupId>
<artifactId>onlab-thirdparty</artifactId>
</dependency>
</dependencies>
</project>
/*
* Copyright 2016 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.impl;
import static com.google.common.base.Preconditions.checkNotNull;
import org.onosproject.store.cluster.messaging.MessagingService;
import io.atomix.catalyst.transport.Client;
import io.atomix.catalyst.transport.Server;
import io.atomix.catalyst.transport.Transport;
/**
* Custom {@link Transport transport} for Copycat interactions
* built on top of {@link MessagingService}.
*
* @see CopycatTransportServer
* @see CopycatTransportClient
*/
public class CopycatTransport implements Transport {
/**
* Transport Mode.
*/
public enum Mode {
/**
* Signifies transport for client -> server interaction.
*/
CLIENT,
/**
* Signified transport for server -> server interaction.
*/
SERVER
}
private final Mode mode;
private final String clusterName;
private final MessagingService messagingService;
public CopycatTransport(Mode mode, String clusterName, MessagingService messagingService) {
this.mode = checkNotNull(mode);
this.clusterName = checkNotNull(clusterName);
this.messagingService = checkNotNull(messagingService);
}
@Override
public Client client() {
return new CopycatTransportClient(clusterName,
messagingService,
mode);
}
@Override
public Server server() {
return new CopycatTransportServer(clusterName,
messagingService);
}
}
/*
* Copyright 2016 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.impl;
import static com.google.common.base.Preconditions.checkNotNull;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang.math.RandomUtils;
import org.onosproject.store.cluster.messaging.MessagingService;
import com.google.common.collect.Sets;
import io.atomix.catalyst.transport.Address;
import io.atomix.catalyst.transport.Client;
import io.atomix.catalyst.transport.Connection;
import io.atomix.catalyst.util.concurrent.ThreadContext;
/**
* {@link Client} implementation for {@link CopycatTransport}.
*/
public class CopycatTransportClient implements Client {
private final String clusterName;
private final MessagingService messagingService;
private final CopycatTransport.Mode mode;
private final ThreadContext context;
private final Set<CopycatTransportConnection> connections = Sets.newConcurrentHashSet();
CopycatTransportClient(String clusterName, MessagingService messagingService, CopycatTransport.Mode mode) {
this.clusterName = checkNotNull(clusterName);
this.messagingService = checkNotNull(messagingService);
this.mode = checkNotNull(mode);
this.context = ThreadContext.currentContextOrThrow();
}
@Override
public CompletableFuture<Connection> connect(Address remoteAddress) {
CopycatTransportConnection connection = new CopycatTransportConnection(
nextConnectionId(),
CopycatTransport.Mode.CLIENT,
clusterName,
remoteAddress,
messagingService,
context);
if (mode == CopycatTransport.Mode.CLIENT) {
connection.setBidirectional();
}
connections.add(connection);
return CompletableFuture.supplyAsync(() -> connection, context.executor());
}
@Override
public CompletableFuture<Void> close() {
return CompletableFuture.allOf(connections.stream().map(Connection::close).toArray(CompletableFuture[]::new));
}
private long nextConnectionId() {
return RandomUtils.nextLong();
}
}
/*
* Copyright 2016 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.impl;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.apache.commons.io.IOUtils;
import org.onlab.packet.IpAddress;
import org.onlab.util.Tools;
import org.onosproject.store.cluster.messaging.Endpoint;
import org.onosproject.store.cluster.messaging.MessagingService;
import com.google.common.base.MoreObjects;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import static com.google.common.base.Preconditions.checkNotNull;
import io.atomix.catalyst.transport.Address;
import io.atomix.catalyst.transport.Connection;
import io.atomix.catalyst.transport.MessageHandler;
import io.atomix.catalyst.transport.TransportException;
import io.atomix.catalyst.util.Assert;
import io.atomix.catalyst.util.Listener;
import io.atomix.catalyst.util.Listeners;
import io.atomix.catalyst.util.ReferenceCounted;
import io.atomix.catalyst.util.concurrent.ThreadContext;
/**
* {@link Connection} implementation for CopycatTransport.
*/
public class CopycatTransportConnection implements Connection {
private final Listeners<Throwable> exceptionListeners = new Listeners<>();
private final Listeners<Connection> closeListeners = new Listeners<>();
static final byte SUCCESS = 0x03;
static final byte FAILURE = 0x04;
private final long connectionId;
private CopycatTransport.Mode mode;
private final Address remoteAddress;
private final MessagingService messagingService;
private final String outboundMessageSubject;
private final String inboundMessageSubject;
private final ThreadContext context;
private final Map<Class<?>, InternalHandler> handlers = Maps.newConcurrentMap();
private final AtomicInteger messagesSent = new AtomicInteger(0);
private final AtomicInteger sendFailures = new AtomicInteger(0);
private final AtomicInteger messagesReceived = new AtomicInteger(0);
private final AtomicInteger receiveFailures = new AtomicInteger(0);
CopycatTransportConnection(long connectionId,
CopycatTransport.Mode mode,
String clusterName,
Address address,
MessagingService messagingService,
ThreadContext context) {
this.connectionId = connectionId;
this.mode = checkNotNull(mode);
this.remoteAddress = checkNotNull(address);
this.messagingService = checkNotNull(messagingService);
if (mode == CopycatTransport.Mode.CLIENT) {
this.outboundMessageSubject = String.format("onos-copycat-%s", clusterName);
this.inboundMessageSubject = String.format("onos-copycat-%s-%d", clusterName, connectionId);
} else {
this.outboundMessageSubject = String.format("onos-copycat-%s-%d", clusterName, connectionId);
this.inboundMessageSubject = String.format("onos-copycat-%s", clusterName);
}
this.context = checkNotNull(context);
}
public void setBidirectional() {
messagingService.registerHandler(inboundMessageSubject, (sender, payload) -> {
try (DataInputStream input = new DataInputStream(new ByteArrayInputStream(payload))) {
if (input.readLong() != connectionId) {
throw new IllegalStateException("Invalid connection Id");
}
return handle(IOUtils.toByteArray(input));
} catch (IOException e) {
Throwables.propagate(e);
return null;
}
});
}
@Override
public <T, U> CompletableFuture<U> send(T message) {
ThreadContext context = ThreadContext.currentContextOrThrow();
CompletableFuture<U> result = new CompletableFuture<>();
try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
new DataOutputStream(baos).writeLong(connectionId);
context.serializer().writeObject(message, baos);
if (message instanceof ReferenceCounted) {
((ReferenceCounted<?>) message).release();
}
messagingService.sendAndReceive(toEndpoint(remoteAddress),
outboundMessageSubject,
baos.toByteArray(),
context.executor())
.whenComplete((r, e) -> {
if (e == null) {
messagesSent.incrementAndGet();
} else {
sendFailures.incrementAndGet();
}
handleResponse(r, e, result, context);
});
} catch (Exception e) {
result.completeExceptionally(new TransportException("Failed to send request", e));
}
return result;
}
private <T> void handleResponse(byte[] response,
Throwable error,
CompletableFuture<T> future,
ThreadContext context) {
if (error != null) {
context.execute(() -> future.completeExceptionally(error));
return;
}
checkNotNull(response);
InputStream input = new ByteArrayInputStream(response);
try {
byte status = (byte) input.read();
if (status == FAILURE) {
Throwable t = context.serializer().readObject(input);
context.execute(() -> future.completeExceptionally(t));
} else {
context.execute(() -> future.complete(context.serializer().readObject(input)));
}
} catch (IOException e) {
context.execute(() -> future.completeExceptionally(e));
}
}
@Override
public <T, U> Connection handler(Class<T> type, MessageHandler<T, U> handler) {
Assert.notNull(type, "type");
handlers.put(type, new InternalHandler(handler, ThreadContext.currentContextOrThrow()));
return null;
}
public CompletableFuture<byte[]> handle(byte[] message) {
try {
Object request = context.serializer().readObject(new ByteArrayInputStream(message));
InternalHandler handler = handlers.get(request.getClass());
if (handler == null) {
return Tools.exceptionalFuture(new IllegalStateException(
"No handler registered for " + request.getClass()));
}
return handler.handle(request).handle((result, error) -> {
if (error == null) {
messagesReceived.incrementAndGet();
} else {
receiveFailures.incrementAndGet();
}
try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
baos.write(error != null ? FAILURE : SUCCESS);
context.serializer().writeObject(error != null ? error : result, baos);
return baos.toByteArray();
} catch (IOException e) {
Throwables.propagate(e);
return null;
}
});
} catch (Exception e) {
return Tools.exceptionalFuture(e);
}
}
@Override
public Listener<Throwable> exceptionListener(Consumer<Throwable> listener) {
return exceptionListeners.add(listener);
}
@Override
public Listener<Connection> closeListener(Consumer<Connection> listener) {
return closeListeners.add(listener);
}
@Override
public CompletableFuture<Void> close() {
// TODO: need to unregister message handler
closeListeners.forEach(listener -> listener.accept(this));
if (mode == CopycatTransport.Mode.CLIENT) {
messagingService.unregisterHandler(inboundMessageSubject);
}
return CompletableFuture.completedFuture(null);
}
@Override
public int hashCode() {
return Objects.hash(connectionId);
}
@Override
public boolean equals(Object other) {
if (!(other instanceof CopycatTransportConnection)) {
return false;
}
return connectionId == ((CopycatTransportConnection) other).connectionId;
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("id", connectionId)
.add("sent", messagesSent.get())
.add("received", messagesReceived.get())
.add("sendFailures", sendFailures.get())
.add("receiveFailures", receiveFailures.get())
.toString();
}
private Endpoint toEndpoint(Address address) {
try {
return new Endpoint(IpAddress.valueOf(InetAddress.getByName(address.host())), address.port());
} catch (UnknownHostException e) {
Throwables.propagate(e);
return null;
}
}
@SuppressWarnings("rawtypes")
private final class InternalHandler {
private final MessageHandler handler;
private final ThreadContext context;
private InternalHandler(MessageHandler handler, ThreadContext context) {
this.handler = handler;
this.context = context;
}
@SuppressWarnings("unchecked")
public CompletableFuture<Object> handle(Object message) {
CompletableFuture<Object> answer = new CompletableFuture<>();
context.execute(() -> handler.handle(message).whenComplete((r, e) -> {
if (e != null) {
answer.completeExceptionally((Throwable) e);
} else {
answer.complete(r);
}
}));
return answer;
}
}
}
/*
* Copyright 2016 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.impl;
import static com.google.common.base.Preconditions.checkNotNull;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.apache.commons.io.IOUtils;
import org.onlab.util.Tools;
import org.onosproject.store.cluster.messaging.MessagingService;
import com.google.common.collect.Maps;
import io.atomix.catalyst.transport.Address;
import io.atomix.catalyst.transport.Connection;
import io.atomix.catalyst.transport.Server;
import io.atomix.catalyst.util.concurrent.SingleThreadContext;
import io.atomix.catalyst.util.concurrent.ThreadContext;
/**
* {@link Server} implementation for {@link CopycatTransport}.
*/
public class CopycatTransportServer implements Server {
private final AtomicBoolean listening = new AtomicBoolean(false);
private CompletableFuture<Void> listenFuture;
private final String clusterName;
private final MessagingService messagingService;
private final String messageSubject;
private final Map<Long, CopycatTransportConnection> connections = Maps.newConcurrentMap();
CopycatTransportServer(String clusterName, MessagingService messagingService) {
this.clusterName = checkNotNull(clusterName);
this.messagingService = checkNotNull(messagingService);
this.messageSubject = String.format("onos-copycat-%s", clusterName);
}
@Override
public CompletableFuture<Void> listen(Address address, Consumer<Connection> listener) {
if (listening.get()) {
return CompletableFuture.completedFuture(null);
}
ThreadContext context = ThreadContext.currentContextOrThrow();
synchronized (this) {
if (listenFuture == null) {
listenFuture = new CompletableFuture<>();
listen(address, listener, context);
}
}
return listenFuture;
}
public void listen(Address address, Consumer<Connection> listener, ThreadContext context) {
messagingService.registerHandler(messageSubject, (sender, payload) -> {
try (DataInputStream input = new DataInputStream(new ByteArrayInputStream(payload))) {
long connectionId = input.readLong();
InetAddress senderHost = InetAddress.getByAddress(sender.host().toOctets());
int senderPort = sender.port();
Address senderAddress = new Address(new InetSocketAddress(senderHost, senderPort));
AtomicBoolean newConnection = new AtomicBoolean(false);
CopycatTransportConnection connection = connections.computeIfAbsent(connectionId, k -> {
newConnection.set(true);
return new CopycatTransportConnection(connectionId,
CopycatTransport.Mode.SERVER,
clusterName,
senderAddress,
messagingService,
getOrCreateContext(context));
});
byte[] request = IOUtils.toByteArray(input);
return CompletableFuture.supplyAsync(
() -> {
if (newConnection.get()) {
listener.accept(connection);
}
return connection;
}, context.executor()).thenCompose(c -> c.handle(request));
} catch (IOException e) {
return Tools.exceptionalFuture(e);
}
});
listening.set(true);
context.execute(() -> {
listenFuture.complete(null);
});
}
@Override
public CompletableFuture<Void> close() {
messagingService.unregisterHandler(messageSubject);
return CompletableFuture.completedFuture(null);
}
/**
* Returns the current execution context or creates one.
*/
private ThreadContext getOrCreateContext(ThreadContext parentContext) {
ThreadContext context = ThreadContext.currentContext();
if (context != null) {
return context;
}
return new SingleThreadContext("copycat-transport-server-" + clusterName, parentContext.serializer().clone());
}
}
/*
* Copyright 2016 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Implementation for distributed state management primitives.
*/
package org.onosproject.store.primitives.impl;
......@@ -77,6 +77,8 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<onos-build-conf.version>1.1</onos-build-conf.version>
<netty4.version>4.0.33.Final</netty4.version>
<!-- TODO: replace with final release version when it is out -->
<atomix.version>0.1.0-beta4</atomix.version>
<copycat.version>0.5.1.onos</copycat.version>
<openflowj.version>0.9.1.onos</openflowj.version>
<onos-maven-plugin.version>1.8-SNAPSHOT</onos-maven-plugin.version>
......@@ -87,6 +89,7 @@
<codehaus.jackson.version>1.9.13</codehaus.jackson.version>
<slf4j.version>1.7.6</slf4j.version>
<guava.version>19.0</guava.version>
<commons.io.version>2.4</commons.io.version>
<!-- TODO argLine was originally added maven-surfire-plugin configuration
to fix locale errors for non-US developers. However, it breaks
SonarQube's test coverage, so moving here for now. -->
......
......@@ -38,6 +38,18 @@
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>${commons.io.version}</version>
</dependency>
<dependency>
<groupId>io.atomix</groupId>
<artifactId>atomix</artifactId>
<version>${atomix.version}</version>
</dependency>
<dependency>
<!-- FIXME once fixes get merged to upstream -->
<groupId>org.onosproject</groupId>
<artifactId>copycat-api</artifactId>
......@@ -57,6 +69,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.1</version>
<configuration>
<createSourcesJar>true</createSourcesJar>
......@@ -81,13 +94,35 @@
</filter>
<filter>
<artifact>commons-io:commons-io</artifact>
<includes>
<include>org/apache/commons/io/**</include>
</includes>
</filter>
<filter>
<artifact>org.onosproject:copycat*</artifact>
<includes>
<include>**</include>
</includes>
</filter>
<filter>
<artifact>io.atomix:atomix-all</artifact>
<includes>
<include>**</include>
</includes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/services/io.atomix.catalyst.serializer.CatalystSerializable</resource>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/services/io.atomix.resource.Resource</resource>
</transformer>
</transformers>
</configuration>
<executions>
<execution>
......@@ -104,8 +139,11 @@
<configuration>
<instructions>
<Export-Package>
com.googlecode.concurrenttrees.*;net.kuujo.copycat.*
com.googlecode.concurrenttrees.*;net.kuujo.copycat.*;io.atomix.*
</Export-Package>
<Import-Package>
!sun.nio.ch,!sun.misc,*
</Import-Package>
</instructions>
</configuration>
</plugin>
......