tom

Preparing for change in ClusterService/Store implementation.

livetribe.slp.da.expired.services.purge.period=60
livetribe.slp.sa.client.connect.address=127.0.0.1
livetribe.slp.sa.client.factory=org.livetribe.slp.sa.StandardServiceAgentClient$Factory
livetribe.slp.sa.factory=org.livetribe.slp.sa.StandardServiceAgent$Factory
livetribe.slp.sa.service.renewal.enabled=true
livetribe.slp.sa.unicast.prefer.tcp=false
livetribe.slp.tcp.connector.factory=org.livetribe.slp.spi.net.SocketTCPConnector$Factory
livetribe.slp.tcp.connector.server.factory=org.livetribe.slp.spi.net.SocketTCPConnectorServer$Factory
livetribe.slp.tcp.message.max.length=4096
livetribe.slp.tcp.read.timeout=300000
livetribe.slp.ua.client.factory=org.livetribe.slp.ua.StandardUserAgentClient$Factory
livetribe.slp.ua.factory=org.livetribe.slp.ua.StandardUserAgent$Factory
livetribe.slp.ua.unicast.prefer.tcp=false
livetribe.slp.udp.connector.factory=org.livetribe.slp.spi.net.SocketUDPConnector$Factory
livetribe.slp.udp.connector.server.factory=org.livetribe.slp.spi.net.SocketUDPConnectorServer$Factory
net.slp.DAAddresses=
net.slp.DAAttributes=
net.slp.DAHeartBeat=10800
net.slp.MTU=1400
net.slp.SAAttributes=
net.slp.broadcastAddress=255.255.255.255
net.slp.datagramTimeouts=150,250,400
net.slp.interfaces=0.0.0.0
net.slp.isBroadcastOnly=false
net.slp.locale=en
net.slp.multicastAddress=239.255.255.253
net.slp.multicastMaximumWait=15000
net.slp.multicastTTL=255
net.slp.multicastTimeouts=150,250,400,600,1000
net.slp.notificationPort=1847
net.slp.port=427
net.slp.useScopes=default
org.onlab.cluster.name = TV-ONOS
......@@ -17,7 +17,7 @@ import static com.google.common.collect.Lists.newArrayList;
public class NodesListCommand extends AbstractShellCommand {
private static final String FMT =
"id=%s, ip=%s, state=%s %s";
"id=%s, address=%s:%s, state=%s %s";
@Override
protected void execute() {
......@@ -26,7 +26,7 @@ public class NodesListCommand extends AbstractShellCommand {
Collections.sort(nodes, Comparators.NODE_COMPARATOR);
ControllerNode self = service.getLocalNode();
for (ControllerNode node : nodes) {
print(FMT, node.id(), node.ip(),
print(FMT, node.id(), node.ip(), node.tcpPort(),
service.getState(node.id()),
node.equals(self) ? "*" : "");
}
......
package org.onlab.onos.cluster;
import org.onlab.packet.IpPrefix;
/**
* Service for administering the cluster node membership.
*/
public interface ClusterAdminService {
/**
* Adds a new controller node to the cluster.
*
* @param nodeId controller node identifier
* @param ip node IP listen address
* @param tcpPort tcp listen port
* @return newly added node
*/
ControllerNode addNode(NodeId nodeId, IpPrefix ip, int tcpPort);
/**
* Removes the specified node from the cluster node list.
*
* @param nodeId controller node identifier
......
package org.onlab.onos.cluster;
import org.onlab.onos.store.Store;
import org.onlab.packet.IpPrefix;
import java.util.Set;
......@@ -40,6 +41,16 @@ public interface ClusterStore extends Store<ClusterEvent, ClusterStoreDelegate>
ControllerNode.State getState(NodeId nodeId);
/**
* Adds a new controller node to the cluster.
*
* @param nodeId controller node identifier
* @param ip node IP listen address
* @param tcpPort tcp listen port
* @return newly added node
*/
ControllerNode addNode(NodeId nodeId, IpPrefix ip, int tcpPort);
/**
* Removes the specified node from the inventory of cluster nodes.
*
* @param nodeId controller instance identifier
......
......@@ -35,4 +35,12 @@ public interface ControllerNode {
*/
IpPrefix ip();
/**
* Returns the TCP port on which the node listens for connections.
*
* @return TCP port
*/
int tcpPort();
}
......
......@@ -11,13 +11,17 @@ import static com.google.common.base.MoreObjects.toStringHelper;
*/
public class DefaultControllerNode implements ControllerNode {
private static final int DEFAULT_PORT = 9876;
private final NodeId id;
private final IpPrefix ip;
private final int tcpPort;
// For serialization
private DefaultControllerNode() {
this.id = null;
this.ip = null;
this.tcpPort = 0;
}
/**
......@@ -27,8 +31,19 @@ public class DefaultControllerNode implements ControllerNode {
* @param ip instance IP address
*/
public DefaultControllerNode(NodeId id, IpPrefix ip) {
this(id, ip, DEFAULT_PORT);
}
/**
* Creates a new instance with the specified id and IP address and TCP port.
*
* @param id instance identifier
* @param ip instance IP address
*/
public DefaultControllerNode(NodeId id, IpPrefix ip, int tcpPort) {
this.id = id;
this.ip = ip;
this.tcpPort = tcpPort;
}
@Override
......@@ -42,6 +57,11 @@ public class DefaultControllerNode implements ControllerNode {
}
@Override
public int tcpPort() {
return tcpPort;
}
@Override
public int hashCode() {
return Objects.hash(id);
}
......@@ -60,7 +80,8 @@ public class DefaultControllerNode implements ControllerNode {
@Override
public String toString() {
return toStringHelper(this).add("id", id).add("ip", ip).toString();
return toStringHelper(this).add("id", id)
.add("ip", ip).add("tcpPort", tcpPort).toString();
}
}
......
......@@ -16,10 +16,12 @@ import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.event.AbstractListenerRegistry;
import org.onlab.onos.event.EventDeliveryService;
import org.onlab.packet.IpPrefix;
import org.slf4j.Logger;
import java.util.Set;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.slf4j.LoggerFactory.getLogger;
......@@ -81,6 +83,14 @@ public class ClusterManager implements ClusterService, ClusterAdminService {
}
@Override
public ControllerNode addNode(NodeId nodeId, IpPrefix ip, int tcpPort) {
checkNotNull(nodeId, INSTANCE_ID_NULL);
checkNotNull(ip, "IP address cannot be null");
checkArgument(tcpPort > 5000, "TCP port must be > 5000");
return store.addNode(nodeId, ip, tcpPort);
}
@Override
public void removeNode(NodeId nodeId) {
checkNotNull(nodeId, INSTANCE_ID_NULL);
store.removeNode(nodeId);
......
......@@ -67,7 +67,7 @@ public class DistributedClusterStore
// Loads the initial set of cluster nodes
private void loadClusterNodes() {
for (Member member : theInstance.getCluster().getMembers()) {
addMember(member);
addNode(node(member));
}
}
......@@ -103,6 +103,11 @@ public class DistributedClusterStore
}
@Override
public ControllerNode addNode(NodeId nodeId, IpPrefix ip, int tcpPort) {
return addNode(new DefaultControllerNode(nodeId, ip, tcpPort));
}
@Override
public void removeNode(NodeId nodeId) {
synchronized (this) {
rawNodes.remove(serialize(nodeId));
......@@ -111,8 +116,7 @@ public class DistributedClusterStore
}
// Adds a new node based on the specified member
private synchronized ControllerNode addMember(Member member) {
DefaultControllerNode node = node(member);
private synchronized ControllerNode addNode(DefaultControllerNode node) {
rawNodes.put(serialize(node.id()), serialize(node));
nodes.put(node.id(), Optional.of(node));
states.put(node.id(), State.ACTIVE);
......@@ -135,7 +139,7 @@ public class DistributedClusterStore
@Override
public void memberAdded(MembershipEvent membershipEvent) {
log.info("Member {} added", membershipEvent.getMember());
ControllerNode node = addMember(membershipEvent.getMember());
ControllerNode node = addNode(node(membershipEvent.getMember()));
notifyDelegate(new ClusterEvent(INSTANCE_ACTIVATED, node));
}
......
......@@ -68,6 +68,11 @@ public class SimpleClusterStore
}
@Override
public ControllerNode addNode(NodeId nodeId, IpPrefix ip, int tcpPort) {
return null;
}
@Override
public void removeNode(NodeId nodeId) {
}
......
......@@ -9,6 +9,7 @@
<bundle>mvn:org.apache.commons/commons-lang3/3.3.2</bundle>
<bundle>mvn:com.google.guava/guava/18.0</bundle>
<bundle>mvn:io.netty/netty/3.9.2.Final</bundle>
<bundle>mvn:org.livetribe.slp/livetribe-slp-osgi/2.2.1</bundle>
<bundle>mvn:com.hazelcast/hazelcast/3.3</bundle>
<bundle>mvn:com.eclipsesource.minimal-json/minimal-json/0.9.1</bundle>
......