Madan Jampani
Committed by Gerrit Code Review

Bypass netty stack for messages that are sent to self

Change-Id: Ifb1fd610892bd22a291cda472a8a5ef7a1dcfe6d

Manual serde for ClusterMessage to avoid one additional kryo serialization overhead for each message sent/received

Change-Id: I08d9a2c10403b0e9e9e1736c6bd36fa008bb8db0
......@@ -15,14 +15,17 @@
*/
package org.onosproject.store.cluster.messaging;
import com.google.common.base.MoreObjects;
import org.onlab.util.ByteArraySizeHashPrinter;
import org.onosproject.cluster.NodeId;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Objects;
import org.onlab.util.ByteArraySizeHashPrinter;
import org.onosproject.cluster.NodeId;
import com.google.common.base.Charsets;
import com.google.common.base.MoreObjects;
// TODO: Should payload type be ByteBuffer?
/**
* Base message for cluster-wide communications.
......@@ -105,6 +108,43 @@ public class ClusterMessage {
Arrays.equals(this.payload, that.payload);
}
/**
* Serializes this instance.
* @return bytes
*/
public byte[] getBytes() {
byte[] senderBytes = sender.toString().getBytes(Charsets.UTF_8);
byte[] subjectBytes = subject.value().getBytes(Charsets.UTF_8);
int capacity = 12 + senderBytes.length + subjectBytes.length + payload.length;
ByteBuffer buffer = ByteBuffer.allocate(capacity);
buffer.putInt(senderBytes.length);
buffer.put(senderBytes);
buffer.putInt(subjectBytes.length);
buffer.put(subjectBytes);
buffer.putInt(payload.length);
buffer.put(payload);
return buffer.array();
}
/**
* Decodes a new ClusterMessage from raw bytes.
* @param bytes raw bytes
* @return cluster message
*/
public static ClusterMessage fromBytes(byte[] bytes) {
ByteBuffer buffer = ByteBuffer.wrap(bytes);
byte[] senderBytes = new byte[buffer.getInt()];
buffer.get(senderBytes);
byte[] subjectBytes = new byte[buffer.getInt()];
buffer.get(subjectBytes);
byte[] payloadBytes = new byte[buffer.getInt()];
buffer.get(payloadBytes);
return new ClusterMessage(new NodeId(new String(senderBytes, Charsets.UTF_8)),
new MessageSubject(new String(senderBytes, Charsets.UTF_8)),
payloadBytes);
}
@Override
public int hashCode() {
return Objects.hash(sender, subject, payload);
......
......@@ -28,7 +28,6 @@ import org.onlab.netty.Message;
import org.onlab.netty.MessageHandler;
import org.onlab.netty.MessagingService;
import org.onlab.netty.NettyMessagingService;
import org.onlab.util.KryoNamespace;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.NodeId;
......@@ -36,10 +35,6 @@ import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
import org.onosproject.store.serializers.impl.ClusterMessageSerializer;
import org.onosproject.store.serializers.impl.MessageSubjectSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -62,19 +57,6 @@ public class ClusterCommunicationManager
// TODO: This probably should not be a OSGi service.
private MessagingService messagingService;
private static final KryoSerializer SERIALIZER = new KryoSerializer() {
@Override
protected void setupKryoPool() {
serializerPool = KryoNamespace.newBuilder()
.register(KryoNamespaces.API)
.nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
.register(new ClusterMessageSerializer(), ClusterMessage.class)
.register(new MessageSubjectSerializer(), MessageSubject.class)
.build();
}
};
@Activate
public void activate() {
ControllerNode localNode = clusterService.getLocalNode();
......@@ -105,7 +87,7 @@ public class ClusterCommunicationManager
public boolean broadcast(ClusterMessage message) {
boolean ok = true;
final ControllerNode localNode = clusterService.getLocalNode();
byte[] payload = SERIALIZER.encode(message);
byte[] payload = message.getBytes();
for (ControllerNode node : clusterService.getNodes()) {
if (!node.equals(localNode)) {
ok = unicastUnchecked(message.subject(), payload, node.id()) && ok;
......@@ -117,7 +99,7 @@ public class ClusterCommunicationManager
@Override
public boolean broadcastIncludeSelf(ClusterMessage message) {
boolean ok = true;
byte[] payload = SERIALIZER.encode(message);
byte[] payload = message.getBytes();
for (ControllerNode node : clusterService.getNodes()) {
ok = unicastUnchecked(message.subject(), payload, node.id()) && ok;
}
......@@ -128,7 +110,7 @@ public class ClusterCommunicationManager
public boolean multicast(ClusterMessage message, Set<NodeId> nodes) {
boolean ok = true;
final ControllerNode localNode = clusterService.getLocalNode();
byte[] payload = SERIALIZER.encode(message);
byte[] payload = message.getBytes();
for (NodeId nodeId : nodes) {
if (!nodeId.equals(localNode.id())) {
ok = unicastUnchecked(message.subject(), payload, nodeId) && ok;
......@@ -139,7 +121,7 @@ public class ClusterCommunicationManager
@Override
public boolean unicast(ClusterMessage message, NodeId toNodeId) throws IOException {
return unicast(message.subject(), SERIALIZER.encode(message), toNodeId);
return unicast(message.subject(), message.getBytes(), toNodeId);
}
private boolean unicast(MessageSubject subject, byte[] payload, NodeId toNodeId) throws IOException {
......@@ -170,7 +152,7 @@ public class ClusterCommunicationManager
checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort());
try {
return messagingService.sendAndReceive(nodeEp, message.subject().value(), SERIALIZER.encode(message));
return messagingService.sendAndReceive(nodeEp, message.subject().value(), message.getBytes());
} catch (IOException e) {
log.trace("Failed interaction with remote nodeId: " + toNodeId, e);
......@@ -209,7 +191,7 @@ public class ClusterCommunicationManager
public void handle(Message message) {
final ClusterMessage clusterMessage;
try {
clusterMessage = SERIALIZER.decode(message.payload());
clusterMessage = ClusterMessage.fromBytes(message.payload());
} catch (Exception e) {
log.error("Failed decoding {}", message, e);
throw e;
......
......@@ -87,9 +87,7 @@ public class NettyMessagingService implements MessagingService {
.build();
private final LoadingCache<String, Long> messageTypeLookupCache = CacheBuilder.newBuilder()
.softValues()
.build(new CacheLoader<String, Long>() {
@Override
public Long load(String type) {
return hashToLong(type);
......@@ -171,6 +169,10 @@ public class NettyMessagingService implements MessagingService {
}
protected void sendAsync(Endpoint ep, InternalMessage message) throws IOException {
if (ep.equals(localEp)) {
dispatchLocally(message);
return;
}
Channel channel = null;
try {
try {
......@@ -329,29 +331,7 @@ public class NettyMessagingService implements MessagingService {
@Override
protected void channelRead0(ChannelHandlerContext ctx, InternalMessage message) throws Exception {
long type = message.type();
if (type == InternalMessage.REPLY_MESSAGE_TYPE) {
try {
SettableFuture<byte[]> futureResponse =
NettyMessagingService.this.responseFutures.getIfPresent(message.id());
if (futureResponse != null) {
futureResponse.set(message.payload());
} else {
log.warn("Received a reply for message id:[{}]. "
+ " from {}. But was unable to locate the"
+ " request handle", message.id(), message.sender());
}
} finally {
NettyMessagingService.this.responseFutures.invalidate(message.id());
}
return;
}
MessageHandler handler = NettyMessagingService.this.getMessageHandler(type);
if (handler != null) {
handler.handle(message);
} else {
log.debug("No handler registered for {}", type);
}
dispatchLocally(message);
}
@Override
......@@ -361,6 +341,32 @@ public class NettyMessagingService implements MessagingService {
}
}
private void dispatchLocally(InternalMessage message) throws IOException {
long type = message.type();
if (type == InternalMessage.REPLY_MESSAGE_TYPE) {
try {
SettableFuture<byte[]> futureResponse =
NettyMessagingService.this.responseFutures.getIfPresent(message.id());
if (futureResponse != null) {
futureResponse.set(message.payload());
} else {
log.warn("Received a reply for message id:[{}]. "
+ " from {}. But was unable to locate the"
+ " request handle", message.id(), message.sender());
}
} finally {
NettyMessagingService.this.responseFutures.invalidate(message.id());
}
return;
}
MessageHandler handler = NettyMessagingService.this.getMessageHandler(type);
if (handler != null) {
handler.handle(message);
} else {
log.debug("No handler registered for {}", type);
}
}
/**
* Returns the md5 hash of the specified input string as a long.
* @param input input string.
......