Madan Jampani
Committed by Gerrit Code Review

ONOS-1982: MessagingService is now a OSGi service. Has implementations based on Netty and IOLoop

Change-Id: Ia4c99de18e91be1b49bd1fddd86fe89fb83e859c
Showing 19 changed files with 360 additions and 337 deletions
package org.onosproject.cluster;
import java.util.Set;
/**
* Service for obtaining the static definition of a controller cluster.
*/
public interface ClusterDefinitionService {
/**
* Returns the local controller node.
* @return local controller node
*/
ControllerNode localNode();
/**
* Returns the set of seed nodes that should be used for discovering other members
* of the cluster.
* @return set of seed controller nodes
*/
Set<ControllerNode> seedNodes();
/**
* Forms cluster configuration based on the specified set of node
* information. Assumes subsequent restart for the new configuration to
* take hold.
*
* @param nodes set of nodes that form the cluster
* @param ipPrefix IP address prefix, e.g. 10.0.1.*
*/
void formCluster(Set<ControllerNode> nodes, String ipPrefix);
}
\ No newline at end of file
......@@ -65,16 +65,6 @@ public interface ClusterStore extends Store<ClusterEvent, ClusterStoreDelegate>
DateTime getLastUpdated(NodeId nodeId);
/**
* Forms cluster configuration based on the specified set of node
* information. Assumes subsequent restart for the new configuration to
* take hold.
*
* @param nodes set of nodes that form the cluster
* @param ipPrefix IP address prefix, e.g. 10.0.1.*
*/
void formCluster(Set<ControllerNode> nodes, String ipPrefix);
/**
* Adds a new controller node to the cluster.
*
* @param nodeId controller node identifier
......
......@@ -25,6 +25,7 @@ import org.apache.karaf.system.SystemService;
import org.joda.time.DateTime;
import org.onlab.packet.IpAddress;
import org.onosproject.cluster.ClusterAdminService;
import org.onosproject.cluster.ClusterDefinitionService;
import org.onosproject.cluster.ClusterEvent;
import org.onosproject.cluster.ClusterEventListener;
import org.onosproject.cluster.ClusterService;
......@@ -58,6 +59,9 @@ public class ClusterManager implements ClusterService, ClusterAdminService {
listenerRegistry = new ListenerRegistry<>();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterDefinitionService clusterDefinitionService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterStore store;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
......@@ -70,6 +74,8 @@ public class ClusterManager implements ClusterService, ClusterAdminService {
public void activate() {
store.setDelegate(delegate);
eventDispatcher.addSink(ClusterEvent.class, listenerRegistry);
clusterDefinitionService.seedNodes()
.forEach(node -> store.addNode(node.id(), node.ip(), node.tcpPort()));
log.info("Started");
}
......@@ -113,7 +119,7 @@ public class ClusterManager implements ClusterService, ClusterAdminService {
checkNotNull(nodes, "Nodes cannot be null");
checkArgument(!nodes.isEmpty(), "Nodes cannot be empty");
checkNotNull(ipPrefix, "IP prefix cannot be null");
store.formCluster(nodes, ipPrefix);
clusterDefinitionService.formCluster(nodes, ipPrefix);
try {
log.warn("Shutting down container for cluster reconfiguration!");
systemService.reboot("now", SystemService.Swipe.NONE);
......
package org.onosproject.store.cluster.impl;
import static com.hazelcast.util.AddressUtil.matchInterface;
import static java.net.NetworkInterface.getNetworkInterfaces;
import static java.util.Collections.list;
import static org.onosproject.cluster.DefaultControllerNode.DEFAULT_PORT;
import static org.onosproject.store.consistent.impl.DatabaseManager.PARTITION_DEFINITION_FILE;
import static org.slf4j.LoggerFactory.getLogger;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.Enumeration;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Service;
import org.onlab.packet.IpAddress;
import org.onosproject.cluster.ClusterDefinitionService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.DefaultControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.store.consistent.impl.DatabaseDefinition;
import org.onosproject.store.consistent.impl.DatabaseDefinitionStore;
import org.slf4j.Logger;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import com.hazelcast.util.AddressUtil;
/**
* Implementation of ClusterDefinitionService.
*/
@Component(immediate = true)
@Service
public class ClusterDefinitionManager implements ClusterDefinitionService {
public static final String CLUSTER_DEFINITION_FILE = "../config/cluster.json";
private static final String ONOS_NIC = "ONOS_NIC";
private static final Logger log = getLogger(ClusterDefinitionManager.class);
private ControllerNode localNode;
private Set<ControllerNode> seedNodes;
@Activate
public void activate() {
File clusterDefinitionFile = new File(CLUSTER_DEFINITION_FILE);
ClusterDefinitionStore clusterDefinitionStore =
new ClusterDefinitionStore(clusterDefinitionFile.getPath());
if (!clusterDefinitionFile.exists()) {
createDefaultClusterDefinition(clusterDefinitionStore);
}
try {
ClusterDefinition clusterDefinition = clusterDefinitionStore.read();
establishSelfIdentity(clusterDefinition);
seedNodes = ImmutableSet
.copyOf(clusterDefinition.getNodes())
.stream()
.filter(n -> !localNode.id().equals(new NodeId(n.getId())))
.map(n -> new DefaultControllerNode(new NodeId(n.getId()),
IpAddress.valueOf(n.getIp()),
n.getTcpPort()))
.collect(Collectors.toSet());
} catch (IOException e) {
throw new IllegalStateException("Failed to read cluster definition.", e);
}
log.info("Started");
}
@Deactivate
public void deactivate() {
log.info("Stopped");
}
@Override
public ControllerNode localNode() {
return localNode;
}
@Override
public Set<ControllerNode> seedNodes() {
return seedNodes;
}
@Override
public void formCluster(Set<ControllerNode> nodes, String ipPrefix) {
try {
Set<NodeInfo> infos = Sets.newHashSet();
nodes.forEach(n -> infos.add(NodeInfo.from(n.id().toString(),
n.ip().toString(),
n.tcpPort())));
ClusterDefinition cdef = ClusterDefinition.from(infos, ipPrefix);
new ClusterDefinitionStore(CLUSTER_DEFINITION_FILE).write(cdef);
DatabaseDefinition ddef = DatabaseDefinition.from(infos);
new DatabaseDefinitionStore(PARTITION_DEFINITION_FILE).write(ddef);
} catch (IOException e) {
log.error("Unable to form cluster", e);
}
}
private IpAddress findLocalIp(ClusterDefinition clusterDefinition) throws SocketException {
Enumeration<NetworkInterface> interfaces =
NetworkInterface.getNetworkInterfaces();
while (interfaces.hasMoreElements()) {
NetworkInterface iface = interfaces.nextElement();
Enumeration<InetAddress> inetAddresses = iface.getInetAddresses();
while (inetAddresses.hasMoreElements()) {
IpAddress ip = IpAddress.valueOf(inetAddresses.nextElement());
if (AddressUtil.matchInterface(ip.toString(), clusterDefinition.getIpPrefix())) {
return ip;
}
}
}
throw new IllegalStateException("Unable to determine local ip");
}
private void establishSelfIdentity(ClusterDefinition clusterDefinition) {
try {
IpAddress ip = findLocalIp(clusterDefinition);
localNode = new DefaultControllerNode(new NodeId(ip.toString()), ip);
} catch (SocketException e) {
throw new IllegalStateException("Cannot determine local IP", e);
}
}
private void createDefaultClusterDefinition(ClusterDefinitionStore store) {
// Assumes IPv4 is returned.
String ip = getSiteLocalAddress();
String ipPrefix = ip.replaceFirst("\\.[0-9]*$", ".*");
NodeInfo node = NodeInfo.from(ip, ip, DEFAULT_PORT);
try {
store.write(ClusterDefinition.from(ImmutableSet.of(node), ipPrefix));
} catch (IOException e) {
log.warn("Unable to write default cluster definition", e);
}
}
/**
* Returns the address that matches the IP prefix given in ONOS_NIC
* environment variable if one was specified, or the first site local
* address if one can be found or the loopback address otherwise.
*
* @return site-local address in string form
*/
public static String getSiteLocalAddress() {
try {
String ipPrefix = System.getenv(ONOS_NIC);
for (NetworkInterface nif : list(getNetworkInterfaces())) {
for (InetAddress address : list(nif.getInetAddresses())) {
IpAddress ip = IpAddress.valueOf(address);
if (ipPrefix == null && address.isSiteLocalAddress() ||
ipPrefix != null && matchInterface(ip.toString(), ipPrefix)) {
return ip.toString();
}
}
}
} catch (SocketException e) {
log.error("Unable to get network interfaces", e);
}
return IpAddress.valueOf(InetAddress.getLoopbackAddress()).toString();
}
}
......@@ -17,17 +17,17 @@ package org.onosproject.store.cluster.impl;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.hazelcast.util.AddressUtil;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.joda.time.DateTime;
import org.onlab.netty.NettyMessagingManager;
import org.onlab.packet.IpAddress;
import org.onlab.util.KryoNamespace;
import org.onosproject.cluster.ClusterDefinitionService;
import org.onosproject.cluster.ClusterEvent;
import org.onosproject.cluster.ClusterStore;
import org.onosproject.cluster.ClusterStoreDelegate;
......@@ -37,18 +37,12 @@ import org.onosproject.cluster.DefaultControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.cluster.messaging.Endpoint;
import org.onosproject.store.consistent.impl.DatabaseDefinition;
import org.onosproject.store.consistent.impl.DatabaseDefinitionStore;
import org.onosproject.store.cluster.messaging.MessagingService;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
import org.slf4j.Logger;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.Enumeration;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
......@@ -59,12 +53,7 @@ import java.util.function.Consumer;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.hazelcast.util.AddressUtil.matchInterface;
import static java.net.NetworkInterface.getNetworkInterfaces;
import static java.util.Collections.list;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.cluster.DefaultControllerNode.DEFAULT_PORT;
import static org.onosproject.store.consistent.impl.DatabaseManager.PARTITION_DEFINITION_FILE;
import static org.slf4j.LoggerFactory.getLogger;
@Component(immediate = true)
......@@ -79,11 +68,9 @@ public class DistributedClusterStore
private static final Logger log = getLogger(DistributedClusterStore.class);
public static final String CLUSTER_DEFINITION_FILE = "../config/cluster.json";
public static final String HEARTBEAT_MESSAGE = "onos-cluster-heartbeat";
// TODO: make these configurable.
private static final int HEARTBEAT_FD_PORT = 2419;
private static final int HEARTBEAT_INTERVAL_MS = 100;
private static final int PHI_FAILURE_THRESHOLD = 10;
......@@ -99,16 +86,10 @@ public class DistributedClusterStore
};
private static final String INSTANCE_ID_NULL = "Instance ID cannot be null";
private static final byte SITE_LOCAL_BYTE = (byte) 0xC0;
private static final String ONOS_NIC = "ONOS_NIC";
private ClusterDefinition clusterDefinition;
private Set<ControllerNode> seedNodes;
private final Map<NodeId, ControllerNode> allNodes = Maps.newConcurrentMap();
private final Map<NodeId, State> nodeStates = Maps.newConcurrentMap();
private final Map<NodeId, DateTime> nodeStateLastUpdatedTimes = Maps.newConcurrentMap();
private NettyMessagingManager messagingService;
private ScheduledExecutorService heartBeatSender = Executors.newSingleThreadScheduledExecutor(
groupedThreads("onos/cluster/membership", "heartbeat-sender"));
private ExecutorService heartBeatMessageHandler = Executors.newSingleThreadExecutor(
......@@ -118,45 +99,16 @@ public class DistributedClusterStore
private ControllerNode localNode;
@Activate
public void activate() {
File clusterDefinitionFile = new File(CLUSTER_DEFINITION_FILE);
ClusterDefinitionStore clusterDefinitionStore =
new ClusterDefinitionStore(clusterDefinitionFile.getPath());
if (!clusterDefinitionFile.exists()) {
createDefaultClusterDefinition(clusterDefinitionStore);
}
try {
clusterDefinition = clusterDefinitionStore.read();
seedNodes = ImmutableSet
.copyOf(clusterDefinition.getNodes())
.stream()
.map(n -> new DefaultControllerNode(new NodeId(n.getId()),
IpAddress.valueOf(n.getIp()),
n.getTcpPort()))
.collect(Collectors.toSet());
} catch (IOException e) {
throw new IllegalStateException("Failed to read cluster definition.", e);
}
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterDefinitionService clusterDefinitionService;
seedNodes.forEach(node -> {
allNodes.put(node.id(), node);
updateState(node.id(), State.INACTIVE);
});
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected MessagingService messagingService;
establishSelfIdentity();
@Activate
public void activate() {
localNode = clusterDefinitionService.localNode();
messagingService = new NettyMessagingManager(HEARTBEAT_FD_PORT);
try {
messagingService.activate();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException(
"Failed to cleanly initialize membership and"
+ " failure detector communication channel.", e);
}
messagingService.registerHandler(HEARTBEAT_MESSAGE,
new HeartbeatMessageHandler(), heartBeatMessageHandler);
......@@ -165,56 +117,15 @@ public class DistributedClusterStore
heartBeatSender.scheduleWithFixedDelay(this::heartbeat, 0,
HEARTBEAT_INTERVAL_MS, TimeUnit.MILLISECONDS);
log.info("Started");
}
private void createDefaultClusterDefinition(ClusterDefinitionStore store) {
// Assumes IPv4 is returned.
String ip = DistributedClusterStore.getSiteLocalAddress();
String ipPrefix = ip.replaceFirst("\\.[0-9]*$", ".*");
NodeInfo node = NodeInfo.from(ip, ip, DEFAULT_PORT);
try {
store.write(ClusterDefinition.from(ImmutableSet.of(node), ipPrefix));
} catch (IOException e) {
log.warn("Unable to write default cluster definition", e);
}
}
/**
* Returns the address that matches the IP prefix given in ONOS_NIC
* environment variable if one was specified, or the first site local
* address if one can be found or the loopback address otherwise.
*
* @return site-local address in string form
*/
public static String getSiteLocalAddress() {
try {
String ipPrefix = System.getenv(ONOS_NIC);
for (NetworkInterface nif : list(getNetworkInterfaces())) {
for (InetAddress address : list(nif.getInetAddresses())) {
IpAddress ip = IpAddress.valueOf(address);
if (ipPrefix == null && address.isSiteLocalAddress() ||
ipPrefix != null && matchInterface(ip.toString(), ipPrefix)) {
return ip.toString();
}
}
}
} catch (SocketException e) {
log.error("Unable to get network interfaces", e);
}
addNode(localNode);
updateState(localNode.id(), State.ACTIVE);
return IpAddress.valueOf(InetAddress.getLoopbackAddress()).toString();
log.info("Started");
}
@Deactivate
public void deactivate() {
try {
messagingService.deactivate();
} catch (Exception e) {
log.trace("Failed to cleanly shutdown cluster membership messaging", e);
}
messagingService.unregisterHandler(HEARTBEAT_MESSAGE);
heartBeatSender.shutdownNow();
heartBeatMessageHandler.shutdownNow();
......@@ -262,9 +173,7 @@ public class DistributedClusterStore
@Override
public ControllerNode addNode(NodeId nodeId, IpAddress ip, int tcpPort) {
ControllerNode node = new DefaultControllerNode(nodeId, ip, tcpPort);
allNodes.put(node.id(), node);
updateState(nodeId, State.INACTIVE);
notifyDelegate(new ClusterEvent(ClusterEvent.Type.INSTANCE_ADDED, node));
addNode(node);
return node;
}
......@@ -278,22 +187,10 @@ public class DistributedClusterStore
}
}
@Override
public void formCluster(Set<ControllerNode> nodes, String ipPrefix) {
try {
Set<NodeInfo> infos = Sets.newHashSet();
nodes.forEach(n -> infos.add(NodeInfo.from(n.id().toString(),
n.ip().toString(),
n.tcpPort())));
ClusterDefinition cdef = ClusterDefinition.from(infos, ipPrefix);
new ClusterDefinitionStore(CLUSTER_DEFINITION_FILE).write(cdef);
DatabaseDefinition ddef = DatabaseDefinition.from(infos);
new DatabaseDefinitionStore(PARTITION_DEFINITION_FILE).write(ddef);
} catch (IOException e) {
log.error("Unable to form cluster", e);
}
private void addNode(ControllerNode node) {
allNodes.put(node.id(), node);
updateState(node.id(), State.INACTIVE);
notifyDelegate(new ClusterEvent(ClusterEvent.Type.INSTANCE_ADDED, node));
}
private void updateState(NodeId nodeId, State newState) {
......@@ -301,18 +198,6 @@ public class DistributedClusterStore
nodeStateLastUpdatedTimes.put(nodeId, DateTime.now());
}
private void establishSelfIdentity() {
try {
IpAddress ip = findLocalIp();
localNode = new DefaultControllerNode(new NodeId(ip.toString()), ip);
allNodes.put(localNode.id(), localNode);
updateState(localNode.id(), State.ACTIVE);
log.info("Local Node: {}", localNode);
} catch (SocketException e) {
throw new IllegalStateException("Cannot determine local IP", e);
}
}
private void heartbeat() {
try {
Set<ControllerNode> peers = allNodes.values()
......@@ -351,7 +236,7 @@ public class DistributedClusterStore
}
private void heartbeatToPeer(byte[] messagePayload, ControllerNode peer) {
Endpoint remoteEp = new Endpoint(peer.ip(), HEARTBEAT_FD_PORT);
Endpoint remoteEp = new Endpoint(peer.ip(), peer.tcpPort());
try {
messagingService.sendAsync(remoteEp, HEARTBEAT_MESSAGE, messagePayload);
} catch (IOException e) {
......@@ -359,22 +244,6 @@ public class DistributedClusterStore
}
}
private IpAddress findLocalIp() throws SocketException {
Enumeration<NetworkInterface> interfaces =
NetworkInterface.getNetworkInterfaces();
while (interfaces.hasMoreElements()) {
NetworkInterface iface = interfaces.nextElement();
Enumeration<InetAddress> inetAddresses = iface.getInetAddresses();
while (inetAddresses.hasMoreElements()) {
IpAddress ip = IpAddress.valueOf(inetAddresses.nextElement());
if (AddressUtil.matchInterface(ip.toString(), clusterDefinition.getIpPrefix())) {
return ip;
}
}
}
throw new IllegalStateException("Unable to determine local ip");
}
private class HeartbeatMessageHandler implements Consumer<byte[]> {
@Override
public void accept(byte[] message) {
......
......@@ -130,11 +130,6 @@ public class HazelcastClusterStore
}
@Override
public void formCluster(Set<ControllerNode> nodes, String ipPrefix) {
throw new UnsupportedOperationException("formCluster not implemented");
}
@Override
public ControllerNode addNode(NodeId nodeId, IpAddress ip, int tcpPort) {
return addNode(new DefaultControllerNode(nodeId, ip, tcpPort));
}
......
......@@ -21,8 +21,6 @@ import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.netty.NettyMessagingManager;
import org.onlab.nio.service.IOLoopMessagingManager;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.NodeId;
......@@ -60,47 +58,16 @@ public class ClusterCommunicationManager
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private ClusterService clusterService;
// TODO: This probably should not be a OSGi service.
private MessagingService messagingService;
private final boolean useNetty = true;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected MessagingService messagingService;
@Activate
public void activate() {
ControllerNode localNode = clusterService.getLocalNode();
if (useNetty) {
NettyMessagingManager netty = new NettyMessagingManager(localNode.ip(), localNode.tcpPort());
try {
netty.activate();
messagingService = netty;
} catch (Exception e) {
log.error("NettyMessagingService#activate", e);
}
} else {
IOLoopMessagingManager ioLoop = new IOLoopMessagingManager(localNode.ip(), localNode.tcpPort());
try {
ioLoop.activate();
messagingService = ioLoop;
} catch (Exception e) {
log.error("IOLoopMessagingService#activate", e);
}
}
log.info("Started on {}:{}", localNode.ip(), localNode.tcpPort());
log.info("Started");
}
@Deactivate
public void deactivate() {
// TODO: cleanup messageingService if needed.
// FIXME: workaround until it becomes a service.
try {
if (useNetty) {
((NettyMessagingManager) messagingService).deactivate();
} else {
((IOLoopMessagingManager) messagingService).deactivate();
}
} catch (Exception e) {
log.error("MessagingService#deactivate", e);
}
log.info("Stopped");
}
......
package org.onosproject.store.cluster.messaging.impl;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.nio.service.IOLoopMessaging;
import org.onosproject.cluster.ClusterDefinitionService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.store.cluster.messaging.Endpoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* IOLoop based MessagingService.
*/
@Component(immediate = true, enabled = false)
@Service
public class IOLoopMessagingManager extends IOLoopMessaging {
private final Logger log = LoggerFactory.getLogger(getClass());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterDefinitionService clusterDefinitionService;
@Activate
public void activate() throws Exception {
ControllerNode localNode = clusterDefinitionService.localNode();
super.start(new Endpoint(localNode.ip(), localNode.tcpPort()));
log.info("Started");
}
@Deactivate
public void deactivate() throws Exception {
super.stop();
log.info("Stopped");
}
}
package org.onosproject.store.cluster.messaging.impl;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.netty.NettyMessaging;
import org.onosproject.cluster.ClusterDefinitionService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.store.cluster.messaging.Endpoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Netty based MessagingService.
*/
@Component(immediate = true, enabled = true)
@Service
public class NettyMessagingManager extends NettyMessaging {
private final Logger log = LoggerFactory.getLogger(getClass());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterDefinitionService clusterDefinitionService;
@Activate
public void activate() throws Exception {
ControllerNode localNode = clusterDefinitionService.localNode();
super.start(new Endpoint(localNode.ip(), localNode.tcpPort()));
log.info("Started");
}
@Deactivate
public void deactivate() throws Exception {
super.stop();
log.info("Stopped");
}
}
\ No newline at end of file
......@@ -45,7 +45,7 @@ import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onosproject.cluster.ClusterService;
import org.onosproject.core.IdGenerator;
import org.onosproject.store.cluster.impl.DistributedClusterStore;
import org.onosproject.store.cluster.impl.ClusterDefinitionManager;
import org.onosproject.store.cluster.impl.NodeInfo;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.ecmap.EventuallyConsistentMapBuilderImpl;
......@@ -193,7 +193,7 @@ public class DatabaseManager implements StorageService, StorageAdminService {
private void createDefaultDatabaseDefinition(DatabaseDefinitionStore store) {
// Assumes IPv4 is returned.
String ip = DistributedClusterStore.getSiteLocalAddress();
String ip = ClusterDefinitionManager.getSiteLocalAddress();
NodeInfo node = NodeInfo.from(ip, ip, COPYCAT_TCP_PORT);
try {
store.write(DatabaseDefinition.from(ImmutableSet.of(node)));
......
......@@ -26,7 +26,7 @@ import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Service;
import org.onosproject.store.cluster.impl.DistributedClusterStore;
import org.onosproject.store.cluster.impl.ClusterDefinitionManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -66,7 +66,7 @@ public class StoreManager implements StoreService {
}
private void createDefaultHazelcastFile(File hazelcastFile) {
String ip = DistributedClusterStore.getSiteLocalAddress();
String ip = ClusterDefinitionManager.getSiteLocalAddress();
String ipPrefix = ip.replaceFirst("\\.[0-9]*$", ".*");
InputStream his = getClass().getResourceAsStream("/hazelcast.xml");
try {
......
......@@ -140,7 +140,7 @@ public class ConsistentDeviceMastershipStore
localNodeId = clusterService.getLocalNode().id();
leadershipService.addListener(leadershipEventListener);
log.info("Started.");
log.info("Started");
}
@Deactivate
......@@ -151,7 +151,7 @@ public class ConsistentDeviceMastershipStore
transferExecutor.shutdown();
leadershipService.removeListener(leadershipEventListener);
log.info("Stoppped.");
log.info("Stopped");
}
@Override
......
......@@ -22,7 +22,6 @@ import org.junit.Test;
import org.onosproject.cluster.DefaultControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.store.cluster.impl.ClusterNodesDelegate;
import org.onlab.netty.NettyMessagingManager;
import org.onlab.packet.IpAddress;
import java.util.concurrent.CountDownLatch;
......
......@@ -93,11 +93,6 @@ public class SimpleClusterStore
}
@Override
public void formCluster(Set<ControllerNode> nodes, String ipPrefix) {
}
@Override
public ControllerNode addNode(NodeId nodeId, IpAddress ip, int tcpPort) {
return null;
}
......
......@@ -80,5 +80,4 @@
<version>${netty4.version}</version>
</dependency>
</dependencies>
</project>
......
......@@ -37,22 +37,20 @@ import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.commons.pool.KeyedPoolableObjectFactory;
import org.apache.commons.pool.impl.GenericKeyedObjectPool;
import org.onlab.packet.IpAddress;
import org.onosproject.store.cluster.messaging.Endpoint;
import org.onosproject.store.cluster.messaging.MessagingService;
import org.slf4j.Logger;
......@@ -66,14 +64,15 @@ import com.google.common.cache.RemovalNotification;
/**
* Implementation of MessagingService based on <a href="http://netty.io/">Netty</a> framework.
*/
public class NettyMessagingManager implements MessagingService {
public class NettyMessaging implements MessagingService {
private final Logger log = LoggerFactory.getLogger(getClass());
private static final String REPLY_MESSAGE_TYPE = "NETTY_MESSAGING_REQUEST_REPLY";
private final Endpoint localEp;
private final ConcurrentMap<String, Consumer<InternalMessage>> handlers = new ConcurrentHashMap<>();
private Endpoint localEp;
private final AtomicBoolean started = new AtomicBoolean(false);
private final Map<String, Consumer<InternalMessage>> handlers = new ConcurrentHashMap<>();
private final AtomicLong messageIdGenerator = new AtomicLong(0);
private final Cache<Long, CompletableFuture<byte[]>> responseFutures = CacheBuilder.newBuilder()
.expireAfterWrite(10, TimeUnit.SECONDS)
......@@ -104,7 +103,8 @@ public class NettyMessagingManager implements MessagingService {
clientChannelClass = EpollSocketChannel.class;
return;
} catch (Throwable e) {
log.warn("Failed to initialize native (epoll) transport. Reason: {}. Proceeding with nio.", e.getMessage());
log.debug("Failed to initialize native (epoll) transport. "
+ "Reason: {}. Proceeding with nio.", e.getMessage());
}
clientGroup = new NioEventLoopGroup();
serverGroup = new NioEventLoopGroup();
......@@ -112,43 +112,27 @@ public class NettyMessagingManager implements MessagingService {
clientChannelClass = NioSocketChannel.class;
}
public NettyMessagingManager(IpAddress ip, int port) {
localEp = new Endpoint(ip, port);
}
public NettyMessagingManager() {
this(8080);
}
public NettyMessagingManager(int port) {
try {
localEp = new Endpoint(IpAddress.valueOf(InetAddress.getLocalHost()), port);
} catch (UnknownHostException e) {
// Cannot resolve the local host, something is very wrong. Bailing out.
throw new IllegalStateException("Cannot resolve local host", e);
}
public void start(Endpoint localEp) throws Exception {
if (started.get()) {
log.warn("Already running at local endpoint: {}", localEp);
return;
}
public void activate() throws InterruptedException {
this.localEp = localEp;
channels.setLifo(false);
channels.setTestOnBorrow(true);
channels.setTestOnReturn(true);
initEventLoopGroup();
startAcceptingConnections();
started.set(true);
}
public void deactivate() throws Exception {
public void stop() throws Exception {
if (started.get()) {
channels.close();
serverGroup.shutdownGracefully();
clientGroup.shutdownGracefully();
started.set(false);
}
/**
* Returns the local endpoint for this instance.
* @return local end point.
*/
public Endpoint localEp() {
return localEp;
}
@Override
......@@ -237,7 +221,13 @@ public class NettyMessagingManager implements MessagingService {
.childOption(ChannelOption.SO_KEEPALIVE, true);
// Bind and start to accept incoming connections.
b.bind(localEp.port()).sync();
b.bind(localEp.port()).sync().addListener(future -> {
if (future.isSuccess()) {
log.info("{} accepting incoming connections on port {}", localEp.host(), localEp.port());
} else {
log.warn("{} failed to bind to port {}", localEp.host(), localEp.port(), future.cause());
}
});
}
private class OnosCommunicationChannelFactory
......
/*
* Copyright 2014-2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onlab.netty;
import static org.junit.Assert.assertArrayEquals;
import java.net.InetAddress;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.commons.lang3.RandomUtils;
import org.junit.Ignore;
import org.junit.Test;
import org.onlab.packet.IpAddress;
import org.onosproject.store.cluster.messaging.Endpoint;
import com.google.common.util.concurrent.MoreExecutors;
/**
* Simple ping-pong test that exercises NettyMessagingService.
*/
public class PingPongTest {
@Ignore("Turning off fragile test")
@Test
public void testPingPong() throws Exception {
NettyMessagingManager pinger = new NettyMessagingManager(8085);
NettyMessagingManager ponger = new NettyMessagingManager(9086);
try {
pinger.activate();
ponger.activate();
ponger.registerHandler("echo", Function.identity(), MoreExecutors.directExecutor());
byte[] payload = RandomUtils.nextBytes(100);
CompletableFuture<byte[]> responseFuture =
pinger.sendAndReceive(
new Endpoint(IpAddress.valueOf(InetAddress.getLocalHost()), 9086), "echo", payload);
assertArrayEquals(payload, responseFuture.get(10000, TimeUnit.MILLISECONDS));
} finally {
pinger.deactivate();
ponger.deactivate();
}
}
}
......@@ -55,5 +55,4 @@
<scope>test</scope>
</dependency>
</dependencies>
</project>
......
......@@ -31,6 +31,7 @@ import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;
......@@ -39,7 +40,6 @@ import org.apache.commons.pool.KeyedPoolableObjectFactory;
import org.apache.commons.pool.impl.GenericKeyedObjectPool;
import org.onlab.nio.AcceptorLoop;
import org.onlab.nio.SelectorLoop;
import org.onlab.packet.IpAddress;
import org.onosproject.store.cluster.messaging.Endpoint;
import org.onosproject.store.cluster.messaging.MessagingService;
import org.slf4j.Logger;
......@@ -54,13 +54,12 @@ import com.google.common.collect.Lists;
/**
* MessagingService implementation based on IOLoop.
*/
public class IOLoopMessagingManager implements MessagingService {
public class IOLoopMessaging implements MessagingService {
private final Logger log = LoggerFactory.getLogger(getClass());
private static final String REPLY_MESSAGE_TYPE = "ONOS_REQUEST_REPLY";
static final int PORT = 9876;
static final long TIMEOUT = 1000;
static final boolean SO_NO_DELAY = false;
......@@ -79,7 +78,8 @@ public class IOLoopMessagingManager implements MessagingService {
private int lastWorker = -1;
private final Endpoint localEp;
private final AtomicBoolean started = new AtomicBoolean(false);
private Endpoint localEp;
private GenericKeyedObjectPool<Endpoint, DefaultMessageStream> streams =
new GenericKeyedObjectPool<>(new DefaultMessageStreamFactory());
......@@ -97,34 +97,17 @@ public class IOLoopMessagingManager implements MessagingService {
})
.build();
public IOLoopMessagingManager(int port) {
this(new Endpoint(IpAddress.valueOf("127.0.0.1"), port));
}
public IOLoopMessagingManager(IpAddress ip, int port) {
this(new Endpoint(ip, port));
}
public IOLoopMessagingManager(Endpoint localEp) {
this.localEp = localEp;
}
/**
* Returns the local endpoint.
*
* @return local endpoint
*/
public Endpoint localEp() {
return localEp;
}
/**
* Activates IO Loops.
*
* @throws IOException is activation fails
*/
public void activate() throws IOException {
public void start(Endpoint localEp) throws IOException {
if (started.get()) {
log.warn("IOMessaging is already running at {}", localEp);
return;
}
this.localEp = localEp;
streams.setLifo(false);
this.acceptorLoop = new DefaultAcceptorLoop(new InetSocketAddress(localEp.host().toString(), localEp.port()));
......@@ -136,16 +119,20 @@ public class IOLoopMessagingManager implements MessagingService {
acceptorThreadPool.execute(acceptorLoop);
ioLoops.forEach(loop -> loop.awaitStart(TIMEOUT));
acceptorLoop.awaitStart(TIMEOUT);
started.set(true);
}
/**
* Shuts down IO loops.
*/
public void deactivate() {
public void stop() {
if (started.get()) {
ioLoops.forEach(SelectorLoop::shutdown);
acceptorLoop.shutdown();
ioThreadPool.shutdown();
acceptorThreadPool.shutdown();
started.set(false);
}
}
......