Thomas Vachuska

Merge remote-tracking branch 'origin/master'

Showing 18 changed files with 104 additions and 123 deletions
/**
* Common abstractions and facilities for implementing distributed store
* using gossip protocol.
*/
package org.onlab.onos.store.common.impl;
......@@ -15,7 +15,7 @@ import org.onlab.onos.net.device.DefaultPortDescription;
import org.onlab.onos.net.device.DeviceDescription;
import org.onlab.onos.net.device.PortDescription;
import org.onlab.onos.store.Timestamp;
import org.onlab.onos.store.common.impl.Timestamped;
import org.onlab.onos.store.impl.Timestamped;
/*
* Collection of Description of a Device and Ports, given from a Provider.
......
......@@ -38,7 +38,7 @@ import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
import org.onlab.onos.store.common.impl.Timestamped;
import org.onlab.onos.store.impl.Timestamped;
import org.onlab.onos.store.serializers.KryoSerializer;
import org.onlab.onos.store.serializers.DistributedStoreSerializers;
import org.onlab.util.KryoPool;
......
package org.onlab.onos.store.device.impl;
import static com.google.common.base.Preconditions.checkNotNull;
import org.apache.commons.lang3.concurrent.ConcurrentException;
import org.apache.commons.lang3.concurrent.ConcurrentInitializer;
import org.onlab.onos.net.device.DeviceDescription;
import org.onlab.onos.store.common.impl.Timestamped;
// FIXME: consider removing this class
public final class InitDeviceDescs
implements ConcurrentInitializer<DeviceDescriptions> {
private final Timestamped<DeviceDescription> deviceDesc;
public InitDeviceDescs(Timestamped<DeviceDescription> deviceDesc) {
this.deviceDesc = checkNotNull(deviceDesc);
}
@Override
public DeviceDescriptions get() throws ConcurrentException {
return new DeviceDescriptions(deviceDesc);
}
}
......@@ -3,7 +3,7 @@ package org.onlab.onos.store.device.impl;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.device.DeviceDescription;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.common.impl.Timestamped;
import org.onlab.onos.store.impl.Timestamped;
import com.google.common.base.MoreObjects;
......
......@@ -3,7 +3,7 @@ package org.onlab.onos.store.device.impl;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.device.DeviceDescription;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.common.impl.Timestamped;
import org.onlab.onos.store.impl.Timestamped;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
......
......@@ -5,7 +5,7 @@ import java.util.List;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.device.PortDescription;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.common.impl.Timestamped;
import org.onlab.onos.store.impl.Timestamped;
import com.google.common.base.MoreObjects;
......
......@@ -5,7 +5,7 @@ import java.util.List;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.device.PortDescription;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.common.impl.Timestamped;
import org.onlab.onos.store.impl.Timestamped;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
......
......@@ -3,7 +3,7 @@ package org.onlab.onos.store.device.impl;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.device.PortDescription;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.common.impl.Timestamped;
import org.onlab.onos.store.impl.Timestamped;
import com.google.common.base.MoreObjects;
......
......@@ -3,7 +3,7 @@ package org.onlab.onos.store.device.impl;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.device.PortDescription;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.common.impl.Timestamped;
import org.onlab.onos.store.impl.Timestamped;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
......
......@@ -38,7 +38,7 @@ import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
import org.onlab.onos.store.common.impl.Timestamped;
import org.onlab.onos.store.impl.Timestamped;
import org.onlab.onos.store.serializers.DistributedStoreSerializers;
import org.onlab.onos.store.serializers.KryoSerializer;
import org.onlab.packet.IpPrefix;
......
package org.onlab.onos.store.common.impl;
package org.onlab.onos.store.impl;
import static com.google.common.base.Preconditions.checkNotNull;
......@@ -58,12 +58,12 @@ public final class Timestamped<T> {
}
/**
* Tests if this timestamp is newer thatn the specified timestamp.
* @param timestamp to compare agains
* Tests if this timestamp is newer than the specified timestamp.
* @param other timestamp to compare against
* @return true if this instance is newer
*/
public boolean isNewer(Timestamp timestamp) {
return this.timestamp.compareTo(checkNotNull(timestamp)) > 0;
public boolean isNewer(Timestamp other) {
return this.timestamp.compareTo(checkNotNull(other)) > 0;
}
@Override
......
......@@ -39,7 +39,7 @@ import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
import org.onlab.onos.store.common.impl.Timestamped;
import org.onlab.onos.store.impl.Timestamped;
import org.onlab.onos.store.serializers.DistributedStoreSerializers;
import org.onlab.onos.store.serializers.KryoSerializer;
import org.onlab.util.KryoPool;
......
......@@ -4,7 +4,7 @@ import com.google.common.base.MoreObjects;
import org.onlab.onos.net.link.LinkDescription;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.common.impl.Timestamped;
import org.onlab.onos.store.impl.Timestamped;
/**
* Information published by GossipDeviceStore to notify peers of a device
......
package org.onlab.onos.store.serializers;
import org.onlab.onos.store.common.impl.Timestamped;
import org.onlab.onos.store.impl.MastershipBasedTimestamp;
import org.onlab.onos.store.impl.Timestamped;
import org.onlab.onos.store.impl.WallClockTimestamp;
import org.onlab.util.KryoPool;
......
package org.onlab.onos.store.common.impl;
package org.onlab.onos.store.impl;
import static org.junit.Assert.*;
......@@ -6,7 +6,6 @@ import java.nio.ByteBuffer;
import org.junit.Test;
import org.onlab.onos.store.Timestamp;
import org.onlab.onos.store.impl.MastershipBasedTimestamp;
import org.onlab.util.KryoPool;
import com.google.common.testing.EqualsTester;
......
......@@ -5,8 +5,6 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.commons.lang3.concurrent.ConcurrentException;
import org.apache.commons.lang3.concurrent.ConcurrentInitializer;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
......@@ -35,6 +33,7 @@ import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
......@@ -71,8 +70,7 @@ public class SimpleDeviceStore
public static final String DEVICE_NOT_FOUND = "Device with ID %s not found";
// collection of Description given from various providers
private final ConcurrentMap<DeviceId,
ConcurrentMap<ProviderId, DeviceDescriptions>>
private final ConcurrentMap<DeviceId, Map<ProviderId, DeviceDescriptions>>
deviceDescs = Maps.newConcurrentMap();
// cache of Device and Ports generated by compositing descriptions from providers
......@@ -117,15 +115,16 @@ public class SimpleDeviceStore
DeviceId deviceId,
DeviceDescription deviceDescription) {
ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs
= getDeviceDescriptions(deviceId);
Map<ProviderId, DeviceDescriptions> providerDescs
= getOrCreateDeviceDescriptions(deviceId);
synchronized (providerDescs) {
// locking per device
DeviceDescriptions descs
= createIfAbsentUnchecked(providerDescs, providerId,
new InitDeviceDescs(deviceDescription));
= getOrCreateProviderDeviceDescriptions(providerDescs,
providerId,
deviceDescription);
Device oldDevice = devices.get(deviceId);
// update description
......@@ -192,8 +191,8 @@ public class SimpleDeviceStore
@Override
public DeviceEvent markOffline(DeviceId deviceId) {
ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs
= getDeviceDescriptions(deviceId);
Map<ProviderId, DeviceDescriptions> providerDescs
= getOrCreateDeviceDescriptions(deviceId);
// locking device
synchronized (providerDescs) {
......@@ -218,7 +217,7 @@ public class SimpleDeviceStore
Device device = devices.get(deviceId);
checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
ConcurrentMap<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
Map<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
List<DeviceEvent> events = new ArrayList<>();
......@@ -287,12 +286,12 @@ public class SimpleDeviceStore
Map<PortNumber, Port> ports,
Set<PortNumber> processed) {
List<DeviceEvent> events = new ArrayList<>();
Iterator<PortNumber> iterator = ports.keySet().iterator();
Iterator<Entry<PortNumber, Port>> iterator = ports.entrySet().iterator();
while (iterator.hasNext()) {
PortNumber portNumber = iterator.next();
Entry<PortNumber, Port> e = iterator.next();
PortNumber portNumber = e.getKey();
if (!processed.contains(portNumber)) {
events.add(new DeviceEvent(PORT_REMOVED, device,
ports.get(portNumber)));
events.add(new DeviceEvent(PORT_REMOVED, device, e.getValue()));
iterator.remove();
}
}
......@@ -306,10 +305,36 @@ public class SimpleDeviceStore
NewConcurrentHashMap.<PortNumber, Port>ifNeeded());
}
private ConcurrentMap<ProviderId, DeviceDescriptions> getDeviceDescriptions(
private Map<ProviderId, DeviceDescriptions> getOrCreateDeviceDescriptions(
DeviceId deviceId) {
return createIfAbsentUnchecked(deviceDescs, deviceId,
NewConcurrentHashMap.<ProviderId, DeviceDescriptions>ifNeeded());
Map<ProviderId, DeviceDescriptions> r;
r = deviceDescs.get(deviceId);
if (r != null) {
return r;
}
r = new HashMap<>();
final Map<ProviderId, DeviceDescriptions> concurrentlyAdded;
concurrentlyAdded = deviceDescs.putIfAbsent(deviceId, r);
if (concurrentlyAdded != null) {
return concurrentlyAdded;
} else {
return r;
}
}
// Guarded by deviceDescs value (=Device lock)
private DeviceDescriptions getOrCreateProviderDeviceDescriptions(
Map<ProviderId, DeviceDescriptions> device,
ProviderId providerId, DeviceDescription deltaDesc) {
synchronized (device) {
DeviceDescriptions r = device.get(providerId);
if (r == null) {
r = new DeviceDescriptions(deltaDesc);
device.put(providerId, r);
}
return r;
}
}
@Override
......@@ -318,12 +343,12 @@ public class SimpleDeviceStore
Device device = devices.get(deviceId);
checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
ConcurrentMap<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
Map<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
synchronized (descsMap) {
DeviceDescriptions descs = descsMap.get(providerId);
// assuming all providers must to give DeviceDescription
// assuming all providers must give DeviceDescription first
checkArgument(descs != null,
"Device description for Device ID %s from Provider %s was not found",
deviceId, providerId);
......@@ -367,7 +392,7 @@ public class SimpleDeviceStore
@Override
public DeviceEvent removeDevice(DeviceId deviceId) {
ConcurrentMap<ProviderId, DeviceDescriptions> descs = getDeviceDescriptions(deviceId);
Map<ProviderId, DeviceDescriptions> descs = getOrCreateDeviceDescriptions(deviceId);
synchronized (descs) {
Device device = devices.remove(deviceId);
// should DEVICE_REMOVED carry removed ports?
......@@ -390,7 +415,7 @@ public class SimpleDeviceStore
* @return Device instance
*/
private Device composeDevice(DeviceId deviceId,
ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs) {
Map<ProviderId, DeviceDescriptions> providerDescs) {
checkArgument(!providerDescs.isEmpty(), "No Device descriptions supplied");
......@@ -429,14 +454,14 @@ public class SimpleDeviceStore
*
* @param device device the port is on
* @param number port number
* @param providerDescs Collection of Descriptions from multiple providers
* @param descsMap Collection of Descriptions from multiple providers
* @return Port instance
*/
private Port composePort(Device device, PortNumber number,
ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs) {
Map<ProviderId, DeviceDescriptions> descsMap) {
ProviderId primary = pickPrimaryPID(providerDescs);
DeviceDescriptions primDescs = providerDescs.get(primary);
ProviderId primary = pickPrimaryPID(descsMap);
DeviceDescriptions primDescs = descsMap.get(primary);
// if no primary, assume not enabled
// TODO: revisit this default port enabled/disabled behavior
boolean isEnabled = false;
......@@ -448,7 +473,7 @@ public class SimpleDeviceStore
annotations = merge(annotations, portDesc.annotations());
}
for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
for (Entry<ProviderId, DeviceDescriptions> e : descsMap.entrySet()) {
if (e.getKey().equals(primary)) {
continue;
}
......@@ -470,10 +495,9 @@ public class SimpleDeviceStore
/**
* @return primary ProviderID, or randomly chosen one if none exists
*/
private ProviderId pickPrimaryPID(
ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs) {
private ProviderId pickPrimaryPID(Map<ProviderId, DeviceDescriptions> descsMap) {
ProviderId fallBackPrimary = null;
for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
for (Entry<ProviderId, DeviceDescriptions> e : descsMap.entrySet()) {
if (!e.getKey().isAncillary()) {
return e.getKey();
} else if (fallBackPrimary == null) {
......@@ -484,21 +508,6 @@ public class SimpleDeviceStore
return fallBackPrimary;
}
public static final class InitDeviceDescs
implements ConcurrentInitializer<DeviceDescriptions> {
private final DeviceDescription deviceDesc;
public InitDeviceDescs(DeviceDescription deviceDesc) {
this.deviceDesc = checkNotNull(deviceDesc);
}
@Override
public DeviceDescriptions get() throws ConcurrentException {
return new DeviceDescriptions(deviceDesc);
}
}
/**
* Collection of Description of a Device and it's Ports given from a Provider.
*/
......
package org.onlab.onos.store.trivial.impl;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.SetMultimap;
import org.apache.commons.lang3.concurrent.ConcurrentUtils;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
......@@ -20,7 +18,6 @@ import org.onlab.onos.net.Link;
import org.onlab.onos.net.SparseAnnotations;
import org.onlab.onos.net.Link.Type;
import org.onlab.onos.net.LinkKey;
import org.onlab.onos.net.Provided;
import org.onlab.onos.net.link.DefaultLinkDescription;
import org.onlab.onos.net.link.LinkDescription;
import org.onlab.onos.net.link.LinkEvent;
......@@ -28,11 +25,12 @@ import org.onlab.onos.net.link.LinkStore;
import org.onlab.onos.net.link.LinkStoreDelegate;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.AbstractStore;
import org.onlab.util.NewConcurrentHashMap;
import org.slf4j.Logger;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
......@@ -47,6 +45,7 @@ import static org.onlab.onos.net.link.LinkEvent.Type.*;
import static org.slf4j.LoggerFactory.getLogger;
import static com.google.common.collect.Multimaps.synchronizedSetMultimap;
import static com.google.common.base.Predicates.notNull;
import static com.google.common.base.Verify.verifyNotNull;
/**
* Manages inventory of infrastructure links using trivial in-memory structures
......@@ -61,8 +60,7 @@ public class SimpleLinkStore
private final Logger log = getLogger(getClass());
// Link inventory
private final ConcurrentMap<LinkKey,
ConcurrentMap<ProviderId, LinkDescription>>
private final ConcurrentMap<LinkKey, Map<ProviderId, LinkDescription>>
linkDescs = new ConcurrentHashMap<>();
// Link instance cache
......@@ -151,7 +149,7 @@ public class SimpleLinkStore
LinkDescription linkDescription) {
LinkKey key = linkKey(linkDescription.src(), linkDescription.dst());
ConcurrentMap<ProviderId, LinkDescription> descs = getLinkDescriptions(key);
Map<ProviderId, LinkDescription> descs = getOrCreateLinkDescriptions(key);
synchronized (descs) {
final Link oldLink = links.get(key);
// update description
......@@ -166,7 +164,7 @@ public class SimpleLinkStore
// Guarded by linkDescs value (=locking each Link)
private LinkDescription createOrUpdateLinkDescription(
ConcurrentMap<ProviderId, LinkDescription> descs,
Map<ProviderId, LinkDescription> descs,
ProviderId providerId,
LinkDescription linkDescription) {
......@@ -227,7 +225,7 @@ public class SimpleLinkStore
@Override
public LinkEvent removeLink(ConnectPoint src, ConnectPoint dst) {
final LinkKey key = linkKey(src, dst);
ConcurrentMap<ProviderId, LinkDescription> descs = getLinkDescriptions(key);
Map<ProviderId, LinkDescription> descs = getOrCreateLinkDescriptions(key);
synchronized (descs) {
Link link = links.remove(key);
descs.clear();
......@@ -247,8 +245,8 @@ public class SimpleLinkStore
/**
* @return primary ProviderID, or randomly chosen one if none exists
*/
private ProviderId pickPrimaryPID(
ConcurrentMap<ProviderId, LinkDescription> providerDescs) {
// Guarded by linkDescs value (=locking each Link)
private ProviderId getBaseProviderId(Map<ProviderId, LinkDescription> providerDescs) {
ProviderId fallBackPrimary = null;
for (Entry<ProviderId, LinkDescription> e : providerDescs.entrySet()) {
......@@ -262,9 +260,10 @@ public class SimpleLinkStore
return fallBackPrimary;
}
private Link composeLink(ConcurrentMap<ProviderId, LinkDescription> descs) {
ProviderId primary = pickPrimaryPID(descs);
LinkDescription base = descs.get(primary);
// Guarded by linkDescs value (=locking each Link)
private Link composeLink(Map<ProviderId, LinkDescription> descs) {
ProviderId primary = getBaseProviderId(descs);
LinkDescription base = descs.get(verifyNotNull(primary));
ConnectPoint src = base.src();
ConnectPoint dst = base.dst();
......@@ -289,9 +288,20 @@ public class SimpleLinkStore
return new DefaultLink(primary , src, dst, type, annotations);
}
private ConcurrentMap<ProviderId, LinkDescription> getLinkDescriptions(LinkKey key) {
return ConcurrentUtils.createIfAbsentUnchecked(linkDescs, key,
NewConcurrentHashMap.<ProviderId, LinkDescription>ifNeeded());
private Map<ProviderId, LinkDescription> getOrCreateLinkDescriptions(LinkKey key) {
Map<ProviderId, LinkDescription> r;
r = linkDescs.get(key);
if (r != null) {
return r;
}
r = new HashMap<>();
final Map<ProviderId, LinkDescription> concurrentlyAdded;
concurrentlyAdded = linkDescs.putIfAbsent(key, r);
if (concurrentlyAdded == null) {
return r;
} else {
return concurrentlyAdded;
}
}
private final Function<LinkKey, Link> lookupLink = new LookupLink();
......@@ -302,20 +312,11 @@ public class SimpleLinkStore
private final class LookupLink implements Function<LinkKey, Link> {
@Override
public Link apply(LinkKey input) {
return links.get(input);
}
}
private static final Predicate<Provided> IS_PRIMARY = new IsPrimary();
private static final Predicate<Provided> isPrimary() {
return IS_PRIMARY;
}
private static final class IsPrimary implements Predicate<Provided> {
@Override
public boolean apply(Provided input) {
return !input.providerId().isAncillary();
if (input == null) {
return null;
} else {
return links.get(input);
}
}
}
}
......