tom

Merge remote-tracking branch 'origin/master'

......@@ -44,6 +44,13 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.onlab.onos</groupId>
<artifactId>onos-core-store</artifactId>
<version>${project.version}</version>
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.felix</groupId>
......
......@@ -4,13 +4,15 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.hazelcast.config.Config;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.MastershipServiceAdapter;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.event.Event;
import org.onlab.onos.event.EventDeliveryService;
import org.onlab.onos.event.impl.TestEventDispatcher;
import org.onlab.onos.net.Device;
import org.onlab.onos.net.DeviceId;
......@@ -30,23 +32,26 @@ import org.onlab.onos.net.device.DeviceService;
import org.onlab.onos.net.device.PortDescription;
import org.onlab.onos.net.provider.AbstractProvider;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.common.StoreService;
import org.onlab.onos.store.device.impl.DistributedDeviceStore;
import org.onlab.onos.store.impl.StoreManager;
import org.onlab.onos.store.impl.TestStoreManager;
import org.onlab.packet.IpPrefix;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import static org.junit.Assert.*;
import static org.onlab.onos.net.Device.Type.SWITCH;
import static org.onlab.onos.net.DeviceId.deviceId;
import static org.onlab.onos.net.device.DeviceEvent.Type.*;
// FIXME This test is painfully slow starting up Hazelcast on each test cases,
// turning it off in repository for now.
// FIXME This test is slow starting up Hazelcast on each test cases.
// FIXME DistributedDeviceStore should have it's own test cases.
/**
......@@ -67,6 +72,11 @@ public class DistributedDeviceManagerTest {
private static final PortNumber P2 = PortNumber.portNumber(2);
private static final PortNumber P3 = PortNumber.portNumber(3);
private static final DefaultControllerNode SELF
= new DefaultControllerNode(new NodeId("foobar"),
IpPrefix.valueOf("127.0.0.1"));
private DeviceManager mgr;
protected StoreManager storeManager;
......@@ -77,6 +87,8 @@ public class DistributedDeviceManagerTest {
protected TestProvider provider;
protected TestListener listener = new TestListener();
private DistributedDeviceStore dstore;
private TestMastershipManager masterManager;
private EventDeliveryService eventService;
@Before
public void setUp() {
......@@ -84,26 +96,21 @@ public class DistributedDeviceManagerTest {
service = mgr;
admin = mgr;
registry = mgr;
// FIXME should be reading the hazelcast.xml
Config config = new Config();
// avoid accidentally joining other cluster
config.getGroupConfig().setName(UUID.randomUUID().toString());
// quickly form single node cluster
config.getNetworkConfig().getJoin()
.getTcpIpConfig()
.setEnabled(true).setConnectionTimeoutSeconds(0);
config.getNetworkConfig().getJoin()
.getMulticastConfig()
.setEnabled(false);
// TODO should find a way to clean Hazelcast instance without shutdown.
Config config = TestStoreManager.getTestConfig();
masterManager = new TestMastershipManager();
storeManager = new TestStoreManager(Hazelcast.newHazelcastInstance(config));
storeManager.activate();
dstore = new TestDistributedDeviceStore(storeManager);
dstore = new TestDistributedDeviceStore();
dstore.activate();
mgr.store = dstore;
mgr.eventDispatcher = new TestEventDispatcher();
mgr.mastershipService = new TestMastershipService();
eventService = new TestEventDispatcher();
mgr.eventDispatcher = eventService;
mgr.mastershipService = masterManager;
mgr.activate();
service.addListener(listener);
......@@ -283,23 +290,21 @@ public class DistributedDeviceManagerTest {
}
private class TestDistributedDeviceStore extends DistributedDeviceStore {
public TestDistributedDeviceStore(StoreService storeService) {
this.storeService = storeService;
public TestDistributedDeviceStore() {
this.storeService = storeManager;
}
}
private class TestStoreManager extends StoreManager {
TestStoreManager(HazelcastInstance instance) {
this.instance = instance;
}
private static class TestMastershipManager extends MastershipServiceAdapter {
@Override
public void activate() {
setupKryoPool();
}
}
private ConcurrentMap<DeviceId, NodeId> masters = new ConcurrentHashMap<>();
private static class TestMastershipService extends MastershipServiceAdapter {
public TestMastershipManager() {
// SELF master of all initially
masters.put(DID1, SELF.id());
masters.put(DID1, SELF.id());
}
@Override
public MastershipRole getLocalRole(DeviceId deviceId) {
return MastershipRole.MASTER;
......@@ -307,13 +312,27 @@ public class DistributedDeviceManagerTest {
@Override
public Set<DeviceId> getDevicesOf(NodeId nodeId) {
return Sets.newHashSet(DID1, DID2);
HashSet<DeviceId> set = Sets.newHashSet();
for (Entry<DeviceId, NodeId> e : masters.entrySet()) {
if (e.getValue().equals(nodeId)) {
set.add(e.getKey());
}
}
return set;
}
@Override
public MastershipRole requestRoleFor(DeviceId deviceId) {
return MastershipRole.MASTER;
if (SELF.id().equals(masters.get(deviceId))) {
return MastershipRole.MASTER;
} else {
return MastershipRole.STANDBY;
}
}
}
@Override
public void relinquishMastership(DeviceId deviceId) {
masters.remove(deviceId, SELF.id());
}
}
}
......
......@@ -7,6 +7,7 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSet.Builder;
import com.hazelcast.core.IMap;
import com.hazelcast.core.ISet;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
......@@ -15,7 +16,6 @@ import org.onlab.onos.net.DefaultDevice;
import org.onlab.onos.net.DefaultPort;
import org.onlab.onos.net.Device;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.MastershipRole;
import org.onlab.onos.net.Port;
import org.onlab.onos.net.PortNumber;
import org.onlab.onos.net.device.DeviceDescription;
......@@ -38,7 +38,6 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.cache.CacheBuilder.newBuilder;
import static org.onlab.onos.net.device.DeviceEvent.Type.*;
......@@ -61,10 +60,6 @@ public class DistributedDeviceStore
private IMap<byte[], byte[]> rawDevices;
private LoadingCache<DeviceId, Optional<DefaultDevice>> devices;
// private IMap<DeviceId, MastershipRole> roles;
private IMap<byte[], byte[]> rawRoles;
private LoadingCache<DeviceId, Optional<MastershipRole>> roles;
// private ISet<DeviceId> availableDevices;
private ISet<byte[]> availableDevices;
......@@ -73,6 +68,7 @@ public class DistributedDeviceStore
private IMap<byte[], byte[]> rawDevicePorts;
private LoadingCache<DeviceId, Optional<Map<PortNumber, Port>>> devicePorts;
@Override
@Activate
public void activate() {
super.activate();
......@@ -88,13 +84,6 @@ public class DistributedDeviceStore
// refresh/populate cache based on notification from other instance
rawDevices.addEntryListener(new RemoteEventHandler<>(devices), includeValue);
rawRoles = theInstance.getMap("roles");
final OptionalCacheLoader<DeviceId, MastershipRole> rolesLoader
= new OptionalCacheLoader<>(storeService, rawRoles);
roles = new AbsentInvalidatingLoadingCache<>(newBuilder().build(rolesLoader));
// refresh/populate cache based on notification from other instance
rawRoles.addEntryListener(new RemoteEventHandler<>(roles), includeValue);
// TODO cache availableDevices
availableDevices = theInstance.getSet("availableDevices");
......@@ -110,6 +99,7 @@ public class DistributedDeviceStore
@Deactivate
public void deactivate() {
log.info("Stopped");
}
......@@ -171,10 +161,6 @@ public class DistributedDeviceStore
devices.put(deviceId, Optional.of(device));
availableDevices.add(deviceIdBytes);
// For now claim the device as a master automatically.
//rawRoles.put(deviceIdBytes, serialize(MastershipRole.MASTER));
//roles.put(deviceId, Optional.of(MastershipRole.MASTER));
}
return new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, device, null);
}
......@@ -348,8 +334,6 @@ public class DistributedDeviceStore
public DeviceEvent removeDevice(DeviceId deviceId) {
synchronized (this) {
byte[] deviceIdBytes = serialize(deviceId);
rawRoles.remove(deviceIdBytes);
roles.invalidate(deviceId);
// TODO conditional remove?
Device device = deserialize(rawDevices.remove(deviceIdBytes));
......@@ -360,5 +344,4 @@ public class DistributedDeviceStore
}
// TODO cache serialized DeviceID if we suffer from serialization cost
}
......
......@@ -6,6 +6,7 @@ import com.hazelcast.core.EntryAdapter;
import com.hazelcast.core.EntryEvent;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.MapEvent;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Reference;
......@@ -86,8 +87,12 @@ public abstract class AbstractDistributedStore<E extends Event, D extends StoreD
@Override
public void entryUpdated(EntryEvent<byte[], byte[]> event) {
cache.put(storeService.<K>deserialize(event.getKey()),
Optional.of(storeService.<V>deserialize(event.getValue())));
K key = storeService.<K>deserialize(event.getKey());
final V oldVal = storeService.<V>deserialize(event.getOldValue());
Optional<V> oldValue = Optional.fromNullable(oldVal);
final V newVal = storeService.<V>deserialize(event.getValue());
Optional<V> newValue = Optional.of(newVal);
cache.asMap().replace(key, oldValue, newValue);
}
@Override
......@@ -97,7 +102,10 @@ public abstract class AbstractDistributedStore<E extends Event, D extends StoreD
@Override
public void entryAdded(EntryEvent<byte[], byte[]> event) {
entryUpdated(event);
K key = storeService.<K>deserialize(event.getKey());
final V newVal = storeService.<V>deserialize(event.getValue());
Optional<V> newValue = Optional.of(newVal);
cache.asMap().putIfAbsent(key, newValue);
}
}
......
......@@ -45,7 +45,7 @@ import java.util.HashMap;
@Service
public class StoreManager implements StoreService {
private static final String HAZELCAST_XML_FILE = "etc/hazelcast.xml";
protected static final String HAZELCAST_XML_FILE = "etc/hazelcast.xml";
private final Logger log = LoggerFactory.getLogger(getClass());
......
package org.onlab.onos.store.impl;
import java.io.FileNotFoundException;
import java.util.UUID;
import com.hazelcast.config.Config;
import com.hazelcast.config.FileSystemXmlConfig;
import com.hazelcast.core.HazelcastInstance;
/**
* Dummy StoreManager to use specified Hazelcast instance.
*/
public class TestStoreManager extends StoreManager {
/**
* Gets the Hazelcast Config for testing.
*
* @return
*/
public static Config getTestConfig() {
Config config;
try {
config = new FileSystemXmlConfig(HAZELCAST_XML_FILE);
} catch (FileNotFoundException e) {
// falling back to default
config = new Config();
}
// avoid accidentally joining other cluster
config.getGroupConfig().setName(UUID.randomUUID().toString());
// quickly form single node cluster
config.getNetworkConfig().getJoin()
.getTcpIpConfig()
.setEnabled(true).setConnectionTimeoutSeconds(0);
config.getNetworkConfig().getJoin()
.getMulticastConfig()
.setEnabled(false);
return config;
}
/**
* Constructor.
*
* @param instance Hazelast instance to return on #getHazelcastInstance()
*/
public TestStoreManager(HazelcastInstance instance) {
this.instance = instance;
}
// Hazelcast setup removed from original code.
@Override
public void activate() {
setupKryoPool();
}
}
......@@ -16,6 +16,18 @@
<description>ONOS OpenFlow controller subsystem API</description>
<repositories>
<!-- FIXME: for Loxigen. Decide how to use Loxigen before release. -->
<repository>
<id>sonatype-oss-snapshot</id>
<name>Sonatype OSS snapshot repository</name>
<url>https://oss.sonatype.org/content/repositories/snapshots</url>
<releases>
<enabled>false</enabled>
</releases>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.projectfloodlight</groupId>
......