Madan Jampani

Netty epoll support. Now with updated pom.xml and features.xml to bring in the dependencies

......@@ -15,6 +15,7 @@
<bundle>mvn:io.netty/netty-transport/4.0.23.Final</bundle>
<bundle>mvn:io.netty/netty-handler/4.0.23.Final</bundle>
<bundle>mvn:io.netty/netty-codec/4.0.23.Final</bundle>
<bundle>io.netty/netty-transport-native-epoll/4.0.23.Final</bundle>
<bundle>mvn:commons-pool/commons-pool/1.6</bundle>
<bundle>mvn:com.hazelcast/hazelcast/3.3</bundle>
......
......@@ -312,6 +312,11 @@
<artifactId>netty-codec</artifactId>
<version>${netty4.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-epoll</artifactId>
<version>${netty4.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
......
......@@ -55,6 +55,10 @@
<groupId>io.netty</groupId>
<artifactId>netty-codec</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-epoll</artifactId>
</dependency>
</dependencies>
</project>
......
......@@ -16,7 +16,12 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
......@@ -40,9 +45,6 @@ public class NettyMessagingService implements MessagingService {
private final int port;
private final Endpoint localEp;
private final EventLoopGroup bossGroup = new NioEventLoopGroup();
private EventLoopGroup workerGroup;
private Class<? extends Channel> channelClass;
private final ConcurrentMap<String, MessageHandler> handlers = new ConcurrentHashMap<>();
private final Cache<Long, AsyncResponse> responseFutures = CacheBuilder.newBuilder()
.maximumSize(100000)
......@@ -53,10 +55,32 @@ public class NettyMessagingService implements MessagingService {
private final GenericKeyedObjectPool<Endpoint, Channel> channels
= new GenericKeyedObjectPool<Endpoint, Channel>(new OnosCommunicationChannelFactory());
// TODO: make this configurable.
private EventLoopGroup serverGroup;
private EventLoopGroup clientGroup;
private Class<? extends ServerChannel> serverChannelClass;
private Class<? extends Channel> clientChannelClass;
private void initEventLoopGroup() {
workerGroup = new NioEventLoopGroup();
channelClass = NioSocketChannel.class;
// try Epoll first and if that does work, use nio.
// TODO: make this configurable.
try {
if (Epoll.isAvailable()) {
clientGroup = new EpollEventLoopGroup();
serverGroup = new EpollEventLoopGroup();
serverChannelClass = EpollServerSocketChannel.class;
clientChannelClass = EpollSocketChannel.class;
return;
} else {
log.info("Netty epoll support is not available. Proceeding with nio.");
}
} catch (Throwable t) {
log.warn("Failed to initialize epoll sockets. Proceeding with nio.", t);
}
clientGroup = new NioEventLoopGroup();
serverGroup = new NioEventLoopGroup();
serverChannelClass = NioServerSocketChannel.class;
clientChannelClass = NioSocketChannel.class;
}
public NettyMessagingService() {
......@@ -84,8 +108,8 @@ public class NettyMessagingService implements MessagingService {
public void deactivate() throws Exception {
channels.close();
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
serverGroup.shutdownGracefully();
clientGroup.shutdownGracefully();
}
@Override
......@@ -149,8 +173,8 @@ public class NettyMessagingService implements MessagingService {
b.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024);
// TODO: Need JVM options to configure PooledByteBufAllocator.
b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
b.group(serverGroup, clientGroup)
.channel(serverChannelClass)
.childHandler(new OnosCommunicationChannelInitializer())
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
......@@ -178,10 +202,10 @@ public class NettyMessagingService implements MessagingService {
bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
bootstrap.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
bootstrap.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024);
bootstrap.group(workerGroup);
bootstrap.group(clientGroup);
// TODO: Make this faster:
// http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#37.0
bootstrap.channel(channelClass);
bootstrap.channel(clientChannelClass);
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
bootstrap.handler(new OnosCommunicationChannelInitializer());
// Start the client.
......