Ayaka Koshibe

tests for DistributedMastershipStore

Change-Id: Ic7daa333ac7d7947155b745daf08e4771f1189ef
......@@ -200,7 +200,7 @@ public class DeviceManager
// process.
if (event != null) {
log.info("Device {} connected", deviceId);
mastershipService.requestRoleFor(deviceId);
//mastershipService.requestRoleFor(deviceId);
provider().roleChanged(event.subject(),
mastershipService.requestRoleFor(deviceId));
post(event);
......
......@@ -4,7 +4,6 @@ import static com.google.common.cache.CacheBuilder.newBuilder;
import static org.onlab.onos.cluster.MastershipEvent.Type.MASTER_CHANGED;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import org.apache.felix.scr.annotations.Activate;
......@@ -28,6 +27,7 @@ import org.onlab.onos.store.common.OptionalCacheLoader;
import com.google.common.base.Optional;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableSet;
import com.hazelcast.core.ILock;
import com.hazelcast.core.IMap;
/**
......@@ -39,8 +39,22 @@ public class DistributedMastershipStore
extends AbstractHazelcastStore<MastershipEvent, MastershipStoreDelegate>
implements MastershipStore {
private IMap<byte[], byte[]> rawMasters;
private LoadingCache<DeviceId, Optional<NodeId>> masters;
//arbitrary lock name
private static final String LOCK = "lock";
//initial term value
private static final Integer INIT = 0;
//placeholder non-null value
private static final Byte NIL = 0x0;
//devices to masters
protected IMap<byte[], byte[]> rawMasters;
//devices to terms
protected IMap<byte[], Integer> rawTerms;
//collection of nodes. values are ignored, as it's used as a makeshift 'set'
protected IMap<byte[], Byte> backups;
//TODO - remove
//private LoadingCache<DeviceId, Optional<NodeId>> masters;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
......@@ -51,23 +65,18 @@ implements MastershipStore {
super.activate();
rawMasters = theInstance.getMap("masters");
OptionalCacheLoader<DeviceId, NodeId> nodeLoader
= new OptionalCacheLoader<>(kryoSerializationService, rawMasters);
masters = new AbsentInvalidatingLoadingCache<>(newBuilder().build(nodeLoader));
rawMasters.addEntryListener(new RemoteMasterShipEventHandler(masters), true);
rawTerms = theInstance.getMap("terms");
backups = theInstance.getMap("backups");
loadMasters();
//TODO: hook up maps to event notification
//OptionalCacheLoader<DeviceId, NodeId> nodeLoader
//= new OptionalCacheLoader<>(kryoSerializationService, rawMasters);
//masters = new AbsentInvalidatingLoadingCache<>(newBuilder().build(nodeLoader));
//rawMasters.addEntryListener(new RemoteMasterShipEventHandler(masters), true);
log.info("Started");
}
// Walks every key currently held in rawMasters, deserializes it to a DeviceId
// and asks the masters cache to refresh the entry for that device.
// NOTE(review): the declaration and set-up of the `masters` cache appear to be
// commented out elsewhere in this view -- confirm this helper is still needed.
private void loadMasters() {
for (byte[] keyBytes : rawMasters.keySet()) {
final DeviceId id = deserialize(keyBytes);
masters.refresh(id);
}
}
@Deactivate
public void deactivate() {
log.info("Stopped");
......@@ -75,60 +84,178 @@ implements MastershipStore {
@Override
public MastershipEvent setMaster(NodeId nodeId, DeviceId deviceId) {
synchronized (this) {
NodeId currentMaster = getMaster(deviceId);
if (Objects.equals(currentMaster, nodeId)) {
return null;
}
byte [] did = serialize(deviceId);
byte [] nid = serialize(nodeId);
// FIXME: for now implementing semantics of setMaster
rawMasters.put(serialize(deviceId), serialize(nodeId));
masters.put(deviceId, Optional.of(nodeId));
return new MastershipEvent(MastershipEvent.Type.MASTER_CHANGED, deviceId, nodeId);
ILock lock = theInstance.getLock(LOCK);
lock.lock();
try {
MastershipRole role = getRole(nodeId, deviceId);
Integer term = rawTerms.get(did);
switch (role) {
case MASTER:
return null;
case STANDBY:
rawMasters.put(did, nid);
rawTerms.put(did, ++term);
backups.putIfAbsent(nid, NIL);
break;
case NONE:
rawMasters.put(did, nid);
//new switch OR state transition after being orphaned
if (term == null) {
rawTerms.put(did, INIT);
} else {
rawTerms.put(did, ++term);
}
backups.put(nid, NIL);
break;
default:
log.warn("unknown Mastership Role {}", role);
return null;
}
return new MastershipEvent(MASTER_CHANGED, deviceId, nodeId);
} finally {
lock.unlock();
}
}
@Override
public NodeId getMaster(DeviceId deviceId) {
return masters.getUnchecked(deviceId).orNull();
return deserialize(rawMasters.get(serialize(deviceId)));
}
@Override
public Set<DeviceId> getDevices(NodeId nodeId) {
ImmutableSet.Builder<DeviceId> builder = ImmutableSet.builder();
for (Map.Entry<DeviceId, Optional<NodeId>> entry : masters.asMap().entrySet()) {
if (nodeId.equals(entry.getValue().get())) {
builder.add(entry.getKey());
for (Map.Entry<byte[], byte[]> entry : rawMasters.entrySet()) {
if (nodeId.equals(deserialize(entry.getValue()))) {
builder.add((DeviceId) deserialize(entry.getKey()));
}
}
return builder.build();
}
@Override
public MastershipRole requestRole(DeviceId deviceId) {
// FIXME: for now we are 'selecting' as master whoever asks
setMaster(clusterService.getLocalNode().id(), deviceId);
return MastershipRole.MASTER;
// first to empty slot for device in master map is MASTER
// depending on how backups are organized, might need to trigger election
// so only controller doesn't set itself to backup for another device
byte [] did = serialize(deviceId);
NodeId local = clusterService.getLocalNode().id();
byte [] lnid = serialize(local);
ILock lock = theInstance.getLock(LOCK);
lock.lock();
try {
MastershipRole role = getRole(local, deviceId);
switch (role) {
case MASTER:
break;
case STANDBY:
backups.put(lnid, NIL);
rawTerms.putIfAbsent(did, INIT);
break;
case NONE:
rawMasters.put(did, lnid);
rawTerms.putIfAbsent(did, INIT);
backups.put(lnid, NIL);
role = MastershipRole.MASTER;
break;
default:
log.warn("unknown Mastership Role {}", role);
}
return role;
} finally {
lock.unlock();
}
}
@Override
public MastershipRole getRole(NodeId nodeId, DeviceId deviceId) {
NodeId master = masters.getUnchecked(deviceId).orNull();
return nodeId.equals(master) ? MastershipRole.MASTER : MastershipRole.STANDBY;
byte[] did = serialize(deviceId);
NodeId current = deserialize(rawMasters.get(did));
MastershipRole role = null;
if (current == null) {
//IFF no controllers have claimed mastership over it
role = MastershipRole.NONE;
} else {
if (current.equals(nodeId)) {
role = MastershipRole.MASTER;
} else {
role = MastershipRole.STANDBY;
}
}
return role;
}
@Override
public MastershipTerm getTermFor(DeviceId deviceId) {
// TODO Auto-generated method stub
return null;
byte[] did = serialize(deviceId);
if ((rawMasters.get(did) == null) ||
(rawTerms.get(did) == null)) {
return null;
}
return MastershipTerm.of(
(NodeId) deserialize(rawMasters.get(did)), rawTerms.get(did));
}
/**
 * Makes the given node give up mastership of a device.
 * <p>
 * Runs under the cluster-wide lock. If the node is the current master, a
 * replacement is picked via {@code reelect()}: when one exists it becomes the
 * master and the device's term is advanced; when none exists the master entry
 * is removed so the device has no master. Nodes that are not the master are
 * left untouched.
 *
 * @param nodeId   node relinquishing mastership
 * @param deviceId device being relinquished
 * @return a MASTER_CHANGED event naming the new master, or null if no other
 *         node took over (including when the node was not the master)
 */
@Override
public MastershipEvent unsetMaster(NodeId nodeId, DeviceId deviceId) {
    byte[] did = serialize(deviceId);

    ILock lock = theInstance.getLock(LOCK);
    lock.lock();
    try {
        MastershipRole role = getRole(nodeId, deviceId);
        switch (role) {
            case MASTER:
                //hand off device to another
                NodeId backup = reelect(nodeId, deviceId);
                if (backup == null) {
                    //goes back to NONE
                    rawMasters.remove(did);
                    break;
                }
                //goes to STANDBY for local, MASTER for someone else
                Integer term = rawTerms.get(did);
                // Work out the next term before touching either map: a master
                // entry without a recorded term must not fail with an NPE
                // half-way through the hand-off. A missing term starts at INIT,
                // as setMaster() does for a device it has no term for.
                int nextTerm = (term == null) ? INIT : term + 1;
                rawMasters.put(did, serialize(backup));
                rawTerms.put(did, nextTerm);
                return new MastershipEvent(MASTER_CHANGED, deviceId, backup);
            case STANDBY:
            case NONE:
                //not the master, nothing to give up
                break;
            default:
                log.warn("unknown Mastership Role {}", role);
        }
        return null;
    } finally {
        lock.unlock();
    }
}
// Picks a replacement master for a device: the first node found in the backup
// collection that is not the current master, or null when there is no such node.
private NodeId reelect(NodeId current, DeviceId deviceId) {
    NodeId successor = null;
    for (byte[] rawNode : backups.keySet()) {
        NodeId candidate = deserialize(rawNode);
        if (current.equals(candidate)) {
            // the outgoing master cannot succeed itself
            continue;
        }
        successor = candidate;
        break;
    }
    return successor;
}
//adds node to pool(s) of backup
// NOTE(review): placeholder only -- the body is empty and neither parameter is
// used; setMaster() and requestRole() currently write to `backups` directly.
private void backup(NodeId nodeId, DeviceId deviceId) {
//TODO might be useful to isolate out
}
private class RemoteMasterShipEventHandler extends RemoteCacheEventHandler<DeviceId, NodeId> {
public RemoteMasterShipEventHandler(LoadingCache<DeviceId, Optional<NodeId>> cache) {
super(cache);
......
package org.onlab.onos.store.cluster.impl;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.onlab.onos.net.MastershipRole.*;
import java.util.Set;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.onlab.onos.cluster.ClusterEventListener;
import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.ControllerNode.State;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.MastershipEvent.Type;
import org.onlab.onos.cluster.MastershipTerm;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.store.common.StoreManager;
import org.onlab.onos.store.common.StoreService;
import org.onlab.onos.store.common.TestStoreManager;
import org.onlab.onos.store.serializers.KryoSerializationManager;
import org.onlab.onos.store.serializers.KryoSerializationService;
import org.onlab.packet.IpPrefix;
import com.google.common.collect.Sets;
import com.hazelcast.config.Config;
import com.hazelcast.core.Hazelcast;
/**
* Test of the Hazelcast-based distributed MastershipStore implementation.
*/
public class DistributedMastershipStoreTest {
private static final DeviceId DID1 = DeviceId.deviceId("of:01");
private static final DeviceId DID2 = DeviceId.deviceId("of:02");
private static final DeviceId DID3 = DeviceId.deviceId("of:03");
private static final DeviceId DID4 = DeviceId.deviceId("of:04");
private static final IpPrefix IP = IpPrefix.valueOf("127.0.0.1");
private static final NodeId N1 = new NodeId("node1");
private static final NodeId N2 = new NodeId("node2");
private static final ControllerNode CN1 = new DefaultControllerNode(N1, IP);
private static final ControllerNode CN2 = new DefaultControllerNode(N2, IP);
private DistributedMastershipStore dms;
private TestDistributedMastershipStore testStore;
private KryoSerializationManager serializationMgr;
private StoreManager storeMgr;
// Empty: no class-wide fixture is created here; each test's state is built in setUp().
@BeforeClass
public static void setUpBeforeClass() throws Exception {
}
// Empty: nothing class-wide to release; per-test clean-up happens in tearDown().
@AfterClass
public static void tearDownAfterClass() throws Exception {
}
/**
 * Builds the per-test fixture: a new Hazelcast-backed store manager, a Kryo
 * serialization manager, and an activated store under test wired to a stub
 * cluster service.
 */
@Before
public void setUp() throws Exception {
    // TODO should find a way to clean Hazelcast instance without shutdown.
    final Config hazelcastConfig = TestStoreManager.getTestConfig();
    storeMgr = new TestStoreManager(Hazelcast.newHazelcastInstance(hazelcastConfig));
    storeMgr.activate();

    serializationMgr = new KryoSerializationManager();
    serializationMgr.activate();

    // keep one typed handle so the helper view needs no cast afterwards
    final TestDistributedMastershipStore store =
            new TestDistributedMastershipStore(storeMgr, serializationMgr);
    dms = store;
    dms.clusterService = new TestClusterService();
    dms.activate();

    testStore = store;
}
// Deactivates the fixture in the reverse of the order setUp() activated it:
// the store first, then the serialization manager, then the store manager.
@After
public void tearDown() throws Exception {
dms.deactivate();
serializationMgr.deactivate();
storeMgr.deactivate();
}
// Checks the three outcomes of getRole() for a single device.
@Test
public void getRole() {
// nothing has been stored yet, so N1 has no role for DID1
assertEquals("wrong role:", NONE, dms.getRole(N1, DID1));
// seed DID1 with N1 in the master, backup and term maps
testStore.put(DID1, N1, true, true, true);
// the recorded master reports MASTER; a different node reports STANDBY
assertEquals("wrong role:", MASTER, dms.getRole(N1, DID1));
assertEquals("wrong role:", STANDBY, dms.getRole(N2, DID1));
}
/**
 * A device seeded into the master map reports its node as master; a device
 * that was never seeded reports no master.
 */
@Test
public void getMaster() {
    final boolean startsEmpty = dms.rawMasters.isEmpty();
    assertTrue("wrong store state:", startsEmpty);

    // write only the master entry for DID1
    testStore.put(DID1, N1, true, false, false);

    final NodeId seeded = dms.getMaster(DID1);
    assertEquals("wrong master:", N1, seeded);

    final NodeId unseeded = dms.getMaster(DID2);
    assertNull("wrong master:", unseeded);
}
/**
 * getDevices() returns exactly the devices whose master entry is the queried
 * node, ignoring devices mastered by another node.
 */
@Test
public void getDevices() {
    assertTrue("wrong store state:", dms.rawMasters.isEmpty());

    // N1 masters two devices, N2 masters a third
    testStore.put(DID1, N1, true, false, false);
    testStore.put(DID2, N1, true, false, false);
    testStore.put(DID3, N2, true, false, false);

    final Set<DeviceId> expected = Sets.newHashSet(DID1, DID2);
    final Set<DeviceId> actual = dms.getDevices(N1);
    assertEquals("wrong devices", expected, actual);
}
/**
 * Exercises requestRole() for each starting role of the local node and checks
 * the term bookkeeping it leaves behind.
 */
@Test
public void requestRoleAndTerm() {
    //CN1 is "local"
    testStore.setCurrent(CN1);

    //if already MASTER, nothing should happen
    testStore.put(DID2, N1, true, false, false);
    assertEquals("wrong role for MASTER:", MASTER, dms.requestRole(DID2));
    assertTrue("wrong state for store:",
            dms.backups.isEmpty() && dms.rawTerms.isEmpty());

    //populate maps with DID1, N1 thru NONE case
    assertEquals("wrong role for NONE:", MASTER, dms.requestRole(DID1));
    assertTrue("wrong state for store:",
            !dms.backups.isEmpty() && !dms.rawTerms.isEmpty());
    assertEquals("wrong term",
            MastershipTerm.of(N1, 0), dms.getTermFor(DID1));

    //CN2 now local. DID2 has N1 as MASTER so N2 is STANDBY
    testStore.setCurrent(CN2);
    assertEquals("wrong role for STANDBY:", STANDBY, dms.requestRole(DID2));
    //the STANDBY request added a term entry for DID2 next to the one for DID1
    assertEquals("wrong number of entries:", 2, dms.rawTerms.size());

    //change term and requestRole() again; should persist
    testStore.increment(DID2);
    assertEquals("wrong role for STANDBY:", STANDBY, dms.requestRole(DID2));
    assertEquals("wrong term", MastershipTerm.of(N1, 1), dms.getTermFor(DID2));
}
// Walks setMaster() through its role branches and checks the resulting terms.
@Test
public void setMaster() {
//populate maps with DID1, N1 as MASTER thru NONE case
testStore.setCurrent(CN1);
assertEquals("wrong role for NONE:", MASTER, dms.requestRole(DID1));
// N1 is already master of DID1, so no event is produced
assertNull("wrong event:", dms.setMaster(N1, DID1));
//switch over to N2
// N2 is STANDBY for DID1 here, so the hand-over advances the term from 0 to 1
assertEquals("wrong event:", Type.MASTER_CHANGED, dms.setMaster(N2, DID1).type());
assertEquals("wrong term", MastershipTerm.of(N2, 1), dms.getTermFor(DID1));
//orphan switch - should be rare case
// DID2 has neither master nor term yet, so its term starts at 0
assertEquals("wrong event:", Type.MASTER_CHANGED, dms.setMaster(N2, DID2).type());
assertEquals("wrong term", MastershipTerm.of(N2, 0), dms.getTermFor(DID2));
//disconnect and reconnect - sign of failing re-election or single-instance channel
// only the master map is cleared; the surviving term for DID2 is advanced to 1
testStore.reset(true, false, false);
dms.setMaster(N2, DID2);
assertEquals("wrong term", MastershipTerm.of(N2, 1), dms.getTermFor(DID2));
}
// Checks unsetMaster() when called by a master without a backup, a master with
// a backup, a standby node, and a node for a device nobody has claimed.
@Test
public void unsetMaster() {
//populate maps with DID1, N1 as MASTER thru NONE case
testStore.setCurrent(CN1);
assertEquals("wrong role for NONE:", MASTER, dms.requestRole(DID1));
//no backup, no new MASTER/event
assertNull("wrong event:", dms.unsetMaster(N1, DID1));
//add backup CN2, get it elected MASTER
// the call above removed DID1's master entry, so N1 first claims it again
dms.requestRole(DID1);
// N2 then requests a role for DID1 and is recorded as a backup node
testStore.setCurrent(CN2);
dms.requestRole(DID1);
assertEquals("wrong event:", Type.MASTER_CHANGED, dms.unsetMaster(N1, DID1).type());
assertEquals("wrong master", N2, dms.getMaster(DID1));
//STANDBY - nothing here, either
assertNull("wrong event:", dms.unsetMaster(N1, DID1));
assertEquals("wrong role for node:", STANDBY, dms.getRole(N1, DID1));
//NONE - nothing happens
assertNull("wrong event:", dms.unsetMaster(N1, DID2));
assertEquals("wrong role for node:", NONE, dms.getRole(N1, DID2));
}
private class TestDistributedMastershipStore extends
DistributedMastershipStore {
public TestDistributedMastershipStore(StoreService storeService,
KryoSerializationService kryoSerializationService) {
this.storeService = storeService;
this.kryoSerializationService = kryoSerializationService;
}
//helper to populate master/backup structures
public void put(DeviceId dev, NodeId node,
boolean store, boolean backup, boolean term) {
if (store) {
dms.rawMasters.put(serialize(dev), serialize(node));
}
if (backup) {
dms.backups.put(serialize(node), (byte) 0);
}
if (term) {
dms.rawTerms.put(serialize(dev), 0);
}
}
//clears structures
public void reset(boolean store, boolean backup, boolean term) {
if (store) {
dms.rawMasters.clear();
}
if (backup) {
dms.backups.clear();
}
if (term) {
dms.rawTerms.clear();
}
}
//increment term for a device
public void increment(DeviceId dev) {
Integer t = dms.rawTerms.get(serialize(dev));
if (t != null) {
dms.rawTerms.put(serialize(dev), ++t);
}
}
//sets the "local" node
public void setCurrent(ControllerNode node) {
((TestClusterService) clusterService).current = node;
}
}
private class TestClusterService implements ClusterService {
protected ControllerNode current;
@Override
public ControllerNode getLocalNode() {
return current;
}
@Override
public Set<ControllerNode> getNodes() {
return Sets.newHashSet(CN1, CN2);
}
@Override
public ControllerNode getNode(NodeId nodeId) {
return null;
}
@Override
public State getState(NodeId nodeId) {
return null;
}
@Override
public void addListener(ClusterEventListener listener) {
}
@Override
public void removeListener(ClusterEventListener listener) {
}
}
}