
Merge branch 'master' of ssh://

package org.onlab.onos.mastership;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.cluster.RoleInfo;
import org.onlab.onos.event.AbstractEvent;
......@@ -56,19 +55,6 @@ public class MastershipEvent extends AbstractEvent<MastershipEvent.Type, DeviceI
* Returns the NodeID of the node associated with the event.
* For MASTER_CHANGED this is the newly elected master, and for
* BACKUPS_CHANGED, this is the node that was newly added, removed, or
* whose position was changed in the list.
* @return node ID as a subject
//XXX to-be removed - or keep for convenience?
public NodeId node() {
return roleInfo.master();
* Returns the current role state for the subject.
* @return RoleInfo associated with Device ID subject
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.onlab.onos.cluster.NodeId;
public interface ClusterMessageResponse {
public interface ClusterMessageResponse extends Future<byte[]> {
public NodeId sender();
public byte[] get(long timeout, TimeUnit timeunit) throws TimeoutException;
public byte[] get(long timeout) throws InterruptedException;
// TODO InterruptedException, ExecutionException removed from original
// Future declaration. Revisit if we ever need those.
public byte[] get(long timeout, TimeUnit unit) throws TimeoutException;
......@@ -227,10 +227,14 @@ implements MastershipService, MastershipAdminService {
if (clusterService.getNodes().size() > (clusterSize.intValue() / 2)) {
return true;
//else {
// else {
//FIXME: break tie for equal-sized clusters, by number of
// connected switches, then masters, then nodeId hash
// }
// problem is, how do we get at channel info cleanly here?
// Also, what's the time hit for a distributed store look-up
// versus channel re-negotiation? bet on the latter being worse.
// }
return false;
......@@ -4,9 +4,9 @@ import static;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
......@@ -181,10 +181,13 @@ public class ClusterCommunicationManager
private static final class InternalClusterMessageResponse implements ClusterMessageResponse {
private static final class InternalClusterMessageResponse
implements ClusterMessageResponse {
private final NodeId sender;
private final Response responseFuture;
private volatile boolean isCancelled = false;
private volatile boolean isDone = false;
public InternalClusterMessageResponse(NodeId sender, Response responseFuture) {
this.sender = sender;
......@@ -198,12 +201,39 @@ public class ClusterCommunicationManager
public byte[] get(long timeout, TimeUnit timeunit)
throws TimeoutException {
return responseFuture.get(timeout, timeunit);
final byte[] result = responseFuture.get(timeout, timeunit);
isDone = true;
return result;
public boolean cancel(boolean mayInterruptIfRunning) {
if (isDone()) {
return false;
// doing nothing for now
// when onlab.netty Response supports cancel, call it.
isCancelled = true;
return true;
public boolean isCancelled() {
return isCancelled;
public boolean isDone() {
return this.isDone || isCancelled();
public byte[] get(long timeout) throws InterruptedException {
return responseFuture.get();
public byte[] get() throws InterruptedException, ExecutionException {
// TODO: consider forbidding this call and force the use of timed get
// to enforce handling of remote peer failure scenario
final byte[] result = responseFuture.get();
isDone = true;
return result;
......@@ -86,7 +86,7 @@ public class ReplicaInfoManager implements ReplicaInfoService {
final List<NodeId> standbyList = Collections.<NodeId>emptyList(); ReplicaInfoEvent(MASTER_CHANGED,
new ReplicaInfo(event.node(), standbyList)));
new ReplicaInfo(event.roleInfo().master(), standbyList)));
......@@ -4,6 +4,7 @@ import static
import static org.slf4j.LoggerFactory.getLogger;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
......@@ -26,9 +27,9 @@ import;
import org.onlab.util.KryoNamespace;
import org.slf4j.Logger;
import java.util.HashSet;
import java.util.Map;
......@@ -70,7 +71,9 @@ public class DistributedStatisticStore implements StatisticStore {
protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
protected void setupKryoPool() {
serializerPool = KryoNamespaces.API.newBuilder()
serializerPool = KryoNamespace.newBuilder()
// register this store specific classes here
......@@ -91,23 +91,14 @@ implements MastershipStore {
public MastershipRole getRole(NodeId nodeId, DeviceId deviceId) {
NodeId current = getNode(MASTER, deviceId);
if (current == null) {
if (isRole(STANDBY, nodeId, deviceId)) {
//was previously standby, or set to standby from master
return MastershipRole.STANDBY;
} else {
return MastershipRole.NONE;
} else {
if (current.equals(nodeId)) {
//*should* be in unusable, not always
return MastershipRole.MASTER;
} else {
//may be in backups or unusable from earlier retirement
return MastershipRole.STANDBY;
final RoleValue roleInfo = getRoleValue(deviceId);
if (roleInfo.contains(MASTER, nodeId)) {
return MASTER;
if (roleInfo.contains(STANDBY, nodeId)) {
return STANDBY;
return NONE;
......@@ -124,10 +115,11 @@ implements MastershipStore {
roleMap.put(deviceId, rv);
return null;
case NONE:
NodeId current = rv.get(MASTER);
if (current != null) {
//backup and replace current master
rv.reassign(nodeId, NONE, STANDBY);
rv.reassign(current, NONE, STANDBY);
rv.replace(current, nodeId, MASTER);
} else {
//no master before so just add.
......@@ -137,12 +129,6 @@ implements MastershipStore {
roleMap.put(deviceId, rv);
return new MastershipEvent(MASTER_CHANGED, deviceId, rv.roleInfo());
case NONE:
rv.add(MASTER, nodeId);
rv.reassign(nodeId, STANDBY, NONE);
roleMap.put(deviceId, rv);
return new MastershipEvent(MASTER_CHANGED, deviceId, rv.roleInfo());
log.warn("unknown Mastership Role {}", role);
return null;
......@@ -193,21 +179,28 @@ implements MastershipStore {
switch (role) {
case MASTER:
rv.reassign(local, STANDBY, NONE);
terms.putIfAbsent(deviceId, INIT);
roleMap.put(deviceId, rv);
rv.reassign(local, NONE, STANDBY);
roleMap.put(deviceId, rv);
terms.putIfAbsent(deviceId, INIT);
case NONE:
//claim mastership
//either we're the first standby, or first to device.
//for latter, claim mastership.
if (rv.get(MASTER) == null) {
rv.add(MASTER, local);
rv.reassign(local, STANDBY, NONE);
roleMap.put(deviceId, rv);
role = MastershipRole.MASTER;
} else {
rv.add(STANDBY, local);
rv.reassign(local, NONE, STANDBY);
role = MastershipRole.STANDBY;
roleMap.put(deviceId, rv);
log.warn("unknown Mastership Role {}", role);
......@@ -315,7 +308,10 @@ implements MastershipStore {
RoleValue value = roleMap.get(deviceId);
if (value == null) {
value = new RoleValue();
roleMap.put(deviceId, value);
RoleValue concurrentlyAdded = roleMap.putIfAbsent(deviceId, value);
if (concurrentlyAdded != null) {
return concurrentlyAdded;
return value;
......@@ -329,16 +325,6 @@ implements MastershipStore {
return null;
//check if node is a certain role given a device
private boolean isRole(
MastershipRole role, NodeId nodeId, DeviceId deviceId) {
RoleValue value = roleMap.get(deviceId);
if (value != null) {
return value.contains(role, nodeId);
return false;
//adds or updates term information.
private void updateTerm(DeviceId deviceId) {
......@@ -97,6 +97,7 @@ public class DistributedMastershipStoreTest {
assertEquals("wrong role:", NONE, dms.getRole(N1, DID1));
testStore.put(DID1, N1, true, false, true);
assertEquals("wrong role:", MASTER, dms.getRole(N1, DID1));
testStore.put(DID1, N2, false, true, false);
assertEquals("wrong role:", STANDBY, dms.getRole(N2, DID1));
......@@ -155,6 +156,7 @@ public class DistributedMastershipStoreTest {
//switch over to N2
assertEquals("wrong event:", Type.MASTER_CHANGED, dms.setMaster(N2, DID1).type());
System.out.println(dms.getTermFor(DID1).master() + ":" + dms.getTermFor(DID1).termNumber());
assertEquals("wrong term", MastershipTerm.of(N2, 1), dms.getTermFor(DID1));
//orphan switch - should be rare case
......@@ -182,14 +184,9 @@ public class DistributedMastershipStoreTest {
assertEquals("wrong event:", Type.MASTER_CHANGED, dms.relinquishRole(N1, DID1).type());
assertEquals("wrong master", N2, dms.getMaster(DID1));
//STANDBY - nothing here, either
assertNull("wrong event:", dms.relinquishRole(N1, DID1));
assertEquals("wrong role for node:", STANDBY, dms.getRole(N1, DID1));
//all nodes "give up" on device, which goes back to NONE.
assertNull("wrong event:", dms.relinquishRole(N2, DID1));
assertEquals("wrong role for node:", NONE, dms.getRole(N2, DID1));
assertEquals("wrong role for node:", NONE, dms.getRole(N1, DID1));
assertEquals("wrong number of retired nodes", 2,
......@@ -201,6 +198,10 @@ public class DistributedMastershipStoreTest {
assertEquals("wrong number of backup nodes", 1,
//If STANDBY, should drop to NONE
assertNull("wrong event:", dms.relinquishRole(N1, DID1));
assertEquals("wrong role for node:", NONE, dms.getRole(N1, DID1));
//NONE - nothing happens
assertNull("wrong event:", dms.relinquishRole(N1, DID2));
assertEquals("wrong role for node:", NONE, dms.getRole(N1, DID2));
......@@ -218,7 +219,7 @@ public class DistributedMastershipStoreTest {
public void notify(MastershipEvent event) {
assertEquals("wrong event:", Type.MASTER_CHANGED, event.type());
assertEquals("wrong subject", DID1, event.subject());
assertEquals("wrong subject", N1, event.node());
assertEquals("wrong subject", N1, event.roleInfo().master());
......@@ -207,7 +207,7 @@ public class OpenFlowControllerImpl implements OpenFlowController {
+ "value for dpid: {}", dpid);
return false;
} else {
log.error("Added switch {}", dpid);"Added switch {}", dpid);
connectedSwitches.put(dpid, sw);
for (OpenFlowSwitchListener l : ofSwitchListener) {
......@@ -610,7 +610,8 @@
......@@ -16,58 +16,95 @@ from functools import partial
import time
from sys import argv
from time import sleep
from sets import Set
class ONOS( Controller ):
#def __init__( self, name, command='/opt/onos/bin/onos-service', **kwargs ):
# Controller.__init__( self, name, command=command, inNamespace=True, **kwargs )
#def __init__( self, name, inNamespace=False, command='controller',
# cargs='-v ptcp:%d', cdir=None, ip="",
# port=6633, protocol='tcp', **params ):
#self.command = command
#self.cargs = cargs
#self.cdir = cdir
#self.ip = ip
#self.port = port
#self.protocol = protocol
#Node.__init__( self, name, inNamespace=inNamespace,
# ip=ip, **params )
onosDir = '/opt/onos/'
def __init__( self, name, onosDir=onosDir,
reactive=True, features=[ 'onos-app-tvue' ],
**kwargs ):
Controller.__init__( self, name, **kwargs )
# the following have been done for us:
#self.ip = ip ('')
#self.port = port (6633)
#self.protocol = protocol ('tcp')
ONOS_DIR = '/opt/onos/'
KARAF_DIR = ONOS_DIR + 'apache-karaf-3.0.1/'
reactive = True
self.onosDir = onosDir
self.karafDir = onosDir + 'apache-karaf-3.0.1/'
self.instanceDir = self.karafDir
# add default modules
# TODO: consider an ordered set
self.features = Set([ 'webconsole',
'onos-openflow' ])
self.features.update( features )
# add reactive forwarding modules
if reactive:
self.features.update( ['onos-app-fwd',
'onos-app-mobility' ] )
# add the distributed core if we are in a namespace with no trivial core
if self.inNamespace and 'onos-core-trivial' not in self.features:
self.features.add( 'onos-core' )
# if there is no core, add the trivial one
if 'onos-core' not in self.features:
self.features.add( 'onos-core-trivial' )
print self.features
def start( self ):
# switch to the non-root user because karaf gets upset otherwise
# TODO we should look into why....
self.sendCmd( 'sudo su - %s' % self.findUser() )
self.waiting = False
if self.inNamespace:
self.cmd( self.KARAF_DIR + 'bin/instance create %s' % )
src = self.KARAF_DIR + 'etc/org.apache.karaf.features.cfg'
dst = self.KARAF_DIR + 'instances/%s/etc/org.apache.karaf.features.cfg' %
self.cmd( 'cp %s %s' % (src, dst) )
self.updateProperties( dst )
self.cmd( self.KARAF_DIR + 'bin/instance start %s' % )
instanceOpts = ( '-furl mvn:org.onlab.onos/onos-features/1.0.0-SNAPSHOT/xml/features '
'-s 8101' )
self.userCmd( self.karafDir + 'bin/instance create %s %s' % ( instanceOpts, ) )
self.instanceDir = self.karafDir + 'instances/%s/' %
# we are running in the root namespace, so let's use the root instance
self.cmd( 'rm -rf '+ self.KARAF_DIR + 'data/' )
filename = self.KARAF_DIR + 'etc/org.apache.karaf.features.cfg'
self.updateProperties( filename )
self.cmd( self.KARAF_DIR + 'bin/start' )
# clean up the data directory
#self.userCmd( 'rm -rf '+ self.karafDir + 'data/' )
self.userCmd( 'rm -rf '+ self.instanceDir + 'data/' )
# Update etc/org.apache.karaf.features.cfg
# TODO 2. Update etc/hazelcast.xml : interface lines
#cp etc/hazelcast.xml instances/c1/etc/
# TODO 3. Update etc/ : onos.ip
# TODO 4. Update config/cluster.json : with all nodes
# start onos
self.userCmd( self.instanceDir + 'bin/start' )
#TODO we should wait for startup...
def stop( self ):
if self.inNamespace:
self.cmd( '/opt/onos/apache-karaf-3.0.1/bin/instance stop %s' % )
self.cmd( '/opt/onos/apache-karaf-3.0.1/bin/instance destroy %s' % )
self.cmd( self.ONOS_DIR + 'apache-karaf-3.0.1/bin/stop' )
self.userCmd( self.instanceDir + 'bin/stop' )
#if self.inNamespace:
# self.userCmd( self.karafDir + 'bin/instance destroy %s' % )
def updateProperties( self, filename ):
def updateHazelcast( self, interface='192.168.123.*' ):
    """Copy karaf's etc/hazelcast.xml into this instance's etc/ directory,
       replacing every line that contains an <interface> tag.
       interface: address pattern written between the <interface> tags
                  (defaults to the previously hard-coded '192.168.123.*')"""
    readfile = self.karafDir + 'etc/hazelcast.xml'
    writefile = self.instanceDir + 'etc/hazelcast.xml'
    # Read the whole source before opening the destination: when instanceDir
    # is the same as karafDir both paths name one file, and opening it with
    # 'w' first would truncate it before any line could be read.
    with open( readfile, 'r' ) as r:
        lines = r.readlines()
    with open( writefile, 'w' ) as w:
        for line in lines:
            if '<interface>' in line:
                # the whole line is replaced, so its original indent is dropped
                line = '<interface>' + interface + '</interface>\n'
            w.write( line )
def updateFeatures( self ):
filename = self.instanceDir + 'etc/org.apache.karaf.features.cfg'
with open( filename, 'r+' ) as f:
lines = f.readlines()
......@@ -75,17 +112,25 @@ class ONOS( Controller ):
for line in lines:
#print '?', line,
if 'featuresBoot=' in line:
line = line.rstrip()
#print ord(line[-1]), ord(line[-2]), ord(line[-3])
if self.reactive:
line += ',onos-app-fwd'
line += '\n'
# parse the features from the line
features = line.rstrip().split('=')[1].split(',')
# add the features to our features set
self.features.update( features )
# generate the new features line
line = 'featuresBoot=' + ','.join( self.features ) + '\n'
#print '!', line,
f.write( line )
def isAvailable( self ):
return quietRun( 'ls /opt/onos' )
return quietRun( 'ls %s' % self.onosDir )
def userCmd( self, cmd ):
    """Run cmd on this node through sudo as the user returned by findUser().
       We switch to the non-root user because karaf gets upset otherwise,
       since the .m2 repo is not stored with root.
       cmd: command string to run
       returns: the result of self.cmd() for the sudo-wrapped command"""
    return self.cmd( 'sudo -u %s %s' % ( self.findUser(), cmd ) )
def findUser():
......@@ -111,7 +156,7 @@ class ControlNetwork( Topo ):
# Connect everything to a single switch
cs0 = self.addSwitch( 'cs0' )
# Add hosts which will serve as data network controllers
for i in range( 0, n ):
for i in range( 1, n+1 ):
c = self.addHost( 'c%s' % i, cls=dataController,
inNamespace=True )
self.addLink( c, cs0 )
......@@ -122,7 +167,7 @@ class ControlNetwork( Topo ):
class ONOSCluster( Controller ):
n = 4
n = 3
def start( self ):
ctopo = ControlNetwork( n=self.n, dataController=ONOS )
......@@ -137,6 +182,9 @@ class ONOSCluster( Controller ):
def stop( self ):
for host in self.cnet.hosts:
if isinstance( host, Controller ):
def clist( self ):
......@@ -158,10 +206,11 @@ switches = { 'ovso': OVSSwitchONOS }
if __name__ == '__main__':
# Simple test for ONOS() controller class
setLogLevel( 'info' )
setLogLevel( 'info' ) #TODO info
size = 2 if len( argv ) != 2 else int( argv[ 1 ] )
net = Mininet( topo=LinearTopo( size ),
controller=partial( ONOSCluster, n=4 ),
controller=partial( ONOSCluster, n=3 ), #TODO
switch=OVSSwitchONOS )
#waitConnected( net.switches )
......@@ -26,7 +26,6 @@