Madan Jampani

Netty performance improvements

package org.onlab.netty;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
/**
* Encode InternalMessage out into a byte buffer.
*/
@Sharable
public class MessageEncoder extends MessageToByteEncoder<InternalMessage> {
// onosiscool in ascii
......
......@@ -11,6 +11,7 @@ import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
......@@ -37,14 +38,19 @@ public class NettyMessagingService implements MessagingService {
private final Logger log = LoggerFactory.getLogger(getClass());
private GenericKeyedObjectPool<Endpoint, Channel> channels;
private final int port;
private final Endpoint localEp;
private final EventLoopGroup bossGroup = new NioEventLoopGroup();
private final EventLoopGroup workerGroup = new NioEventLoopGroup();
private final ConcurrentMap<String, MessageHandler> handlers = new ConcurrentHashMap<>();
private Cache<Long, AsyncResponse<?>> responseFutures;
private final Endpoint localEp;
private final Cache<Long, AsyncResponse<?>> responseFutures = CacheBuilder.newBuilder()
.maximumSize(100000)
.weakValues()
// TODO: Once the entry expires, notify blocking threads (if any).
.expireAfterWrite(10, TimeUnit.MINUTES)
.build();
private final GenericKeyedObjectPool<Endpoint, Channel> channels
= new GenericKeyedObjectPool<Endpoint, Channel>(new OnosCommunicationChannelFactory());
protected Serializer serializer;
......@@ -65,15 +71,8 @@ public class NettyMessagingService implements MessagingService {
}
public void activate() throws Exception {
channels = new GenericKeyedObjectPool<Endpoint, Channel>(new OnosCommunicationChannelFactory());
channels.setTestOnBorrow(true);
channels.setTestOnReturn(true);
responseFutures = CacheBuilder.newBuilder()
.maximumSize(100000)
.weakValues()
// TODO: Once the entry expires, notify blocking threads (if any).
.expireAfterWrite(10, TimeUnit.MINUTES)
.build();
startAcceptingConnections();
}
......@@ -145,7 +144,8 @@ public class NettyMessagingService implements MessagingService {
private void startAcceptingConnections() throws InterruptedException {
ServerBootstrap b = new ServerBootstrap();
b.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
b.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 32 * 1024);
b.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024);
// TODO: Need JVM options to configure PooledByteBufAllocator.
b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
......@@ -172,19 +172,18 @@ public class NettyMessagingService implements MessagingService {
@Override
public Channel makeObject(Endpoint ep) throws Exception {
Bootstrap b = new Bootstrap();
b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
b.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
b.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 32 * 1024);
b.group(workerGroup);
Bootstrap bootstrap = new Bootstrap();
bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
bootstrap.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
bootstrap.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024);
bootstrap.group(workerGroup);
// TODO: Make this faster:
// http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#37.0
b.channel(NioSocketChannel.class);
b.option(ChannelOption.SO_KEEPALIVE, true);
b.handler(new OnosCommunicationChannelInitializer());
bootstrap.channel(NioSocketChannel.class);
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
bootstrap.handler(new OnosCommunicationChannelInitializer());
// Start the client.
ChannelFuture f = b.connect(ep.host(), ep.port()).sync();
ChannelFuture f = bootstrap.connect(ep.host(), ep.port()).sync();
return f.channel();
}
......@@ -201,12 +200,15 @@ public class NettyMessagingService implements MessagingService {
private class OnosCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
private final ChannelHandler dispatcher = new InboundMessageDispatcher();
private final ChannelHandler encoder = new MessageEncoder(serializer);
@Override
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline()
.addLast("encoder", new MessageEncoder(serializer))
.addLast("encoder", encoder)
.addLast("decoder", new MessageDecoder(NettyMessagingService.this, serializer))
.addLast("handler", new InboundMessageDispatcher());
.addLast("handler", dispatcher);
}
}
......@@ -222,10 +224,11 @@ public class NettyMessagingService implements MessagingService {
@Override
public void run() {
channel.writeAndFlush(message);
channel.writeAndFlush(message, channel.voidPromise());
}
}
@ChannelHandler.Sharable
private class InboundMessageDispatcher extends SimpleChannelInboundHandler<InternalMessage> {
@Override
......@@ -248,7 +251,6 @@ public class NettyMessagingService implements MessagingService {
handler.handle(message);
}
@Override
public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
context.close();
......