Madan Jampani

Limit the amont of work that happens on netty event loop threads.

Currently we are kryo serializing/deserializing the message envelope which can potentially limit throughput.

Change-Id: I0ae9dab53bbb765b7618ceaefda1edf4f77b0b59
......@@ -76,7 +76,7 @@ public class ClusterCommunicationManager
@Activate
public void activate() {
ControllerNode localNode = clusterService.getLocalNode();
NettyMessagingService netty = new NettyMessagingService(localNode.ip().toString(), localNode.tcpPort());
NettyMessagingService netty = new NettyMessagingService(localNode.ip(), localNode.tcpPort());
// FIXME: workaround until it becomes a service.
try {
netty.activate();
......@@ -143,7 +143,7 @@ public class ClusterCommunicationManager
private boolean unicast(MessageSubject subject, byte[] payload, NodeId toNodeId) throws IOException {
ControllerNode node = clusterService.getNode(toNodeId);
checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
Endpoint nodeEp = new Endpoint(node.ip().toString(), node.tcpPort());
Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort());
try {
messagingService.sendAsync(nodeEp, subject.value(), payload);
return true;
......@@ -166,7 +166,7 @@ public class ClusterCommunicationManager
public ListenableFuture<byte[]> sendAndReceive(ClusterMessage message, NodeId toNodeId) throws IOException {
ControllerNode node = clusterService.getNode(toNodeId);
checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
Endpoint nodeEp = new Endpoint(node.ip().toString(), node.tcpPort());
Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort());
try {
return messagingService.sendAndReceive(nodeEp, message.subject().value(), SERIALIZER.encode(message));
......
......@@ -19,9 +19,11 @@ package org.onlab.netty;
* State transitions a decoder goes through as it is decoding an incoming message.
*/
public enum DecoderState {
READ_HEADER_VERSION,
READ_PREAMBLE,
READ_MESSAGE_ID,
READ_SENDER_IP_VERSION,
READ_SENDER_IP,
READ_SENDER_PORT,
READ_MESSAGE_TYPE,
READ_CONTENT_LENGTH,
READ_SERIALIZER_VERSION,
READ_CONTENT
}
......
......@@ -15,8 +15,12 @@
*/
package org.onlab.netty;
import static com.google.common.base.Preconditions.*;
import java.util.Objects;
import org.onlab.packet.IpAddress;
import com.google.common.base.MoreObjects;
/**
......@@ -25,15 +29,15 @@ import com.google.common.base.MoreObjects;
public final class Endpoint {
private final int port;
private final String host;
private final IpAddress ip;
public Endpoint(String host, int port) {
this.host = host;
public Endpoint(IpAddress host, int port) {
this.ip = checkNotNull(host);
this.port = port;
}
public String host() {
return host;
public IpAddress host() {
return ip;
}
public int port() {
......@@ -43,14 +47,14 @@ public final class Endpoint {
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("host", host)
.add("ip", ip)
.add("port", port)
.toString();
}
@Override
public int hashCode() {
return Objects.hash(host, port);
return Objects.hash(ip, port);
}
@Override
......@@ -66,6 +70,6 @@ public final class Endpoint {
}
Endpoint that = (Endpoint) obj;
return Objects.equals(this.port, that.port) &&
Objects.equals(this.host, that.host);
Objects.equals(this.ip, that.ip);
}
}
......
......@@ -27,18 +27,19 @@ import com.google.common.base.MoreObjects;
*/
public final class InternalMessage implements Message {
public static final String REPLY_MESSAGE_TYPE = "NETTY_MESSAGING_REQUEST_REPLY";
public static final long REPLY_MESSAGE_TYPE =
NettyMessagingService.hashToLong("NETTY_MESSAGING_REQUEST_REPLY");
private long id;
private Endpoint sender;
private String type;
private long type;
private byte[] payload;
private transient NettyMessagingService messagingService;
// Must be created using the Builder.
private InternalMessage() {}
InternalMessage(long id, Endpoint sender, String type, byte[] payload) {
InternalMessage(long id, Endpoint sender, long type, byte[] payload) {
this.id = id;
this.sender = sender;
this.type = type;
......@@ -49,7 +50,7 @@ public final class InternalMessage implements Message {
return id;
}
public String type() {
public long type() {
return type;
}
......@@ -103,7 +104,7 @@ public final class InternalMessage implements Message {
return this;
}
public Builder withType(String type) {
public Builder withType(long type) {
message.type = type;
return this;
}
......
/*
* Copyright 2014 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onlab.netty;
import org.onlab.util.KryoNamespace;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import java.nio.ByteBuffer;
/**
* Kryo Serializer.
*/
public class KryoSerializer {
private KryoNamespace serializerPool;
public KryoSerializer() {
setupKryoPool();
}
/**
* Sets up the common serialzers pool.
*/
protected void setupKryoPool() {
serializerPool = KryoNamespace.newBuilder()
.register(byte[].class)
.register(new InternalMessageSerializer(), InternalMessage.class)
.register(new EndPointSerializer(), Endpoint.class)
.build();
}
public <T> T decode(byte[] data) {
return serializerPool.deserialize(data);
}
public byte[] encode(Object payload) {
return serializerPool.serialize(payload);
}
public <T> T decode(ByteBuffer buffer) {
return serializerPool.deserialize(buffer);
}
public void encode(Object obj, ByteBuffer buffer) {
serializerPool.serialize(obj, buffer);
}
public static final class InternalMessageSerializer
extends Serializer<InternalMessage> {
@Override
public void write(Kryo kryo, Output output, InternalMessage object) {
output.writeLong(object.id());
kryo.writeClassAndObject(output, object.sender());
output.writeString(object.type());
output.writeInt(object.payload().length, true);
output.writeBytes(object.payload());
}
@Override
public InternalMessage read(Kryo kryo, Input input,
Class<InternalMessage> type) {
long id = input.readLong();
Endpoint sender = (Endpoint) kryo.readClassAndObject(input);
String msgtype = input.readString();
int length = input.readInt(true);
byte[] payload = input.readBytes(length);
return new InternalMessage(id, sender, msgtype, payload);
}
}
public static final class EndPointSerializer extends Serializer<Endpoint> {
@Override
public void write(Kryo kryo, Output output, Endpoint object) {
output.writeString(object.host());
output.writeInt(object.port());
}
@Override
public Endpoint read(Kryo kryo, Input input, Class<Endpoint> type) {
String host = input.readString();
int port = input.readInt();
return new Endpoint(host, port);
}
}
}
......@@ -20,9 +20,10 @@ import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;
import java.util.Arrays;
import java.util.List;
import org.onlab.packet.IpAddress;
import org.onlab.packet.IpAddress.Version;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -35,12 +36,15 @@ public class MessageDecoder extends ReplayingDecoder<DecoderState> {
private final NettyMessagingService messagingService;
private static final KryoSerializer SERIALIZER = new KryoSerializer();
private long messageId;
private Version ipVersion;
private IpAddress senderIp;
private int senderPort;
private int contentLength;
private long messageType;
public MessageDecoder(NettyMessagingService messagingService) {
super(DecoderState.READ_HEADER_VERSION);
super(DecoderState.READ_MESSAGE_ID);
this.messagingService = messagingService;
}
......@@ -51,27 +55,37 @@ public class MessageDecoder extends ReplayingDecoder<DecoderState> {
List<Object> out) throws Exception {
switch (state()) {
case READ_HEADER_VERSION:
int headerVersion = buffer.readInt();
checkState(headerVersion == MessageEncoder.HEADER_VERSION, "Unexpected header version");
checkpoint(DecoderState.READ_PREAMBLE);
case READ_PREAMBLE:
byte[] preamble = new byte[MessageEncoder.PREAMBLE.length];
buffer.readBytes(preamble);
checkState(Arrays.equals(MessageEncoder.PREAMBLE, preamble), "Message has wrong preamble");
case READ_MESSAGE_ID:
messageId = buffer.readLong();
checkpoint(DecoderState.READ_SENDER_IP_VERSION);
case READ_SENDER_IP_VERSION:
ipVersion = buffer.readByte() == 0x0 ? Version.INET : Version.INET6;
checkpoint(DecoderState.READ_SENDER_IP);
case READ_SENDER_IP:
byte[] octects = new byte[IpAddress.byteLength(ipVersion)];
buffer.readBytes(octects);
senderIp = IpAddress.valueOf(ipVersion, octects);
checkpoint(DecoderState.READ_SENDER_PORT);
case READ_SENDER_PORT:
senderPort = buffer.readInt();
checkpoint(DecoderState.READ_MESSAGE_TYPE);
case READ_MESSAGE_TYPE:
messageType = buffer.readLong();
checkpoint(DecoderState.READ_CONTENT_LENGTH);
case READ_CONTENT_LENGTH:
contentLength = buffer.readInt();
checkpoint(DecoderState.READ_SERIALIZER_VERSION);
case READ_SERIALIZER_VERSION:
int serializerVersion = buffer.readInt();
checkState(serializerVersion == MessageEncoder.SERIALIZER_VERSION, "Unexpected serializer version");
checkpoint(DecoderState.READ_CONTENT);
case READ_CONTENT:
InternalMessage message = SERIALIZER.decode(buffer.readBytes(contentLength).nioBuffer());
byte[] payload = new byte[contentLength];
buffer.readBytes(payload);
InternalMessage message = new InternalMessage(
messageId,
new Endpoint(senderIp, senderPort),
messageType,
payload);
message.setMessagingService(messagingService);
out.add(message);
checkpoint(DecoderState.READ_HEADER_VERSION);
checkpoint(DecoderState.READ_MESSAGE_ID);
break;
default:
checkState(false, "Must not be here");
......
......@@ -15,17 +15,18 @@
*/
package org.onlab.netty;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import java.io.IOException;
import org.onlab.packet.IpAddress;
import org.onlab.packet.IpAddress.Version;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Encode InternalMessage out into a byte buffer.
*/
......@@ -34,34 +35,36 @@ public class MessageEncoder extends MessageToByteEncoder<InternalMessage> {
private final Logger log = LoggerFactory.getLogger(getClass());
// onosiscool in ascii
static final byte[] PREAMBLE = "onosiscool".getBytes(StandardCharsets.US_ASCII);
public static final int HEADER_VERSION = 1;
public static final int SERIALIZER_VERSION = 1;
private static final KryoSerializer SERIALIZER = new KryoSerializer();
@Override
protected void encode(
ChannelHandlerContext context,
InternalMessage message,
ByteBuf out) throws Exception {
// write version
out.writeInt(HEADER_VERSION);
// write message id
out.writeLong(message.id());
// write preamble
out.writeBytes(PREAMBLE);
Endpoint sender = message.sender();
byte[] payload = SERIALIZER.encode(message);
IpAddress senderIp = sender.host();
if (senderIp.version() == Version.INET) {
out.writeByte(0);
} else {
out.writeByte(1);
}
out.writeBytes(senderIp.toOctets());
// write sender port
out.writeInt(sender.port());
// write message type.
out.writeLong(message.type());
byte[] payload = message.payload();
// write payload length
out.writeInt(payload.length);
// write payloadSerializer version
out.writeInt(SERIALIZER_VERSION);
// write payload.
out.writeBytes(payload);
}
......
......@@ -37,6 +37,7 @@ import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
......@@ -46,13 +47,18 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.pool.KeyedPoolableObjectFactory;
import org.apache.commons.pool.impl.GenericKeyedObjectPool;
import org.onlab.packet.IpAddress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Charsets;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.google.common.hash.Hashing;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
......@@ -64,7 +70,7 @@ public class NettyMessagingService implements MessagingService {
private final Logger log = LoggerFactory.getLogger(getClass());
private final Endpoint localEp;
private final ConcurrentMap<String, MessageHandler> handlers = new ConcurrentHashMap<>();
private final ConcurrentMap<Long, MessageHandler> handlers = new ConcurrentHashMap<>();
private final AtomicLong messageIdGenerator = new AtomicLong(0);
private final Cache<Long, SettableFuture<byte[]>> responseFutures = CacheBuilder.newBuilder()
.maximumSize(100000)
......@@ -78,6 +84,17 @@ public class NettyMessagingService implements MessagingService {
}
})
.build();
private final LoadingCache<String, Long> messageTypeLookupCache = CacheBuilder.newBuilder()
.softValues()
.build(new CacheLoader<String, Long>() {
@Override
public Long load(String type) {
return hashToLong(type);
}
});
private final GenericKeyedObjectPool<Endpoint, Channel> channels
= new GenericKeyedObjectPool<Endpoint, Channel>(new OnosCommunicationChannelFactory());
......@@ -103,7 +120,7 @@ public class NettyMessagingService implements MessagingService {
clientChannelClass = NioSocketChannel.class;
}
public NettyMessagingService(String ip, int port) {
public NettyMessagingService(IpAddress ip, int port) {
localEp = new Endpoint(ip, port);
}
......@@ -113,7 +130,7 @@ public class NettyMessagingService implements MessagingService {
public NettyMessagingService(int port) {
try {
localEp = new Endpoint(java.net.InetAddress.getLocalHost().getHostName(), port);
localEp = new Endpoint(IpAddress.valueOf(InetAddress.getLocalHost()), port);
} catch (UnknownHostException e) {
// Cannot resolve the local host, something is very wrong. Bailing out.
throw new IllegalStateException("Cannot resolve local host", e);
......@@ -146,7 +163,7 @@ public class NettyMessagingService implements MessagingService {
InternalMessage message = new InternalMessage.Builder(this)
.withId(messageIdGenerator.incrementAndGet())
.withSender(localEp)
.withType(type)
.withType(messageTypeLookupCache.getUnchecked(type))
.withPayload(payload)
.build();
sendAsync(ep, message);
......@@ -178,7 +195,7 @@ public class NettyMessagingService implements MessagingService {
InternalMessage message = new InternalMessage.Builder(this)
.withId(messageId)
.withSender(localEp)
.withType(type)
.withType(messageTypeLookupCache.getUnchecked(type))
.withPayload(payload)
.build();
try {
......@@ -192,7 +209,7 @@ public class NettyMessagingService implements MessagingService {
@Override
public void registerHandler(String type, MessageHandler handler) {
handlers.putIfAbsent(type, handler);
handlers.putIfAbsent(hashToLong(type), handler);
}
@Override
......@@ -200,7 +217,7 @@ public class NettyMessagingService implements MessagingService {
handlers.remove(type);
}
private MessageHandler getMessageHandler(String type) {
private MessageHandler getMessageHandler(long type) {
return handlers.get(type);
}
......@@ -245,7 +262,7 @@ public class NettyMessagingService implements MessagingService {
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
bootstrap.handler(new OnosCommunicationChannelInitializer());
// Start the client.
ChannelFuture f = bootstrap.connect(ep.host(), ep.port()).sync();
ChannelFuture f = bootstrap.connect(ep.host().toString(), ep.port()).sync();
return f.channel();
}
......@@ -295,8 +312,8 @@ public class NettyMessagingService implements MessagingService {
@Override
protected void channelRead0(ChannelHandlerContext ctx, InternalMessage message) throws Exception {
String type = message.type();
if (type.equals(InternalMessage.REPLY_MESSAGE_TYPE)) {
long type = message.type();
if (type == InternalMessage.REPLY_MESSAGE_TYPE) {
try {
SettableFuture<byte[]> futureResponse =
NettyMessagingService.this.responseFutures.getIfPresent(message.id());
......@@ -326,4 +343,13 @@ public class NettyMessagingService implements MessagingService {
context.close();
}
}
/**
* Returns the md5 hash of the specified input string as a long.
* @param input input string.
* @return md5 hash as long.
*/
public static long hashToLong(String input) {
return Hashing.md5().hashBytes(input.getBytes(Charsets.UTF_8)).asLong();
}
}
\ No newline at end of file
......
......@@ -15,15 +15,16 @@
*/
package org.onlab.netty;
import static org.junit.Assert.assertArrayEquals;
import java.net.InetAddress;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.RandomUtils;
import static org.junit.Assert.*;
import org.junit.Ignore;
import org.junit.Test;
import org.onlab.packet.IpAddress;
/**
* Simple ping-pong test that exercises NettyMessagingService.
......@@ -40,7 +41,9 @@ public class PingPongTest {
ponger.activate();
ponger.registerHandler("echo", new EchoHandler());
byte[] payload = RandomUtils.nextBytes(100);
Future<byte[]> responseFuture = pinger.sendAndReceive(new Endpoint("localhost", 9086), "echo", payload);
Future<byte[]> responseFuture =
pinger.sendAndReceive(
new Endpoint(IpAddress.valueOf(InetAddress.getLocalHost()), 9086), "echo", payload);
assertArrayEquals(payload, responseFuture.get(10000, TimeUnit.MILLISECONDS));
} finally {
pinger.deactivate();
......