Yuta HIGUCHI

Stop sharing Roles in DistributedDeviceStore

Change-Id: Icd0302871c1d6f48379b93eb61f83bfa6df4ce20
......@@ -4,13 +4,15 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.hazelcast.config.Config;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.MastershipServiceAdapter;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.event.Event;
import org.onlab.onos.event.EventDeliveryService;
import org.onlab.onos.event.impl.TestEventDispatcher;
import org.onlab.onos.net.Device;
import org.onlab.onos.net.DeviceId;
......@@ -30,15 +32,20 @@ import org.onlab.onos.net.device.DeviceService;
import org.onlab.onos.net.device.PortDescription;
import org.onlab.onos.net.provider.AbstractProvider;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.common.StoreService;
import org.onlab.onos.store.device.impl.DistributedDeviceStore;
import org.onlab.onos.store.impl.StoreManager;
import org.onlab.onos.store.impl.TestStoreManager;
import org.onlab.packet.IpPrefix;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import static org.junit.Assert.*;
import static org.onlab.onos.net.Device.Type.SWITCH;
import static org.onlab.onos.net.DeviceId.deviceId;
......@@ -65,6 +72,11 @@ public class DistributedDeviceManagerTest {
private static final PortNumber P2 = PortNumber.portNumber(2);
private static final PortNumber P3 = PortNumber.portNumber(3);
private static final DefaultControllerNode SELF
= new DefaultControllerNode(new NodeId("foobar"),
IpPrefix.valueOf("127.0.0.1"));
private DeviceManager mgr;
protected StoreManager storeManager;
......@@ -75,6 +87,8 @@ public class DistributedDeviceManagerTest {
protected TestProvider provider;
protected TestListener listener = new TestListener();
private DistributedDeviceStore dstore;
private TestMastershipManager masterManager;
private EventDeliveryService eventService;
@Before
public void setUp() {
......@@ -85,14 +99,18 @@ public class DistributedDeviceManagerTest {
// TODO should find a way to clean Hazelcast instance without shutdown.
Config config = TestStoreManager.getTestConfig();
masterManager = new TestMastershipManager();
storeManager = new TestStoreManager(Hazelcast.newHazelcastInstance(config));
storeManager.activate();
dstore = new TestDistributedDeviceStore(storeManager);
dstore = new TestDistributedDeviceStore();
dstore.activate();
mgr.store = dstore;
mgr.eventDispatcher = new TestEventDispatcher();
mgr.mastershipService = new TestMastershipService();
eventService = new TestEventDispatcher();
mgr.eventDispatcher = eventService;
mgr.mastershipService = masterManager;
mgr.activate();
service.addListener(listener);
......@@ -272,13 +290,21 @@ public class DistributedDeviceManagerTest {
}
private class TestDistributedDeviceStore extends DistributedDeviceStore {
public TestDistributedDeviceStore(StoreService storeService) {
this.storeService = storeService;
public TestDistributedDeviceStore() {
this.storeService = storeManager;
}
}
private static class TestMastershipService extends MastershipServiceAdapter {
private static class TestMastershipManager extends MastershipServiceAdapter {
private ConcurrentMap<DeviceId, NodeId> masters = new ConcurrentHashMap<>();
public TestMastershipManager() {
// SELF master of all initially
masters.put(DID1, SELF.id());
masters.put(DID1, SELF.id());
}
@Override
public MastershipRole getLocalRole(DeviceId deviceId) {
return MastershipRole.MASTER;
......@@ -286,12 +312,27 @@ public class DistributedDeviceManagerTest {
@Override
public Set<DeviceId> getDevicesOf(NodeId nodeId) {
return Sets.newHashSet(DID1, DID2);
HashSet<DeviceId> set = Sets.newHashSet();
for (Entry<DeviceId, NodeId> e : masters.entrySet()) {
if (e.getValue().equals(nodeId)) {
set.add(e.getKey());
}
}
return set;
}
@Override
public MastershipRole requestRoleFor(DeviceId deviceId) {
return MastershipRole.MASTER;
if (SELF.id().equals(masters.get(deviceId))) {
return MastershipRole.MASTER;
} else {
return MastershipRole.STANDBY;
}
}
@Override
public void relinquishMastership(DeviceId deviceId) {
masters.remove(deviceId, SELF.id());
}
}
}
......
......@@ -7,6 +7,7 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSet.Builder;
import com.hazelcast.core.IMap;
import com.hazelcast.core.ISet;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
......@@ -15,7 +16,6 @@ import org.onlab.onos.net.DefaultDevice;
import org.onlab.onos.net.DefaultPort;
import org.onlab.onos.net.Device;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.MastershipRole;
import org.onlab.onos.net.Port;
import org.onlab.onos.net.PortNumber;
import org.onlab.onos.net.device.DeviceDescription;
......@@ -38,7 +38,6 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.cache.CacheBuilder.newBuilder;
import static org.onlab.onos.net.device.DeviceEvent.Type.*;
......@@ -61,10 +60,6 @@ public class DistributedDeviceStore
private IMap<byte[], byte[]> rawDevices;
private LoadingCache<DeviceId, Optional<DefaultDevice>> devices;
// private IMap<DeviceId, MastershipRole> roles;
private IMap<byte[], byte[]> rawRoles;
private LoadingCache<DeviceId, Optional<MastershipRole>> roles;
// private ISet<DeviceId> availableDevices;
private ISet<byte[]> availableDevices;
......@@ -89,13 +84,6 @@ public class DistributedDeviceStore
// refresh/populate cache based on notification from other instance
rawDevices.addEntryListener(new RemoteEventHandler<>(devices), includeValue);
rawRoles = theInstance.getMap("roles");
final OptionalCacheLoader<DeviceId, MastershipRole> rolesLoader
= new OptionalCacheLoader<>(storeService, rawRoles);
roles = new AbsentInvalidatingLoadingCache<>(newBuilder().build(rolesLoader));
// refresh/populate cache based on notification from other instance
rawRoles.addEntryListener(new RemoteEventHandler<>(roles), includeValue);
// TODO cache availableDevices
availableDevices = theInstance.getSet("availableDevices");
......@@ -173,10 +161,6 @@ public class DistributedDeviceStore
devices.put(deviceId, Optional.of(device));
availableDevices.add(deviceIdBytes);
// For now claim the device as a master automatically.
//rawRoles.put(deviceIdBytes, serialize(MastershipRole.MASTER));
//roles.put(deviceId, Optional.of(MastershipRole.MASTER));
}
return new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, device, null);
}
......@@ -350,8 +334,6 @@ public class DistributedDeviceStore
public DeviceEvent removeDevice(DeviceId deviceId) {
synchronized (this) {
byte[] deviceIdBytes = serialize(deviceId);
rawRoles.remove(deviceIdBytes);
roles.invalidate(deviceId);
// TODO conditional remove?
Device device = deserialize(rawDevices.remove(deviceIdBytes));
......