Thomas Vachuska
Committed by Gerrit Code Review

Renamed IOLoop & Netty to *MessagingManager for consistency.

Change-Id: Id8859e24d0c7ac7f948516388069639093bad524
......@@ -25,7 +25,7 @@ import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Service;
import org.joda.time.DateTime;
import org.onlab.netty.NettyMessagingService;
import org.onlab.netty.NettyMessagingManager;
import org.onlab.packet.IpAddress;
import org.onlab.util.KryoNamespace;
import org.onosproject.cluster.ClusterEvent;
......@@ -108,7 +108,7 @@ public class DistributedClusterStore
private final Map<NodeId, ControllerNode> allNodes = Maps.newConcurrentMap();
private final Map<NodeId, State> nodeStates = Maps.newConcurrentMap();
private final Map<NodeId, DateTime> nodeStateLastUpdatedTimes = Maps.newConcurrentMap();
private NettyMessagingService messagingService;
private NettyMessagingManager messagingService;
private ScheduledExecutorService heartBeatSender = Executors.newSingleThreadScheduledExecutor(
groupedThreads("onos/cluster/membership", "heartbeat-sender"));
private ExecutorService heartBeatMessageHandler = Executors.newSingleThreadExecutor(
......@@ -148,7 +148,7 @@ public class DistributedClusterStore
establishSelfIdentity();
messagingService = new NettyMessagingService(HEARTBEAT_FD_PORT);
messagingService = new NettyMessagingManager(HEARTBEAT_FD_PORT);
try {
messagingService.activate();
} catch (InterruptedException e) {
......
......@@ -21,8 +21,8 @@ import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.netty.NettyMessagingService;
import org.onlab.nio.service.IOLoopMessagingService;
import org.onlab.netty.NettyMessagingManager;
import org.onlab.nio.service.IOLoopMessagingManager;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.NodeId;
......@@ -69,7 +69,7 @@ public class ClusterCommunicationManager
public void activate() {
ControllerNode localNode = clusterService.getLocalNode();
if (useNetty) {
NettyMessagingService netty = new NettyMessagingService(localNode.ip(), localNode.tcpPort());
NettyMessagingManager netty = new NettyMessagingManager(localNode.ip(), localNode.tcpPort());
try {
netty.activate();
messagingService = netty;
......@@ -77,7 +77,7 @@ public class ClusterCommunicationManager
log.error("NettyMessagingService#activate", e);
}
} else {
IOLoopMessagingService ioLoop = new IOLoopMessagingService(localNode.ip(), localNode.tcpPort());
IOLoopMessagingManager ioLoop = new IOLoopMessagingManager(localNode.ip(), localNode.tcpPort());
try {
ioLoop.activate();
messagingService = ioLoop;
......@@ -94,9 +94,9 @@ public class ClusterCommunicationManager
// FIXME: workaround until it becomes a service.
try {
if (useNetty) {
((NettyMessagingService) messagingService).deactivate();
((NettyMessagingManager) messagingService).deactivate();
} else {
((IOLoopMessagingService) messagingService).deactivate();
((IOLoopMessagingManager) messagingService).deactivate();
}
} catch (Exception e) {
log.error("MessagingService#deactivate", e);
......
......@@ -22,7 +22,7 @@ import org.junit.Test;
import org.onosproject.cluster.DefaultControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.store.cluster.impl.ClusterNodesDelegate;
import org.onlab.netty.NettyMessagingService;
import org.onlab.netty.NettyMessagingManager;
import org.onlab.packet.IpAddress;
import java.util.concurrent.CountDownLatch;
......@@ -56,7 +56,7 @@ public class ClusterCommunicationManagerTest {
@Before
public void setUp() throws Exception {
NettyMessagingService messagingService = new NettyMessagingService();
NettyMessagingManager messagingService = new NettyMessagingManager();
messagingService.activate();
ccm1 = new ClusterCommunicationManager();
......
......@@ -65,7 +65,7 @@ import com.google.common.cache.RemovalNotification;
/**
* Implementation of MessagingService based on <a href="http://netty.io/">Netty</a> framework.
*/
public class NettyMessagingService implements MessagingService {
public class NettyMessagingManager implements MessagingService {
private final Logger log = LoggerFactory.getLogger(getClass());
......@@ -111,15 +111,15 @@ public class NettyMessagingService implements MessagingService {
clientChannelClass = NioSocketChannel.class;
}
public NettyMessagingService(IpAddress ip, int port) {
public NettyMessagingManager(IpAddress ip, int port) {
localEp = new Endpoint(ip, port);
}
public NettyMessagingService() {
public NettyMessagingManager() {
this(8080);
}
public NettyMessagingService(int port) {
public NettyMessagingManager(int port) {
try {
localEp = new Endpoint(IpAddress.valueOf(InetAddress.getLocalHost()), port);
} catch (UnknownHostException e) {
......
......@@ -38,8 +38,8 @@ public class PingPongTest {
@Ignore("Turning off fragile test")
@Test
public void testPingPong() throws Exception {
NettyMessagingService pinger = new NettyMessagingService(8085);
NettyMessagingService ponger = new NettyMessagingService(9086);
NettyMessagingManager pinger = new NettyMessagingManager(8085);
NettyMessagingManager ponger = new NettyMessagingManager(9086);
try {
pinger.activate();
ponger.activate();
......
......@@ -38,6 +38,7 @@ import java.util.function.Function;
import org.apache.commons.pool.KeyedPoolableObjectFactory;
import org.apache.commons.pool.impl.GenericKeyedObjectPool;
import org.onlab.nio.AcceptorLoop;
import org.onlab.nio.SelectorLoop;
import org.onlab.packet.IpAddress;
import org.onosproject.store.cluster.messaging.Endpoint;
import org.onosproject.store.cluster.messaging.MessagingService;
......@@ -53,7 +54,7 @@ import com.google.common.collect.Lists;
/**
* MessagingService implementation based on IOLoop.
*/
public class IOLoopMessagingService implements MessagingService {
public class IOLoopMessagingManager implements MessagingService {
private final Logger log = LoggerFactory.getLogger(getClass());
......@@ -81,7 +82,7 @@ public class IOLoopMessagingService implements MessagingService {
private final Endpoint localEp;
private GenericKeyedObjectPool<Endpoint, DefaultMessageStream> streams =
new GenericKeyedObjectPool<Endpoint, DefaultMessageStream>(new DefaultMessageStreamFactory());
new GenericKeyedObjectPool<>(new DefaultMessageStreamFactory());
private final ConcurrentMap<String, Consumer<DefaultMessage>> handlers = new ConcurrentHashMap<>();
private final AtomicLong messageIdGenerator = new AtomicLong(0);
......@@ -97,20 +98,21 @@ public class IOLoopMessagingService implements MessagingService {
.build();
public IOLoopMessagingService(int port) {
public IOLoopMessagingManager(int port) {
this(new Endpoint(IpAddress.valueOf("127.0.0.1"), port));
}
public IOLoopMessagingService(IpAddress ip, int port) {
public IOLoopMessagingManager(IpAddress ip, int port) {
this(new Endpoint(ip, port));
}
public IOLoopMessagingService(Endpoint localEp) {
public IOLoopMessagingManager(Endpoint localEp) {
this.localEp = localEp;
}
/**
* Returns the local endpoint.
*
* @return local endpoint
*/
public Endpoint localEp() {
......@@ -119,6 +121,7 @@ public class IOLoopMessagingService implements MessagingService {
/**
* Activates IO Loops.
*
* @throws IOException is activation fails
*/
public void activate() throws IOException {
......@@ -129,7 +132,7 @@ public class IOLoopMessagingService implements MessagingService {
ioLoops.add(new DefaultIOLoop(this::dispatchLocally));
}
ioLoops.forEach(loop -> ioThreadPool.execute(loop));
ioLoops.forEach(ioThreadPool::execute);
acceptorThreadPool.execute(acceptorLoop);
ioLoops.forEach(loop -> loop.awaitStart(TIMEOUT));
acceptorLoop.awaitStart(TIMEOUT);
......@@ -139,7 +142,7 @@ public class IOLoopMessagingService implements MessagingService {
* Shuts down IO loops.
*/
public void deactivate() {
ioLoops.forEach(loop -> loop.shutdown());
ioLoops.forEach(SelectorLoop::shutdown);
acceptorLoop.shutdown();
ioThreadPool.shutdown();
acceptorThreadPool.shutdown();
......