Madan Jampani

Netty native transport (epoll) support

......@@ -22,10 +22,8 @@ import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
import org.onlab.onos.store.cluster.messaging.ClusterMessageResponse;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
import org.onlab.onos.store.serializers.ClusterMessageSerializer;
import org.onlab.onos.store.serializers.KryoNamespaces;
import org.onlab.onos.store.serializers.KryoSerializer;
import org.onlab.onos.store.serializers.MessageSubjectSerializer;
import org.onlab.util.KryoNamespace;
import org.onlab.netty.Endpoint;
import org.onlab.netty.Message;
......
......@@ -17,6 +17,8 @@ import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
......@@ -41,7 +43,8 @@ public class NettyMessagingService implements MessagingService {
private final int port;
private final Endpoint localEp;
private final EventLoopGroup bossGroup = new NioEventLoopGroup();
private final EventLoopGroup workerGroup = new NioEventLoopGroup();
private EventLoopGroup workerGroup;
private Class<? extends Channel> channelClass;
private final ConcurrentMap<String, MessageHandler> handlers = new ConcurrentHashMap<>();
private final Cache<Long, AsyncResponse> responseFutures = CacheBuilder.newBuilder()
.maximumSize(100000)
......@@ -52,6 +55,17 @@ public class NettyMessagingService implements MessagingService {
private final GenericKeyedObjectPool<Endpoint, Channel> channels
= new GenericKeyedObjectPool<Endpoint, Channel>(new OnosCommunicationChannelFactory());
// TODO: make this configurable.
private void initEventLoopGroup() {
try {
workerGroup = new EpollEventLoopGroup();
channelClass = EpollSocketChannel.class;
} catch (Throwable t) {
workerGroup = new NioEventLoopGroup();
channelClass = NioSocketChannel.class;
}
}
public NettyMessagingService() {
// TODO: Default port should be configurable.
this(8080);
......@@ -71,6 +85,7 @@ public class NettyMessagingService implements MessagingService {
public void activate() throws Exception {
channels.setTestOnBorrow(true);
channels.setTestOnReturn(true);
initEventLoopGroup();
startAcceptingConnections();
}
......@@ -173,7 +188,7 @@ public class NettyMessagingService implements MessagingService {
bootstrap.group(workerGroup);
// TODO: Make this faster:
// http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#37.0
bootstrap.channel(NioSocketChannel.class);
bootstrap.channel(channelClass);
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
bootstrap.handler(new OnosCommunicationChannelInitializer());
// Start the client.
......