Madan Jampani

Copycat messaging new happens over the same cluster messaging used for all other…

… ONOS p2p communication
......@@ -26,7 +26,7 @@ public interface DatabaseService {
/**
* Performs a write operation on the database.
* @param request
* @param request write request
* @return write result.
* @throws DatabaseException if there is failure in execution write.
*/
......
......@@ -17,15 +17,15 @@ public class ReadResult {
}
/**
* Database table name.
* @return
* Returns database table name.
* @return table name.
*/
public String tableName() {
return tableName;
}
/**
* Database table key.
* Returns database table key.
* @return key.
*/
public String key() {
......@@ -33,7 +33,7 @@ public class ReadResult {
}
/**
* value associated with the key.
* Returns value associated with the key.
* @return non-null value if the table contains one, null otherwise.
*/
public VersionedValue value() {
......
......@@ -12,8 +12,8 @@ public class VersionedValue {
/**
* Creates a new instance with the specified value and version.
* @param value
* @param version
* @param value value
* @param version version
*/
public VersionedValue(byte[] value, long version) {
this.value = value;
......
package org.onlab.onos.store.service.impl;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
......@@ -27,6 +30,16 @@ import net.kuujo.copycat.spi.protocol.Protocol;
import net.kuujo.copycat.spi.protocol.ProtocolClient;
import net.kuujo.copycat.spi.protocol.ProtocolServer;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
import org.onlab.onos.store.serializers.ImmutableListSerializer;
import org.onlab.onos.store.serializers.ImmutableMapSerializer;
import org.onlab.onos.store.serializers.ImmutableSetSerializer;
......@@ -37,6 +50,7 @@ import org.onlab.onos.store.service.VersionedValue;
import org.onlab.onos.store.service.WriteRequest;
import org.onlab.onos.store.service.WriteResult;
import org.onlab.util.KryoNamespace;
import org.slf4j.Logger;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
......@@ -46,17 +60,44 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
/**
* {@link Protocol} based on {@link org.onlab.netty.NettyMessagingService}.
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under
* the License.
*/
public class NettyProtocol implements Protocol<TcpMember> {
public static final String COPYCAT_PING = "copycat-raft-consensus-ping";
public static final String COPYCAT_SYNC = "copycat-raft-consensus-sync";
public static final String COPYCAT_POLL = "copycat-raft-consensus-poll";
public static final String COPYCAT_SUBMIT = "copycat-raft-consensus-submit";
@Component(immediate = true)
@Service
public class ClusterMessagingProtocol implements Protocol<TcpMember> {
private final Logger log = getLogger(getClass());
// TODO: make this configurable.
public static final long RETRY_INTERVAL_MILLIS = 2000;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
ClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
ClusterCommunicationService clusterCommunicator;
public static final MessageSubject COPYCAT_PING =
new MessageSubject("copycat-raft-consensus-ping");
public static final MessageSubject COPYCAT_SYNC =
new MessageSubject("copycat-raft-consensus-sync");
public static final MessageSubject COPYCAT_POLL =
new MessageSubject("copycat-raft-consensus-poll");
public static final MessageSubject COPYCAT_SUBMIT =
new MessageSubject("copycat-raft-consensus-submit");
private static final KryoNamespace COPYCAT = KryoNamespace.newBuilder()
.register(PingRequest.class)
......@@ -76,8 +117,7 @@ public class NettyProtocol implements Protocol<TcpMember> {
.register(TcpMember.class)
.build();
// TODO: Move to the right place.
private static final KryoNamespace CRAFT = KryoNamespace.newBuilder()
private static final KryoNamespace DATABASE = KryoNamespace.newBuilder()
.register(ReadRequest.class)
.register(WriteRequest.class)
.register(InternalReadResult.class)
......@@ -116,31 +156,41 @@ public class NettyProtocol implements Protocol<TcpMember> {
serializerPool = KryoNamespace.newBuilder()
.register(COPYCAT)
.register(COMMON)
.register(CRAFT)
.register(DATABASE)
.build()
.populate(1);
}
};
private NettyProtocolServer server = null;
// FIXME: This is a total hack.Assumes
// ProtocolServer is initialized before ProtocolClient
protected NettyProtocolServer getServer() {
if (server == null) {
throw new IllegalStateException("ProtocolServer is not initialized yet!");
@Activate
public void activate() {
log.info("Started.");
}
return server;
@Deactivate
public void deactivate() {
log.info("Stopped.");
}
@Override
public ProtocolServer createServer(TcpMember member) {
server = new NettyProtocolServer(member);
return server;
return new ClusterMessagingProtocolServer(clusterCommunicator);
}
@Override
public ProtocolClient createClient(TcpMember member) {
return new NettyProtocolClient(this, member);
ControllerNode node = getControllerNode(member.host(), member.port());
checkNotNull(node, "A valid controller node is expected");
return new ClusterMessagingProtocolClient(
clusterCommunicator, node);
}
private ControllerNode getControllerNode(String host, int port) {
for (ControllerNode node : clusterService.getNodes()) {
if (node.ip().toString().equals(host) && node.tcpPort() == port) {
return node;
}
}
return null;
}
}
\ No newline at end of file
......
......@@ -11,7 +11,6 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import net.kuujo.copycat.cluster.TcpMember;
import net.kuujo.copycat.protocol.PingRequest;
import net.kuujo.copycat.protocol.PingResponse;
import net.kuujo.copycat.protocol.PollRequest;
......@@ -22,37 +21,54 @@ import net.kuujo.copycat.protocol.SyncRequest;
import net.kuujo.copycat.protocol.SyncResponse;
import net.kuujo.copycat.spi.protocol.ProtocolClient;
import org.onlab.netty.Endpoint;
import org.onlab.netty.NettyMessagingService;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
import org.slf4j.Logger;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* {@link NettyMessagingService} based Copycat protocol client.
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under
* the License.
*/
public class NettyProtocolClient implements ProtocolClient {
public class ClusterMessagingProtocolClient implements ProtocolClient {
private final Logger log = getLogger(getClass());
private static final ThreadFactory THREAD_FACTORY =
new ThreadFactoryBuilder().setNameFormat("copycat-netty-messaging-%d").build();
// Remote endpoint, this client instance is used
// for communicating with.
private final Endpoint remoteEp;
private final NettyMessagingService messagingService;
public static final long RETRY_INTERVAL_MILLIS = 2000;
private final ClusterCommunicationService clusterCommunicator;
private final ControllerNode remoteNode;
// TODO: Is 10 the right number of threads?
// FIXME: Thread pool sizing.
private static final ScheduledExecutorService THREAD_POOL =
new ScheduledThreadPoolExecutor(10, THREAD_FACTORY);
public NettyProtocolClient(NettyProtocol protocol, TcpMember member) {
this(new Endpoint(member.host(), member.port()), protocol.getServer().getNettyMessagingService());
}
public NettyProtocolClient(Endpoint remoteEp, NettyMessagingService messagingService) {
this.remoteEp = remoteEp;
this.messagingService = messagingService;
public ClusterMessagingProtocolClient(
ClusterCommunicationService clusterCommunicator,
ControllerNode remoteNode) {
this.clusterCommunicator = clusterCommunicator;
this.remoteNode = remoteNode;
}
@Override
......@@ -85,16 +101,16 @@ public class NettyProtocolClient implements ProtocolClient {
return CompletableFuture.completedFuture(null);
}
public <I> String messageType(I input) {
public <I> MessageSubject messageType(I input) {
Class<?> clazz = input.getClass();
if (clazz.equals(PollRequest.class)) {
return NettyProtocol.COPYCAT_POLL;
return ClusterMessagingProtocol.COPYCAT_POLL;
} else if (clazz.equals(SyncRequest.class)) {
return NettyProtocol.COPYCAT_SYNC;
return ClusterMessagingProtocol.COPYCAT_SYNC;
} else if (clazz.equals(SubmitRequest.class)) {
return NettyProtocol.COPYCAT_SUBMIT;
return ClusterMessagingProtocol.COPYCAT_SUBMIT;
} else if (clazz.equals(PingRequest.class)) {
return NettyProtocol.COPYCAT_PING;
return ClusterMessagingProtocol.COPYCAT_PING;
} else {
throw new IllegalArgumentException("Unknown class " + clazz.getName());
}
......@@ -109,33 +125,34 @@ public class NettyProtocolClient implements ProtocolClient {
private class RPCTask<I, O> implements Runnable {
private final String messageType;
private final byte[] payload;
private final ClusterMessage message;
private final CompletableFuture<O> future;
public RPCTask(I request, CompletableFuture<O> future) {
this.messageType = messageType(request);
this.payload = NettyProtocol.SERIALIZER.encode(request);
this.message =
new ClusterMessage(
null,
messageType(request),
ClusterMessagingProtocol.SERIALIZER.encode(request));
this.future = future;
}
@Override
public void run() {
try {
byte[] response = messagingService
.sendAndReceive(remoteEp, messageType, payload)
.get(NettyProtocol.RETRY_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
future.complete(NettyProtocol.SERIALIZER.decode(response));
byte[] response = clusterCommunicator
.sendAndReceive(message, remoteNode.id())
.get(RETRY_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
future.complete(ClusterMessagingProtocol.SERIALIZER.decode(response));
} catch (IOException | InterruptedException | ExecutionException | TimeoutException e) {
if (messageType.equals(NettyProtocol.COPYCAT_SYNC) ||
messageType.equals(NettyProtocol.COPYCAT_PING)) {
if (message.subject().equals(ClusterMessagingProtocol.COPYCAT_SYNC) ||
message.subject().equals(ClusterMessagingProtocol.COPYCAT_PING)) {
log.warn("Request to {} failed. Will retry "
+ "in {} ms", remoteEp, NettyProtocol.RETRY_INTERVAL_MILLIS);
+ "in {} ms", remoteNode, RETRY_INTERVAL_MILLIS);
THREAD_POOL.schedule(
this,
NettyProtocol.RETRY_INTERVAL_MILLIS,
RETRY_INTERVAL_MILLIS,
TimeUnit.MILLISECONDS);
} else {
future.completeExceptionally(e);
......
......@@ -2,10 +2,8 @@ package org.onlab.onos.store.service.impl;
import static org.slf4j.LoggerFactory.getLogger;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import net.kuujo.copycat.cluster.TcpMember;
import net.kuujo.copycat.protocol.PingRequest;
import net.kuujo.copycat.protocol.PollRequest;
import net.kuujo.copycat.protocol.RequestHandler;
......@@ -13,33 +11,45 @@ import net.kuujo.copycat.protocol.SubmitRequest;
import net.kuujo.copycat.protocol.SyncRequest;
import net.kuujo.copycat.spi.protocol.ProtocolServer;
import org.onlab.netty.Message;
import org.onlab.netty.MessageHandler;
import org.onlab.netty.NettyMessagingService;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
import org.slf4j.Logger;
/**
* {@link NettyMessagingService} based Copycat protocol server.
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under
* the License.
*/
public class NettyProtocolServer implements ProtocolServer {
private final Logger log = getLogger(getClass());
public class ClusterMessagingProtocolServer implements ProtocolServer {
private final NettyMessagingService messagingService;
private final Logger log = getLogger(getClass());
private RequestHandler handler;
public ClusterMessagingProtocolServer(ClusterCommunicationService clusterCommunicator) {
public NettyProtocolServer(TcpMember member) {
messagingService = new NettyMessagingService(member.host(), member.port());
messagingService.registerHandler(NettyProtocol.COPYCAT_PING, new CopycatMessageHandler<PingRequest>());
messagingService.registerHandler(NettyProtocol.COPYCAT_SYNC, new CopycatMessageHandler<SyncRequest>());
messagingService.registerHandler(NettyProtocol.COPYCAT_POLL, new CopycatMessageHandler<PollRequest>());
messagingService.registerHandler(NettyProtocol.COPYCAT_SUBMIT, new CopycatMessageHandler<SubmitRequest>());
}
protected NettyMessagingService getNettyMessagingService() {
return messagingService;
clusterCommunicator.addSubscriber(
ClusterMessagingProtocol.COPYCAT_PING, new CopycatMessageHandler<PingRequest>());
clusterCommunicator.addSubscriber(
ClusterMessagingProtocol.COPYCAT_SYNC, new CopycatMessageHandler<SyncRequest>());
clusterCommunicator.addSubscriber(
ClusterMessagingProtocol.COPYCAT_POLL, new CopycatMessageHandler<PollRequest>());
clusterCommunicator.addSubscriber(
ClusterMessagingProtocol.COPYCAT_SUBMIT, new CopycatMessageHandler<SubmitRequest>());
}
@Override
......@@ -49,38 +59,23 @@ public class NettyProtocolServer implements ProtocolServer {
@Override
public CompletableFuture<Void> listen() {
try {
messagingService.activate();
return CompletableFuture.completedFuture(null);
} catch (Exception e) {
CompletableFuture<Void> future = new CompletableFuture<>();
future.completeExceptionally(e);
return future;
}
}
@Override
public CompletableFuture<Void> close() {
CompletableFuture<Void> future = new CompletableFuture<>();
try {
messagingService.deactivate();
future.complete(null);
return future;
} catch (Exception e) {
future.completeExceptionally(e);
return future;
}
return CompletableFuture.completedFuture(null);
}
private class CopycatMessageHandler<T> implements MessageHandler {
private class CopycatMessageHandler<T> implements ClusterMessageHandler {
@Override
public void handle(Message message) throws IOException {
T request = NettyProtocol.SERIALIZER.decode(message.payload());
public void handle(ClusterMessage message) {
T request = ClusterMessagingProtocol.SERIALIZER.decode(message.payload());
if (request.getClass().equals(PingRequest.class)) {
handler.ping((PingRequest) request).whenComplete((response, error) -> {
try {
message.respond(NettyProtocol.SERIALIZER.encode(response));
message.respond(ClusterMessagingProtocol.SERIALIZER.encode(response));
} catch (Exception e) {
log.error("Failed to respond to ping request", e);
}
......@@ -88,7 +83,7 @@ public class NettyProtocolServer implements ProtocolServer {
} else if (request.getClass().equals(PollRequest.class)) {
handler.poll((PollRequest) request).whenComplete((response, error) -> {
try {
message.respond(NettyProtocol.SERIALIZER.encode(response));
message.respond(ClusterMessagingProtocol.SERIALIZER.encode(response));
} catch (Exception e) {
log.error("Failed to respond to poll request", e);
}
......@@ -96,7 +91,7 @@ public class NettyProtocolServer implements ProtocolServer {
} else if (request.getClass().equals(SyncRequest.class)) {
handler.sync((SyncRequest) request).whenComplete((response, error) -> {
try {
message.respond(NettyProtocol.SERIALIZER.encode(response));
message.respond(ClusterMessagingProtocol.SERIALIZER.encode(response));
} catch (Exception e) {
log.error("Failed to respond to sync request", e);
}
......@@ -104,7 +99,7 @@ public class NettyProtocolServer implements ProtocolServer {
} else if (request.getClass().equals(SubmitRequest.class)) {
handler.submit((SubmitRequest) request).whenComplete((response, error) -> {
try {
message.respond(NettyProtocol.SERIALIZER.encode(response));
message.respond(ClusterMessagingProtocol.SERIALIZER.encode(response));
} catch (Exception e) {
log.error("Failed to respond to submit request", e);
}
......
......@@ -11,37 +11,22 @@ import net.kuujo.copycat.protocol.SubmitRequest;
import net.kuujo.copycat.protocol.SubmitResponse;
import net.kuujo.copycat.spi.protocol.ProtocolClient;
import org.apache.commons.lang3.RandomUtils;
import org.onlab.netty.Endpoint;
import org.onlab.netty.NettyMessagingService;
import org.onlab.onos.store.service.DatabaseException;
import org.onlab.onos.store.service.ReadRequest;
import org.onlab.onos.store.service.WriteRequest;
public class DatabaseClient {
private final Endpoint copycatEp;
ProtocolClient client;
NettyMessagingService messagingService;
private final ProtocolClient client;
public DatabaseClient(Endpoint copycatEp) {
this.copycatEp = copycatEp;
public DatabaseClient(ProtocolClient client) {
this.client = client;
}
private static String nextId() {
return UUID.randomUUID().toString();
}
public void activate() throws Exception {
messagingService = new NettyMessagingService(RandomUtils.nextInt(10000, 40000));
messagingService.activate();
client = new NettyProtocolClient(copycatEp, messagingService);
}
public void deactivate() throws Exception {
messagingService.deactivate();
}
public boolean createTable(String tableName) {
SubmitRequest request =
......
......@@ -18,7 +18,6 @@ import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onlab.netty.Endpoint;
import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.store.service.DatabaseAdminService;
......@@ -50,6 +49,9 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService {
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
ClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
ClusterMessagingProtocol copycatMessagingProtocol;
public static final String LOG_FILE_PREFIX = "onos-copy-cat-log";
private Copycat copycat;
......@@ -57,15 +59,14 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService {
@Activate
public void activate() {
// FIXME hack tcpPort +1 for copycat communication
TcpMember localMember =
new TcpMember(
clusterService.getLocalNode().ip().toString(),
clusterService.getLocalNode().tcpPort() + 1);
clusterService.getLocalNode().tcpPort());
List<TcpMember> remoteMembers = Lists.newArrayList();
for (ControllerNode node : clusterService.getNodes()) {
TcpMember member = new TcpMember(node.ip().toString(), node.tcpPort() + 1);
TcpMember member = new TcpMember(node.ip().toString(), node.tcpPort());
if (!member.equals(localMember)) {
remoteMembers.add(member);
}
......@@ -84,10 +85,10 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService {
ControllerNode thisNode = clusterService.getLocalNode();
Log consensusLog = new ChronicleLog(LOG_FILE_PREFIX + "_" + thisNode.id());
copycat = new Copycat(stateMachine, consensusLog, cluster, new NettyProtocol());
copycat = new Copycat(stateMachine, consensusLog, cluster, copycatMessagingProtocol);
copycat.start();
client = new DatabaseClient(new Endpoint(localMember.host(), localMember.port()));
client = new DatabaseClient(copycatMessagingProtocol.createClient(localMember));
log.info("Started.");
}
......
......@@ -27,7 +27,7 @@ public class DatabaseStateMachine implements StateMachine {
serializerPool = KryoNamespace.newBuilder()
.register(VersionedValue.class)
.register(State.class)
.register(NettyProtocol.COMMON)
.register(ClusterMessagingProtocol.COMMON)
.build()
.populate(1);
}
......