Yuta HIGUCHI

Experimenting multi Provider support on SimpelDeviceStore.

Change-Id: I181db7704556768863624f072540d141e39d0904
......@@ -2,6 +2,7 @@ package org.onlab.onos.store.cluster.impl;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.NodeId;
......@@ -58,6 +59,7 @@ public class ClusterCommunicationManagerTest {
ccm2.deactivate();
}
@Ignore("FIXME: failing randomly?")
@Test
public void connect() throws Exception {
cnd1.latch = new CountDownLatch(1);
......
......@@ -25,6 +25,10 @@
<groupId>org.apache.felix</groupId>
<artifactId>org.apache.felix.scr.annotations</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
</dependencies>
<build>
......
......@@ -3,6 +3,8 @@ package org.onlab.onos.store.trivial.impl;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import org.apache.commons.lang3.concurrent.ConcurrentException;
import org.apache.commons.lang3.concurrent.ConcurrentInitializer;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
......@@ -10,6 +12,7 @@ import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.net.DefaultDevice;
import org.onlab.onos.net.DefaultPort;
import org.onlab.onos.net.Device;
import org.onlab.onos.net.Device.Type;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.Port;
import org.onlab.onos.net.PortNumber;
......@@ -23,21 +26,27 @@ import org.onlab.onos.store.AbstractStore;
import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Predicates.notNull;
import static org.onlab.onos.net.device.DeviceEvent.Type.*;
import static org.slf4j.LoggerFactory.getLogger;
import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
// TODO: synchronization should be done in more fine-grained manner.
/**
* Manages inventory of infrastructure devices using trivial in-memory
* structures implementation.
......@@ -52,9 +61,18 @@ public class SimpleDeviceStore
public static final String DEVICE_NOT_FOUND = "Device with ID %s not found";
private final Map<DeviceId, DefaultDevice> devices = new ConcurrentHashMap<>();
// collection of Description given from various providers
private final ConcurrentMap<DeviceId,
ConcurrentMap<ProviderId, DeviceDescriptions>>
deviceDescs = new ConcurrentHashMap<>();
// cache of Device and Ports generated by compositing descriptions from providers
private final ConcurrentMap<DeviceId, Device> devices = new ConcurrentHashMap<>();
private final ConcurrentMap<DeviceId, ConcurrentMap<PortNumber, Port>> devicePorts = new ConcurrentHashMap<>();
// available(=UP) devices
private final Set<DeviceId> availableDevices = new HashSet<>();
private final Map<DeviceId, Map<PortNumber, Port>> devicePorts = new HashMap<>();
@Activate
public void activate() {
......@@ -73,7 +91,7 @@ public class SimpleDeviceStore
@Override
public Iterable<Device> getDevices() {
return Collections.unmodifiableSet(new HashSet<Device>(devices.values()));
return Collections.unmodifiableCollection(devices.values());
}
@Override
......@@ -82,82 +100,115 @@ public class SimpleDeviceStore
}
@Override
public DeviceEvent createOrUpdateDevice(ProviderId providerId, DeviceId deviceId,
public synchronized DeviceEvent createOrUpdateDevice(ProviderId providerId, DeviceId deviceId,
DeviceDescription deviceDescription) {
DefaultDevice device = devices.get(deviceId);
if (device == null) {
return createDevice(providerId, deviceId, deviceDescription);
ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs
= createIfAbsentUnchecked(deviceDescs, deviceId,
new InitConcurrentHashMap<ProviderId, DeviceDescriptions>());
Device oldDevice = devices.get(deviceId);
DeviceDescriptions descs
= createIfAbsentUnchecked(providerDescs, providerId,
new InitDeviceDescs(deviceDescription));
descs.putDeviceDesc(deviceDescription);
Device newDevice = composeDevice(deviceId, providerDescs);
if (oldDevice == null) {
// ADD
return createDevice(providerId, newDevice);
} else {
// UPDATE or ignore (no change or stale)
return updateDevice(providerId, oldDevice, newDevice);
}
return updateDevice(providerId, device, deviceDescription);
}
// Creates the device and returns the appropriate event if necessary.
private DeviceEvent createDevice(ProviderId providerId, DeviceId deviceId,
DeviceDescription desc) {
DefaultDevice device = new DefaultDevice(providerId, deviceId, desc.type(),
desc.manufacturer(),
desc.hwVersion(), desc.swVersion(),
desc.serialNumber());
private DeviceEvent createDevice(ProviderId providerId, Device newDevice) {
// update composed device cache
synchronized (this) {
devices.put(deviceId, device);
availableDevices.add(deviceId);
devices.putIfAbsent(newDevice.id(), newDevice);
if (!providerId.isAncillary()) {
availableDevices.add(newDevice.id());
}
}
return new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, device, null);
return new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, newDevice, null);
}
// Updates the device and returns the appropriate event if necessary.
private DeviceEvent updateDevice(ProviderId providerId, DefaultDevice device,
DeviceDescription desc) {
private DeviceEvent updateDevice(ProviderId providerId, Device oldDevice, Device newDevice) {
// We allow only certain attributes to trigger update
if (!Objects.equals(device.hwVersion(), desc.hwVersion()) ||
!Objects.equals(device.swVersion(), desc.swVersion())) {
DefaultDevice updated = new DefaultDevice(providerId, device.id(),
desc.type(),
desc.manufacturer(),
desc.hwVersion(),
desc.swVersion(),
desc.serialNumber());
if (!Objects.equals(oldDevice.hwVersion(), newDevice.hwVersion()) ||
!Objects.equals(oldDevice.swVersion(), newDevice.swVersion())) {
synchronized (this) {
devices.put(device.id(), updated);
availableDevices.add(device.id());
devices.replace(newDevice.id(), oldDevice, newDevice);
if (!providerId.isAncillary()) {
availableDevices.add(newDevice.id());
}
}
return new DeviceEvent(DeviceEvent.Type.DEVICE_UPDATED, updated, null);
return new DeviceEvent(DeviceEvent.Type.DEVICE_UPDATED, newDevice, null);
}
// Otherwise merely attempt to change availability
synchronized (this) {
boolean added = availableDevices.add(device.id());
// Otherwise merely attempt to change availability if primary provider
if (!providerId.isAncillary()) {
synchronized (this) {
boolean added = availableDevices.add(newDevice.id());
return !added ? null :
new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device, null);
new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, newDevice, null);
}
}
return null;
}
@Override
public DeviceEvent markOffline(DeviceId deviceId) {
synchronized (this) {
Device device = devices.get(deviceId);
boolean removed = device != null && availableDevices.remove(deviceId);
boolean removed = (device != null) && availableDevices.remove(deviceId);
return !removed ? null :
new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device, null);
}
}
@Override
public List<DeviceEvent> updatePorts(ProviderId providerId, DeviceId deviceId,
public synchronized List<DeviceEvent> updatePorts(ProviderId providerId, DeviceId deviceId,
List<PortDescription> portDescriptions) {
// TODO: implement multi-provider
Device device = devices.get(deviceId);
checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
ConcurrentMap<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
DeviceDescriptions descs = descsMap.get(providerId);
checkArgument(descs != null,
"Device description for Device ID %s from Provider %s was not found",
deviceId, providerId);
List<DeviceEvent> events = new ArrayList<>();
synchronized (this) {
Device device = devices.get(deviceId);
checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
Map<PortNumber, Port> ports = getPortMap(deviceId);
ConcurrentMap<PortNumber, Port> ports = getPortMap(deviceId);
// Add new ports
Set<PortNumber> processed = new HashSet<>();
for (PortDescription portDescription : portDescriptions) {
Port port = ports.get(portDescription.portNumber());
events.add(port == null ?
createPort(device, portDescription, ports) :
updatePort(device, port, portDescription, ports));
PortNumber number = portDescription.portNumber();
Port oldPort = ports.get(number);
// update description
descs.putPortDesc(number, portDescription);
Port newPort = composePort(device, number, descsMap);
events.add(oldPort == null ?
createPort(device, newPort, ports) :
updatePort(device, oldPort, newPort, ports));
processed.add(portDescription.portNumber());
}
......@@ -168,25 +219,20 @@ public class SimpleDeviceStore
// Creates a new port based on the port description adds it to the map and
// Returns corresponding event.
private DeviceEvent createPort(Device device, PortDescription portDescription,
Map<PortNumber, Port> ports) {
DefaultPort port = new DefaultPort(device, portDescription.portNumber(),
portDescription.isEnabled());
ports.put(port.number(), port);
return new DeviceEvent(PORT_ADDED, device, port);
private DeviceEvent createPort(Device device, Port newPort,
ConcurrentMap<PortNumber, Port> ports) {
ports.put(newPort.number(), newPort);
return new DeviceEvent(PORT_ADDED, device, newPort);
}
// CHecks if the specified port requires update and if so, it replaces the
// existing entry in the map and returns corresponding event.
private DeviceEvent updatePort(Device device, Port port,
PortDescription portDescription,
Map<PortNumber, Port> ports) {
if (port.isEnabled() != portDescription.isEnabled()) {
DefaultPort updatedPort =
new DefaultPort(device, portDescription.portNumber(),
portDescription.isEnabled());
ports.put(port.number(), updatedPort);
return new DeviceEvent(PORT_UPDATED, device, updatedPort);
private DeviceEvent updatePort(Device device, Port oldPort,
Port newPort,
ConcurrentMap<PortNumber, Port> ports) {
if (oldPort.isEnabled() != newPort.isEnabled()) {
ports.put(oldPort.number(), newPort);
return new DeviceEvent(PORT_UPDATED, device, newPort);
}
return null;
}
......@@ -211,31 +257,48 @@ public class SimpleDeviceStore
// Gets the map of ports for the specified device; if one does not already
// exist, it creates and registers a new one.
private Map<PortNumber, Port> getPortMap(DeviceId deviceId) {
Map<PortNumber, Port> ports = devicePorts.get(deviceId);
if (ports == null) {
ports = new HashMap<>();
devicePorts.put(deviceId, ports);
}
return ports;
private ConcurrentMap<PortNumber, Port> getPortMap(DeviceId deviceId) {
return createIfAbsentUnchecked(devicePorts, deviceId,
new InitConcurrentHashMap<PortNumber, Port>());
}
@Override
public DeviceEvent updatePortStatus(ProviderId providerId, DeviceId deviceId,
public synchronized DeviceEvent updatePortStatus(ProviderId providerId, DeviceId deviceId,
PortDescription portDescription) {
Device device = devices.get(deviceId);
checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
ConcurrentMap<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
DeviceDescriptions descs = descsMap.get(providerId);
checkArgument(descs != null,
"Device description for Device ID %s from Provider %s was not found",
deviceId, providerId);
// TODO: implement multi-provider
synchronized (this) {
Device device = devices.get(deviceId);
checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
Map<PortNumber, Port> ports = getPortMap(deviceId);
Port port = ports.get(portDescription.portNumber());
return updatePort(device, port, portDescription, ports);
ConcurrentMap<PortNumber, Port> ports = getPortMap(deviceId);
final PortNumber number = portDescription.portNumber();
Port oldPort = ports.get(number);
// update description
descs.putPortDesc(number, portDescription);
Port newPort = composePort(device, number, descsMap);
if (oldPort == null) {
return createPort(device, newPort, ports);
} else {
return updatePort(device, oldPort, newPort, ports);
}
}
}
@Override
public List<Port> getPorts(DeviceId deviceId) {
Map<PortNumber, Port> ports = devicePorts.get(deviceId);
return ports == null ? new ArrayList<Port>() : ImmutableList.copyOf(ports.values());
if (ports == null) {
return Collections.emptyList();
}
return ImmutableList.copyOf(ports.values());
}
@Override
......@@ -257,4 +320,136 @@ public class SimpleDeviceStore
new DeviceEvent(DEVICE_REMOVED, device, null);
}
}
/**
* Returns a Device, merging description given from multiple Providers.
*
* @param deviceId device identifier
* @param providerDescs Collection of Descriptions from multiple providers
* @return Device instance
*/
private Device composeDevice(DeviceId deviceId,
ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs) {
checkArgument(!providerDescs.isEmpty(), "No Device descriptions supplied");
ProviderId primary = pickPrimaryPID(providerDescs);
DeviceDescriptions desc = providerDescs.get(primary);
Type type = desc.getDeviceDesc().type();
String manufacturer = desc.getDeviceDesc().manufacturer();
String hwVersion = desc.getDeviceDesc().hwVersion();
String swVersion = desc.getDeviceDesc().swVersion();
String serialNumber = desc.getDeviceDesc().serialNumber();
for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
if (e.getKey().equals(primary)) {
continue;
}
// FIXME: implement attribute merging once we have K-V attributes
}
return new DefaultDevice(primary, deviceId , type, manufacturer, hwVersion, swVersion, serialNumber);
}
// probably want composePorts
private Port composePort(Device device, PortNumber number,
ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs) {
ProviderId primary = pickPrimaryPID(providerDescs);
DeviceDescriptions primDescs = providerDescs.get(primary);
final PortDescription portDesc = primDescs.getPortDesc(number);
boolean isEnabled;
if (portDesc != null) {
isEnabled = portDesc.isEnabled();
} else {
// if no primary, assume not enabled
// TODO: revisit this port enabled/disabled behavior
isEnabled = false;
}
for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
if (e.getKey().equals(primary)) {
continue;
}
// FIXME: implement attribute merging once we have K-V attributes
}
return new DefaultPort(device, number, isEnabled);
}
/**
* @return primary ProviderID, or randomly chosen one if none exists
*/
private ProviderId pickPrimaryPID(
ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs) {
ProviderId fallBackPrimary = null;
for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
if (!e.getKey().isAncillary()) {
return e.getKey();
} else if (fallBackPrimary == null) {
// pick randomly as a fallback in case there is no primary
fallBackPrimary = e.getKey();
}
}
return fallBackPrimary;
}
// TODO: can be made generic
private static final class InitConcurrentHashMap<K, V> implements
ConcurrentInitializer<ConcurrentMap<K, V>> {
@Override
public ConcurrentMap<K, V> get() throws ConcurrentException {
return new ConcurrentHashMap<>();
}
}
public static final class InitDeviceDescs
implements ConcurrentInitializer<DeviceDescriptions> {
private final DeviceDescription deviceDesc;
public InitDeviceDescs(DeviceDescription deviceDesc) {
this.deviceDesc = checkNotNull(deviceDesc);
}
@Override
public DeviceDescriptions get() throws ConcurrentException {
return new DeviceDescriptions(deviceDesc);
}
}
/**
* Collection of Description of a Device and it's Ports given from a Provider.
*/
private static class DeviceDescriptions {
// private final DeviceId id;
// private final ProviderId pid;
private final AtomicReference<DeviceDescription> deviceDesc;
private final ConcurrentMap<PortNumber, PortDescription> portDescs;
public DeviceDescriptions(DeviceDescription desc) {
this.deviceDesc = new AtomicReference<>(desc);
this.portDescs = new ConcurrentHashMap<>();
}
public DeviceDescription getDeviceDesc() {
return deviceDesc.get();
}
public PortDescription getPortDesc(PortNumber number) {
return portDescs.get(number);
}
public Collection<PortDescription> getPortDescs() {
return Collections.unmodifiableCollection(portDescs.values());
}
public DeviceDescription putDeviceDesc(DeviceDescription newDesc) {
return deviceDesc.getAndSet(newDesc);
}
public PortDescription putPortDesc(PortNumber number, PortDescription newDesc) {
return portDescs.put(number, newDesc);
}
}
}
......
......@@ -44,6 +44,7 @@ import com.google.common.collect.Sets;
public class SimpleDeviceStoreTest {
private static final ProviderId PID = new ProviderId("of", "foo");
private static final ProviderId PIDA = new ProviderId("of", "bar", true);
private static final DeviceId DID1 = deviceId("of:foo");
private static final DeviceId DID2 = deviceId("of:bar");
private static final String MFR = "whitebox";
......@@ -89,6 +90,13 @@ public class SimpleDeviceStoreTest {
deviceStore.createOrUpdateDevice(PID, deviceId, description);
}
private void putDeviceAncillary(DeviceId deviceId, String swVersion) {
DeviceDescription description =
new DefaultDeviceDescription(deviceId.uri(), SWITCH, MFR,
HW, swVersion, SN);
deviceStore.createOrUpdateDevice(PIDA, deviceId, description);
}
private static void assertDevice(DeviceId id, String swVersion, Device device) {
assertNotNull(device);
assertEquals(id, device.id());
......@@ -160,6 +168,33 @@ public class SimpleDeviceStoreTest {
}
@Test
public final void testCreateOrUpdateDeviceAncillary() {
DeviceDescription description =
new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR,
HW, SW1, SN);
DeviceEvent event = deviceStore.createOrUpdateDevice(PIDA, DID1, description);
assertEquals(DEVICE_ADDED, event.type());
assertDevice(DID1, SW1, event.subject());
assertEquals(PIDA, event.subject().providerId());
assertFalse("Ancillary will not bring device up", deviceStore.isAvailable(DID1));
DeviceDescription description2 =
new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR,
HW, SW2, SN);
DeviceEvent event2 = deviceStore.createOrUpdateDevice(PID, DID1, description2);
assertEquals(DEVICE_UPDATED, event2.type());
assertDevice(DID1, SW2, event2.subject());
assertEquals(PID, event2.subject().providerId());
assertTrue(deviceStore.isAvailable(DID1));
assertNull("No change expected", deviceStore.createOrUpdateDevice(PID, DID1, description2));
// For now, Ancillary is ignored once primary appears
assertNull("No change expected", deviceStore.createOrUpdateDevice(PIDA, DID1, description));
}
@Test
public final void testMarkOffline() {
putDevice(DID1, SW1);
......@@ -257,6 +292,34 @@ public class SimpleDeviceStoreTest {
assertDevice(DID1, SW1, event.subject());
assertEquals(P1, event.port().number());
assertFalse("Port is disabled", event.port().isEnabled());
}
@Test
public final void testUpdatePortStatusAncillary() {
putDeviceAncillary(DID1, SW1);
putDevice(DID1, SW1);
List<PortDescription> pds = Arrays.<PortDescription>asList(
new DefaultPortDescription(P1, true)
);
deviceStore.updatePorts(PID, DID1, pds);
DeviceEvent event = deviceStore.updatePortStatus(PID, DID1,
new DefaultPortDescription(P1, false));
assertEquals(PORT_UPDATED, event.type());
assertDevice(DID1, SW1, event.subject());
assertEquals(P1, event.port().number());
assertFalse("Port is disabled", event.port().isEnabled());
DeviceEvent event2 = deviceStore.updatePortStatus(PIDA, DID1,
new DefaultPortDescription(P1, true));
assertNull("Ancillary is ignored if primary exists", event2);
DeviceEvent event3 = deviceStore.updatePortStatus(PIDA, DID1,
new DefaultPortDescription(P2, true));
assertEquals(PORT_ADDED, event3.type());
assertDevice(DID1, SW1, event3.subject());
assertEquals(P2, event3.port().number());
assertFalse("Port is disabled if not given from provider", event3.port().isEnabled());
}
@Test
......