Madan Jampani

Using native transport (epoll) with netty

...@@ -16,7 +16,11 @@ import io.netty.channel.ChannelHandlerContext; ...@@ -16,7 +16,11 @@ import io.netty.channel.ChannelHandlerContext;
16 import io.netty.channel.ChannelInitializer; 16 import io.netty.channel.ChannelInitializer;
17 import io.netty.channel.ChannelOption; 17 import io.netty.channel.ChannelOption;
18 import io.netty.channel.EventLoopGroup; 18 import io.netty.channel.EventLoopGroup;
19 +import io.netty.channel.ServerChannel;
19 import io.netty.channel.SimpleChannelInboundHandler; 20 import io.netty.channel.SimpleChannelInboundHandler;
21 +import io.netty.channel.epoll.EpollEventLoopGroup;
22 +import io.netty.channel.epoll.EpollServerSocketChannel;
23 +import io.netty.channel.epoll.EpollSocketChannel;
20 import io.netty.channel.nio.NioEventLoopGroup; 24 import io.netty.channel.nio.NioEventLoopGroup;
21 import io.netty.channel.socket.SocketChannel; 25 import io.netty.channel.socket.SocketChannel;
22 import io.netty.channel.socket.nio.NioServerSocketChannel; 26 import io.netty.channel.socket.nio.NioServerSocketChannel;
...@@ -40,9 +44,10 @@ public class NettyMessagingService implements MessagingService { ...@@ -40,9 +44,10 @@ public class NettyMessagingService implements MessagingService {
40 44
41 private final int port; 45 private final int port;
42 private final Endpoint localEp; 46 private final Endpoint localEp;
43 - private final EventLoopGroup bossGroup = new NioEventLoopGroup(); 47 + private EventLoopGroup bossGroup;
44 private EventLoopGroup workerGroup; 48 private EventLoopGroup workerGroup;
45 - private Class<? extends Channel> channelClass; 49 + private Class<? extends Channel> clientChannelClass;
50 + private Class<? extends ServerChannel> serverChannelClass;
46 private final ConcurrentMap<String, MessageHandler> handlers = new ConcurrentHashMap<>(); 51 private final ConcurrentMap<String, MessageHandler> handlers = new ConcurrentHashMap<>();
47 private final Cache<Long, AsyncResponse> responseFutures = CacheBuilder.newBuilder() 52 private final Cache<Long, AsyncResponse> responseFutures = CacheBuilder.newBuilder()
48 .maximumSize(100000) 53 .maximumSize(100000)
...@@ -55,8 +60,17 @@ public class NettyMessagingService implements MessagingService { ...@@ -55,8 +60,17 @@ public class NettyMessagingService implements MessagingService {
55 60
56 // TODO: make this configurable. 61 // TODO: make this configurable.
57 private void initEventLoopGroup() { 62 private void initEventLoopGroup() {
58 - workerGroup = new NioEventLoopGroup(); 63 + try {
59 - channelClass = NioSocketChannel.class; 64 + bossGroup = new EpollEventLoopGroup();
65 + workerGroup = new EpollEventLoopGroup();
66 + clientChannelClass = EpollSocketChannel.class;
67 + serverChannelClass = EpollServerSocketChannel.class;
68 + } catch (Throwable th) {
69 + bossGroup = new NioEventLoopGroup();
70 + workerGroup = new NioEventLoopGroup();
71 + serverChannelClass = NioServerSocketChannel.class;
72 + clientChannelClass = NioSocketChannel.class;
73 + }
60 } 74 }
61 75
62 public NettyMessagingService() { 76 public NettyMessagingService() {
...@@ -150,7 +164,7 @@ public class NettyMessagingService implements MessagingService { ...@@ -150,7 +164,7 @@ public class NettyMessagingService implements MessagingService {
150 // TODO: Need JVM options to configure PooledByteBufAllocator. 164 // TODO: Need JVM options to configure PooledByteBufAllocator.
151 b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); 165 b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
152 b.group(bossGroup, workerGroup) 166 b.group(bossGroup, workerGroup)
153 - .channel(NioServerSocketChannel.class) 167 + .channel(serverChannelClass)
154 .childHandler(new OnosCommunicationChannelInitializer()) 168 .childHandler(new OnosCommunicationChannelInitializer())
155 .option(ChannelOption.SO_BACKLOG, 128) 169 .option(ChannelOption.SO_BACKLOG, 128)
156 .childOption(ChannelOption.SO_KEEPALIVE, true); 170 .childOption(ChannelOption.SO_KEEPALIVE, true);
...@@ -181,7 +195,7 @@ public class NettyMessagingService implements MessagingService { ...@@ -181,7 +195,7 @@ public class NettyMessagingService implements MessagingService {
181 bootstrap.group(workerGroup); 195 bootstrap.group(workerGroup);
182 // TODO: Make this faster: 196 // TODO: Make this faster:
183 // http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#37.0 197 // http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#37.0
184 - bootstrap.channel(channelClass); 198 + bootstrap.channel(clientChannelClass);
185 bootstrap.option(ChannelOption.SO_KEEPALIVE, true); 199 bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
186 bootstrap.handler(new OnosCommunicationChannelInitializer()); 200 bootstrap.handler(new OnosCommunicationChannelInitializer());
187 // Start the client. 201 // Start the client.
......