tom

Merge remote-tracking branch 'origin/master'

Showing 39 changed files with 1272 additions and 415 deletions
......@@ -28,8 +28,10 @@
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>2.4.2</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.onlab.onos</groupId>
<artifactId>onlab-misc</artifactId>
</dependency>
</dependencies>
......
......@@ -41,5 +41,17 @@
<groupId>org.apache.karaf.shell</groupId>
<artifactId>org.apache.karaf.shell.console</artifactId>
</dependency>
<dependency>
<groupId>org.onlab.onos</groupId>
<artifactId>onlab-misc</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>org.osgi</groupId>
<artifactId>org.osgi.core</artifactId>
</dependency>
</dependencies>
</project>
......
......@@ -4,8 +4,6 @@ import java.io.IOException;
import org.onlab.netty.Message;
import org.onlab.netty.MessageHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
......@@ -13,11 +11,8 @@ import org.slf4j.LoggerFactory;
*/
public class NettyEchoHandler implements MessageHandler {
private final Logger log = LoggerFactory.getLogger(getClass());
@Override
public void handle(Message message) throws IOException {
//log.info("Received message. Echoing it back to the sender.");
message.respond(message.payload());
}
}
......
......@@ -8,12 +8,12 @@ import org.slf4j.LoggerFactory;
/**
* A MessageHandler that simply logs the information.
*/
public class NettyLoggingHandler implements MessageHandler {
public class NettyNothingHandler implements MessageHandler {
private final Logger log = LoggerFactory.getLogger(getClass());
@Override
public void handle(Message message) {
//log.info("Received message. Payload has {} bytes", message.payload().length);
// Do nothing
}
}
......
package org.onlab.onos.foo;
import static java.lang.Thread.sleep;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.onlab.metrics.MetricsComponent;
......@@ -15,14 +18,29 @@ import org.slf4j.LoggerFactory;
import com.codahale.metrics.Timer;
/**
* The Simple netty client test.
*/
// FIXME: Should be move out to test or app
public final class SimpleNettyClient {
private static Logger log = LoggerFactory.getLogger(SimpleNettyClient.class);
static NettyMessagingService messaging;
static MetricsManager metrics;
private SimpleNettyClient() {
}
/**
* The entry point of application.
*
* @param args the input arguments
* @throws IOException the iO exception
* @throws InterruptedException the interrupted exception
* @throws ExecutionException the execution exception
* @throws TimeoutException the timeout exception
*/
public static void main(String[] args)
throws IOException, InterruptedException, ExecutionException,
TimeoutException {
......@@ -34,13 +52,20 @@ private static Logger log = LoggerFactory.getLogger(SimpleNettyClient.class);
System.exit(0);
}
/**
* Start standalone.
*
* @param args the args
* @throws Exception the exception
*/
public static void startStandalone(String[] args) throws Exception {
String host = args.length > 0 ? args[0] : "localhost";
int port = args.length > 1 ? Integer.parseInt(args[1]) : 8081;
int warmup = args.length > 2 ? Integer.parseInt(args[2]) : 1000;
int iterations = args.length > 3 ? Integer.parseInt(args[3]) : 50 * 100000;
NettyMessagingService messaging = new TestNettyMessagingService(9081);
MetricsManager metrics = new MetricsManager();
messaging = new TestNettyMessagingService(9081);
metrics = new MetricsManager();
Endpoint endpoint = new Endpoint(host, port);
messaging.activate();
metrics.activate();
......@@ -53,8 +78,31 @@ private static Logger log = LoggerFactory.getLogger(SimpleNettyClient.class);
Response response = messaging
.sendAndReceive(endpoint, "echo",
"Hello World".getBytes());
response.get(100000, TimeUnit.MILLISECONDS);
}
log.info("measuring round-trip send & receive");
Timer sendAndReceiveTimer = metrics.createTimer(component, feature, "SendAndReceive");
int timeouts = 0;
for (int i = 0; i < iterations; i++) {
Response response;
Timer.Context context = sendAndReceiveTimer.time();
try {
response = messaging
.sendAndReceive(endpoint, "echo",
"Hello World".getBytes());
response.get(10000, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
timeouts++;
log.info("timeout:" + timeouts + " at iteration:" + i);
} finally {
context.stop();
}
// System.out.println("Got back:" + new String(response.get(2, TimeUnit.SECONDS)));
}
sleep(1000);
log.info("measuring async sender");
Timer sendAsyncTimer = metrics.createTimer(component, feature, "AsyncSender");
......@@ -63,20 +111,28 @@ private static Logger log = LoggerFactory.getLogger(SimpleNettyClient.class);
messaging.sendAsync(endpoint, "simple", "Hello World".getBytes());
context.stop();
}
Timer sendAndReceiveTimer = metrics.createTimer(component, feature, "SendAndReceive");
for (int i = 0; i < iterations; i++) {
Timer.Context context = sendAndReceiveTimer.time();
Response response = messaging
.sendAndReceive(endpoint, "echo",
"Hello World".getBytes());
// System.out.println("Got back:" + new String(response.get(2, TimeUnit.SECONDS)));
context.stop();
sleep(1000);
}
public static void stop() {
try {
messaging.deactivate();
metrics.deactivate();
} catch (Exception e) {
log.info("Unable to stop client %s", e);
}
}
/**
* The type Test netty messaging service.
*/
public static class TestNettyMessagingService extends NettyMessagingService {
/**
* Instantiates a new Test netty messaging service.
*
* @param port the port
* @throws Exception the exception
*/
public TestNettyMessagingService(int port) throws Exception {
super(port);
}
......
package org.onlab.onos.foo;
import static org.onlab.onos.foo.SimpleNettyClient.startStandalone;
import static org.onlab.onos.foo.SimpleNettyClient.stop;
import org.apache.karaf.shell.commands.Argument;
import org.apache.karaf.shell.commands.Command;
......@@ -10,7 +11,7 @@ import org.onlab.onos.cli.AbstractShellCommand;
* Test Netty client performance.
*/
@Command(scope = "onos", name = "simple-netty-client",
description = "Starts the simple Netty client")
description = "Starts simple Netty client")
public class SimpleNettyClientCommand extends AbstractShellCommand {
//FIXME: replace these arguments with proper ones needed for the test.
......@@ -28,7 +29,7 @@ public class SimpleNettyClientCommand extends AbstractShellCommand {
@Argument(index = 3, name = "messageCount", description = "Message count",
required = false, multiValued = false)
String messageCount = "100000";
String messageCount = "1000000";
@Override
protected void execute() {
......@@ -37,5 +38,6 @@ public class SimpleNettyClientCommand extends AbstractShellCommand {
} catch (Exception e) {
error("Unable to start client %s", e);
}
stop();
}
}
......
......@@ -12,16 +12,30 @@ import org.slf4j.LoggerFactory;
private SimpleNettyServer() {}
/**
* The entry point of application.
*
* @param args the input arguments
* @throws Exception the exception
*/
public static void main(String... args) throws Exception {
startStandalone(args);
System.exit(0);
}
/**
* Start standalone server.
*
* @param args the args
* @throws Exception the exception
*/
public static void startStandalone(String[] args) throws Exception {
NettyMessagingService server = new NettyMessagingService(8081);
int port = args.length > 0 ? Integer.parseInt(args[0]) : 8081;
NettyMessagingService server = new NettyMessagingService(port);
server.activate();
server.registerHandler("simple", new NettyLoggingHandler());
server.registerHandler("simple", new NettyNothingHandler());
server.registerHandler("echo", new NettyEchoHandler());
log.info("Netty Server server on port " + port);
}
}
......
......@@ -10,26 +10,18 @@ import org.onlab.onos.cli.AbstractShellCommand;
* Starts the Simple Netty server.
*/
@Command(scope = "onos", name = "simple-netty-server",
description = "Starts the simple netty server")
description = "Starts simple Netty server")
public class SimpleNettyServerCommand extends AbstractShellCommand {
//FIXME: Replace these with parameters for
@Argument(index = 0, name = "serverIp", description = "Server IP address",
@Argument(index = 0, name = "port", description = "listen port",
required = false, multiValued = false)
String serverIp = "127.0.0.1";
@Argument(index = 1, name = "workers", description = "IO workers",
required = false, multiValued = false)
String workers = "6";
@Argument(index = 2, name = "messageLength", description = "Message length (bytes)",
required = false, multiValued = false)
String messageLength = "128";
String port = "8081";
@Override
protected void execute() {
try {
startStandalone(new String[]{serverIp, workers, messageLength});
startStandalone(new String[]{port});
} catch (Exception e) {
error("Unable to start server %s", e);
}
......
......@@ -16,4 +16,14 @@
<description>ONOS simple Mobility app</description>
<dependencies>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>org.onlab.onos</groupId>
<artifactId>onlab-misc</artifactId>
</dependency>
</dependencies>
</project>
......
......@@ -24,6 +24,7 @@
<module>mobility</module>
<module>proxyarp</module>
<module>config</module>
<module>sdnip</module>
</modules>
<properties>
......
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.onlab.onos</groupId>
<artifactId>onos-apps</artifactId>
<version>1.0.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<artifactId>onos-app-sdnip</artifactId>
<packaging>bundle</packaging>
<description>SDN-IP peering application</description>
<dependencies>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>2.4.2</version>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
package org.onlab.onos.sdnip;
import static org.slf4j.LoggerFactory.getLogger;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.slf4j.Logger;
/**
* Placeholder SDN-IP component.
*/
@Component(immediate = true)
public class SdnIp {
private final Logger log = getLogger(getClass());
@Activate
protected void activate() {
log.debug("SDN-IP started");
}
@Deactivate
protected void deactivate() {
log.info("Stopped");
}
}
/**
* SDN-IP peering application.
*/
package org.onlab.onos.sdnip;
\ No newline at end of file
package org.onlab.onos.store.common.impl;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.NodeId;
package org.onlab.onos.cluster;
import com.google.common.base.Function;
......@@ -18,6 +15,11 @@ public final class ControllerNodeToNodeId
return input.id();
}
/**
* Returns a Function to convert ControllerNode to NodeId.
*
* @return ControllerNodeToNodeId instance.
*/
public static ControllerNodeToNodeId toNodeId() {
return INSTANCE;
}
......
package org.onlab.onos.cluster;
import static org.junit.Assert.*;
import static org.onlab.onos.cluster.ControllerNodeToNodeId.toNodeId;
import java.util.Arrays;
import java.util.List;
import org.junit.Test;
import org.onlab.packet.IpPrefix;
import com.google.common.collect.FluentIterable;
public class ControllerNodeToNodeIdTest {
private static final NodeId NID1 = new NodeId("foo");
private static final NodeId NID2 = new NodeId("bar");
private static final NodeId NID3 = new NodeId("buz");
private static final IpPrefix IP1 = IpPrefix.valueOf("127.0.0.1");
private static final IpPrefix IP2 = IpPrefix.valueOf("127.0.0.2");
private static final IpPrefix IP3 = IpPrefix.valueOf("127.0.0.3");
private static final ControllerNode CN1 = new DefaultControllerNode(NID1, IP1);
private static final ControllerNode CN2 = new DefaultControllerNode(NID2, IP2);
private static final ControllerNode CN3 = new DefaultControllerNode(NID3, IP3);
@Test
public final void testToNodeId() {
final Iterable<ControllerNode> nodes = Arrays.asList(CN1, CN2, CN3);
final List<NodeId> nodeIds = Arrays.asList(NID1, NID2, NID3);
assertEquals(nodeIds,
FluentIterable.from(nodes)
.transform(toNodeId())
.toList());
}
}
......@@ -9,6 +9,7 @@ import static org.onlab.onos.event.TestEvent.Type.FOO;
import java.util.List;
import java.util.Timer;
import org.junit.Ignore;
import org.junit.Test;
/**
......@@ -41,6 +42,7 @@ public class AbstractEventAccumulatorTest {
assertEquals("incorrect batch", "abcde", accumulator.batch);
}
@Ignore("FIXME: timing sensitive test failing randomly.")
@Test
public void timeTrigger() {
TestAccumulator accumulator = new TestAccumulator();
......
......@@ -54,8 +54,13 @@
<artifactId>org.apache.felix.scr.annotations</artifactId>
</dependency>
<dependency>
<groupId>de.javakaffee</groupId>
<artifactId>kryo-serializers</artifactId>
<groupId>com.google.guava</groupId>
<artifactId>guava-testlib</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
</dependencies>
......
......@@ -65,6 +65,7 @@ import java.util.concurrent.TimeUnit;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Predicates.notNull;
import static org.onlab.onos.cluster.ControllerNodeToNodeId.toNodeId;
import static org.onlab.onos.net.device.DeviceEvent.Type.*;
import static org.slf4j.LoggerFactory.getLogger;
import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
......@@ -73,7 +74,6 @@ import static com.google.common.base.Verify.verify;
import static org.onlab.util.Tools.namedThreads;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static org.onlab.onos.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE;
import static org.onlab.onos.store.common.impl.ControllerNodeToNodeId.toNodeId;
// TODO: give me a better name
/**
......
package org.onlab.onos.store.link.impl;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.common.collect.SetMultimap;
import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.concurrent.ConcurrentUtils;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.net.AnnotationsUtil;
import org.onlab.onos.net.ConnectPoint;
import org.onlab.onos.net.DefaultAnnotations;
import org.onlab.onos.net.DefaultLink;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.Link;
import org.onlab.onos.net.SparseAnnotations;
import org.onlab.onos.net.Link.Type;
import org.onlab.onos.net.LinkKey;
import org.onlab.onos.net.Provided;
import org.onlab.onos.net.link.DefaultLinkDescription;
import org.onlab.onos.net.link.LinkDescription;
import org.onlab.onos.net.link.LinkEvent;
import org.onlab.onos.net.link.LinkStore;
import org.onlab.onos.net.link.LinkStoreDelegate;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.AbstractStore;
import org.onlab.onos.store.ClockService;
import org.onlab.onos.store.Timestamp;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
import org.onlab.onos.store.common.impl.Timestamped;
import org.onlab.onos.store.serializers.DistributedStoreSerializers;
import org.onlab.onos.store.serializers.KryoSerializer;
import org.onlab.util.KryoPool;
import org.onlab.util.NewConcurrentHashMap;
import org.slf4j.Logger;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static org.onlab.onos.cluster.ControllerNodeToNodeId.toNodeId;
import static org.onlab.onos.net.DefaultAnnotations.union;
import static org.onlab.onos.net.DefaultAnnotations.merge;
import static org.onlab.onos.net.Link.Type.DIRECT;
import static org.onlab.onos.net.Link.Type.INDIRECT;
import static org.onlab.onos.net.link.LinkEvent.Type.*;
import static org.onlab.util.Tools.namedThreads;
import static org.slf4j.LoggerFactory.getLogger;
import static com.google.common.collect.Multimaps.synchronizedSetMultimap;
import static com.google.common.base.Predicates.notNull;
/**
* Manages inventory of infrastructure links in distributed data store
* that uses optimistic replication and gossip based techniques.
*/
@Component(immediate = true)
@Service
public class GossipLinkStore
extends AbstractStore<LinkEvent, LinkStoreDelegate>
implements LinkStore {
private final Logger log = getLogger(getClass());
// Link inventory
private final ConcurrentMap<LinkKey, ConcurrentMap<ProviderId, Timestamped<LinkDescription>>> linkDescs =
new ConcurrentHashMap<>();
// Link instance cache
private final ConcurrentMap<LinkKey, Link> links = new ConcurrentHashMap<>();
// Egress and ingress link sets
private final SetMultimap<DeviceId, LinkKey> srcLinks = createSynchronizedHashMultiMap();
private final SetMultimap<DeviceId, LinkKey> dstLinks = createSynchronizedHashMultiMap();
// Remove links
private final Map<LinkKey, Timestamp> removedLinks = Maps.newHashMap();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClockService clockService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterCommunicationService clusterCommunicator;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
private static final KryoSerializer SERIALIZER = new KryoSerializer() {
@Override
protected void setupKryoPool() {
serializerPool = KryoPool.newBuilder()
.register(DistributedStoreSerializers.COMMON)
.register(InternalLinkEvent.class)
.register(InternalLinkRemovedEvent.class)
.register(LinkAntiEntropyAdvertisement.class)
.register(LinkFragmentId.class)
.build()
.populate(1);
}
};
private ScheduledExecutorService executor;
@Activate
public void activate() {
clusterCommunicator.addSubscriber(
GossipLinkStoreMessageSubjects.LINK_UPDATE,
new InternalLinkEventListener());
clusterCommunicator.addSubscriber(
GossipLinkStoreMessageSubjects.LINK_REMOVED,
new InternalLinkRemovedEventListener());
clusterCommunicator.addSubscriber(
GossipLinkStoreMessageSubjects.LINK_ANTI_ENTROPY_ADVERTISEMENT,
new InternalLinkAntiEntropyAdvertisementListener());
executor =
newSingleThreadScheduledExecutor(namedThreads("link-anti-entropy-%d"));
// TODO: Make these configurable
long initialDelaySec = 5;
long periodSec = 5;
// start anti-entropy thread
executor.scheduleAtFixedRate(new SendAdvertisementTask(),
initialDelaySec, periodSec, TimeUnit.SECONDS);
log.info("Started");
}
@Deactivate
public void deactivate() {
linkDescs.clear();
links.clear();
srcLinks.clear();
dstLinks.clear();
log.info("Stopped");
}
@Override
public int getLinkCount() {
return links.size();
}
@Override
public Iterable<Link> getLinks() {
return Collections.unmodifiableCollection(links.values());
}
@Override
public Set<Link> getDeviceEgressLinks(DeviceId deviceId) {
// lock for iteration
synchronized (srcLinks) {
return FluentIterable.from(srcLinks.get(deviceId))
.transform(lookupLink())
.filter(notNull())
.toSet();
}
}
@Override
public Set<Link> getDeviceIngressLinks(DeviceId deviceId) {
// lock for iteration
synchronized (dstLinks) {
return FluentIterable.from(dstLinks.get(deviceId))
.transform(lookupLink())
.filter(notNull())
.toSet();
}
}
@Override
public Link getLink(ConnectPoint src, ConnectPoint dst) {
return links.get(new LinkKey(src, dst));
}
@Override
public Set<Link> getEgressLinks(ConnectPoint src) {
Set<Link> egress = new HashSet<>();
for (LinkKey linkKey : srcLinks.get(src.deviceId())) {
if (linkKey.src().equals(src)) {
egress.add(links.get(linkKey));
}
}
return egress;
}
@Override
public Set<Link> getIngressLinks(ConnectPoint dst) {
Set<Link> ingress = new HashSet<>();
for (LinkKey linkKey : dstLinks.get(dst.deviceId())) {
if (linkKey.dst().equals(dst)) {
ingress.add(links.get(linkKey));
}
}
return ingress;
}
@Override
public LinkEvent createOrUpdateLink(ProviderId providerId,
LinkDescription linkDescription) {
DeviceId dstDeviceId = linkDescription.dst().deviceId();
Timestamp newTimestamp = clockService.getTimestamp(dstDeviceId);
final Timestamped<LinkDescription> deltaDesc = new Timestamped<>(linkDescription, newTimestamp);
LinkEvent event = createOrUpdateLinkInternal(providerId, deltaDesc);
if (event != null) {
log.info("Notifying peers of a link update topology event from providerId: "
+ "{} between src: {} and dst: {}",
providerId, linkDescription.src(), linkDescription.dst());
try {
notifyPeers(new InternalLinkEvent(providerId, deltaDesc));
} catch (IOException e) {
log.info("Failed to notify peers of a link update topology event from providerId: "
+ "{} between src: {} and dst: {}",
providerId, linkDescription.src(), linkDescription.dst());
}
}
return event;
}
private LinkEvent createOrUpdateLinkInternal(
ProviderId providerId,
Timestamped<LinkDescription> linkDescription) {
LinkKey key = new LinkKey(linkDescription.value().src(), linkDescription.value().dst());
ConcurrentMap<ProviderId, Timestamped<LinkDescription>> descs = getLinkDescriptions(key);
synchronized (descs) {
// if the link was previously removed, we should proceed if and
// only if this request is more recent.
Timestamp linkRemovedTimestamp = removedLinks.get(key);
if (linkRemovedTimestamp != null) {
if (linkDescription.isNewer(linkRemovedTimestamp)) {
removedLinks.remove(key);
} else {
return null;
}
}
final Link oldLink = links.get(key);
// update description
createOrUpdateLinkDescription(descs, providerId, linkDescription);
final Link newLink = composeLink(descs);
if (oldLink == null) {
return createLink(key, newLink);
}
return updateLink(key, oldLink, newLink);
}
}
// Guarded by linkDescs value (=locking each Link)
private Timestamped<LinkDescription> createOrUpdateLinkDescription(
ConcurrentMap<ProviderId, Timestamped<LinkDescription>> existingLinkDescriptions,
ProviderId providerId,
Timestamped<LinkDescription> linkDescription) {
// merge existing attributes and merge
Timestamped<LinkDescription> existingLinkDescription = existingLinkDescriptions.get(providerId);
if (existingLinkDescription != null && existingLinkDescription.isNewer(linkDescription)) {
return null;
}
Timestamped<LinkDescription> newLinkDescription = linkDescription;
if (existingLinkDescription != null) {
SparseAnnotations merged = union(existingLinkDescription.value().annotations(),
linkDescription.value().annotations());
newLinkDescription = new Timestamped<LinkDescription>(
new DefaultLinkDescription(
linkDescription.value().src(),
linkDescription.value().dst(),
linkDescription.value().type(), merged),
linkDescription.timestamp());
}
return existingLinkDescriptions.put(providerId, newLinkDescription);
}
// Creates and stores the link and returns the appropriate event.
// Guarded by linkDescs value (=locking each Link)
private LinkEvent createLink(LinkKey key, Link newLink) {
if (newLink.providerId().isAncillary()) {
// TODO: revisit ancillary only Link handling
// currently treating ancillary only as down (not visible outside)
return null;
}
links.put(key, newLink);
srcLinks.put(newLink.src().deviceId(), key);
dstLinks.put(newLink.dst().deviceId(), key);
return new LinkEvent(LINK_ADDED, newLink);
}
// Updates, if necessary the specified link and returns the appropriate event.
// Guarded by linkDescs value (=locking each Link)
private LinkEvent updateLink(LinkKey key, Link oldLink, Link newLink) {
if (newLink.providerId().isAncillary()) {
// TODO: revisit ancillary only Link handling
// currently treating ancillary only as down (not visible outside)
return null;
}
if ((oldLink.type() == INDIRECT && newLink.type() == DIRECT) ||
!AnnotationsUtil.isEqual(oldLink.annotations(), newLink.annotations())) {
links.put(key, newLink);
// strictly speaking following can be ommitted
srcLinks.put(oldLink.src().deviceId(), key);
dstLinks.put(oldLink.dst().deviceId(), key);
return new LinkEvent(LINK_UPDATED, newLink);
}
return null;
}
@Override
public LinkEvent removeLink(ConnectPoint src, ConnectPoint dst) {
final LinkKey key = new LinkKey(src, dst);
DeviceId dstDeviceId = dst.deviceId();
Timestamp timestamp = clockService.getTimestamp(dstDeviceId);
LinkEvent event = removeLinkInternal(key, timestamp);
if (event != null) {
log.info("Notifying peers of a link removed topology event for a link "
+ "between src: {} and dst: {}", src, dst);
try {
notifyPeers(new InternalLinkRemovedEvent(key, timestamp));
} catch (IOException e) {
log.error("Failed to notify peers of a link removed topology event for a link "
+ "between src: {} and dst: {}", src, dst);
}
}
return event;
}
private LinkEvent removeLinkInternal(LinkKey key, Timestamp timestamp) {
ConcurrentMap<ProviderId, Timestamped<LinkDescription>> linkDescriptions =
getLinkDescriptions(key);
synchronized (linkDescriptions) {
// accept removal request if given timestamp is newer than
// the latest Timestamp from Primary provider
ProviderId primaryProviderId = pickPrimaryProviderId(linkDescriptions);
if (linkDescriptions.get(primaryProviderId).isNewer(timestamp)) {
return null;
}
removedLinks.put(key, timestamp);
Link link = links.remove(key);
linkDescriptions.clear();
if (link != null) {
srcLinks.remove(link.src().deviceId(), key);
dstLinks.remove(link.dst().deviceId(), key);
return new LinkEvent(LINK_REMOVED, link);
}
return null;
}
}
private static <K, V> SetMultimap<K, V> createSynchronizedHashMultiMap() {
return synchronizedSetMultimap(HashMultimap.<K, V>create());
}
/**
* @return primary ProviderID, or randomly chosen one if none exists
*/
private ProviderId pickPrimaryProviderId(
ConcurrentMap<ProviderId, Timestamped<LinkDescription>> providerDescs) {
ProviderId fallBackPrimary = null;
for (Entry<ProviderId, Timestamped<LinkDescription>> e : providerDescs.entrySet()) {
if (!e.getKey().isAncillary()) {
return e.getKey();
} else if (fallBackPrimary == null) {
// pick randomly as a fallback in case there is no primary
fallBackPrimary = e.getKey();
}
}
return fallBackPrimary;
}
private Link composeLink(ConcurrentMap<ProviderId, Timestamped<LinkDescription>> linkDescriptions) {
ProviderId primaryProviderId = pickPrimaryProviderId(linkDescriptions);
Timestamped<LinkDescription> base = linkDescriptions.get(primaryProviderId);
ConnectPoint src = base.value().src();
ConnectPoint dst = base.value().dst();
Type type = base.value().type();
DefaultAnnotations annotations = DefaultAnnotations.builder().build();
annotations = merge(annotations, base.value().annotations());
for (Entry<ProviderId, Timestamped<LinkDescription>> e : linkDescriptions.entrySet()) {
if (primaryProviderId.equals(e.getKey())) {
continue;
}
// TODO: should keep track of Description timestamp
// and only merge conflicting keys when timestamp is newer
// Currently assuming there will never be a key conflict between
// providers
// annotation merging. not so efficient, should revisit later
annotations = merge(annotations, e.getValue().value().annotations());
}
return new DefaultLink(primaryProviderId , src, dst, type, annotations);
}
private ConcurrentMap<ProviderId, Timestamped<LinkDescription>> getLinkDescriptions(LinkKey key) {
return ConcurrentUtils.createIfAbsentUnchecked(linkDescs, key,
NewConcurrentHashMap.<ProviderId, Timestamped<LinkDescription>>ifNeeded());
}
private Timestamped<LinkDescription> getLinkDescription(LinkKey key, ProviderId providerId) {
return getLinkDescriptions(key).get(providerId);
}
private final Function<LinkKey, Link> lookupLink = new LookupLink();
private Function<LinkKey, Link> lookupLink() {
return lookupLink;
}
private final class LookupLink implements Function<LinkKey, Link> {
@Override
public Link apply(LinkKey input) {
return links.get(input);
}
}
private static final Predicate<Provided> IS_PRIMARY = new IsPrimary();
private static final Predicate<Provided> isPrimary() {
return IS_PRIMARY;
}
private static final class IsPrimary implements Predicate<Provided> {
@Override
public boolean apply(Provided input) {
return !input.providerId().isAncillary();
}
}
private void notifyDelegateIfNotNull(LinkEvent event) {
if (event != null) {
notifyDelegate(event);
}
}
// TODO: should we be throwing exception?
private void broadcastMessage(MessageSubject subject, Object event) throws IOException {
ClusterMessage message = new ClusterMessage(
clusterService.getLocalNode().id(),
subject,
SERIALIZER.encode(event));
clusterCommunicator.broadcast(message);
}
// TODO: should we be throwing exception?
private void unicastMessage(NodeId recipient, MessageSubject subject, Object event) {
try {
ClusterMessage message = new ClusterMessage(
clusterService.getLocalNode().id(),
subject,
SERIALIZER.encode(event));
clusterCommunicator.unicast(message, recipient);
} catch (IOException e) {
log.error("Failed to send a {} message to {}", subject.value(), recipient);
}
}
private void notifyPeers(InternalLinkEvent event) throws IOException {
broadcastMessage(GossipLinkStoreMessageSubjects.LINK_UPDATE, event);
}
private void notifyPeers(InternalLinkRemovedEvent event) throws IOException {
broadcastMessage(GossipLinkStoreMessageSubjects.LINK_REMOVED, event);
}
private void notifyPeer(NodeId peer, InternalLinkEvent event) {
unicastMessage(peer, GossipLinkStoreMessageSubjects.LINK_UPDATE, event);
}
private void notifyPeer(NodeId peer, InternalLinkRemovedEvent event) {
unicastMessage(peer, GossipLinkStoreMessageSubjects.LINK_REMOVED, event);
}
private final class SendAdvertisementTask implements Runnable {
@Override
public void run() {
if (Thread.currentThread().isInterrupted()) {
log.info("Interrupted, quitting");
return;
}
try {
final NodeId self = clusterService.getLocalNode().id();
Set<ControllerNode> nodes = clusterService.getNodes();
ImmutableList<NodeId> nodeIds = FluentIterable.from(nodes)
.transform(toNodeId())
.toList();
if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
log.info("No other peers in the cluster.");
return;
}
NodeId peer;
do {
int idx = RandomUtils.nextInt(0, nodeIds.size());
peer = nodeIds.get(idx);
} while (peer.equals(self));
LinkAntiEntropyAdvertisement ad = createAdvertisement();
if (Thread.currentThread().isInterrupted()) {
log.info("Interrupted, quitting");
return;
}
try {
unicastMessage(peer, GossipLinkStoreMessageSubjects.LINK_ANTI_ENTROPY_ADVERTISEMENT, ad);
} catch (Exception e) {
log.error("Failed to send anti-entropy advertisement", e);
return;
}
} catch (Exception e) {
// catch all Exception to avoid Scheduled task being suppressed.
log.error("Exception thrown while sending advertisement", e);
}
}
}
private LinkAntiEntropyAdvertisement createAdvertisement() {
final NodeId self = clusterService.getLocalNode().id();
Map<LinkFragmentId, Timestamp> linkTimestamps = new HashMap<>(linkDescs.size());
Map<LinkKey, Timestamp> linkTombstones = new HashMap<>(removedLinks.size());
for (Entry<LinkKey, ConcurrentMap<ProviderId, Timestamped<LinkDescription>>>
provs : linkDescs.entrySet()) {
final LinkKey linkKey = provs.getKey();
final ConcurrentMap<ProviderId, Timestamped<LinkDescription>> linkDesc = provs.getValue();
synchronized (linkDesc) {
for (Map.Entry<ProviderId, Timestamped<LinkDescription>> e : linkDesc.entrySet()) {
linkTimestamps.put(new LinkFragmentId(linkKey, e.getKey()), e.getValue().timestamp());
}
}
}
linkTombstones.putAll(removedLinks);
return new LinkAntiEntropyAdvertisement(self, linkTimestamps, linkTombstones);
}
private void handleAntiEntropyAdvertisement(LinkAntiEntropyAdvertisement advertisement) {
NodeId peer = advertisement.sender();
Map<LinkFragmentId, Timestamp> linkTimestamps = advertisement.linkTimestamps();
Map<LinkKey, Timestamp> linkTombstones = advertisement.linkTombstones();
for (Map.Entry<LinkFragmentId, Timestamp> entry : linkTimestamps.entrySet()) {
LinkFragmentId linkFragmentId = entry.getKey();
Timestamp peerTimestamp = entry.getValue();
LinkKey key = linkFragmentId.linkKey();
ProviderId providerId = linkFragmentId.providerId();
Timestamped<LinkDescription> linkDescription = getLinkDescription(key, providerId);
if (linkDescription.isNewer(peerTimestamp)) {
// I have more recent link description. update peer.
notifyPeer(peer, new InternalLinkEvent(providerId, linkDescription));
}
// else TODO: Peer has more recent link description. request it.
Timestamp linkRemovedTimestamp = removedLinks.get(key);
if (linkRemovedTimestamp != null && linkRemovedTimestamp.compareTo(peerTimestamp) > 0) {
// peer has a zombie link. update peer.
notifyPeer(peer, new InternalLinkRemovedEvent(key, linkRemovedTimestamp));
}
}
for (Map.Entry<LinkKey, Timestamp> entry : linkTombstones.entrySet()) {
LinkKey key = entry.getKey();
Timestamp peerTimestamp = entry.getValue();
ProviderId primaryProviderId = pickPrimaryProviderId(getLinkDescriptions(key));
if (primaryProviderId != null) {
if (!getLinkDescription(key, primaryProviderId).isNewer(peerTimestamp)) {
notifyDelegateIfNotNull(removeLinkInternal(key, peerTimestamp));
}
}
}
}
private class InternalLinkEventListener implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
log.info("Received link event from peer: {}", message.sender());
InternalLinkEvent event = (InternalLinkEvent) SERIALIZER.decode(message.payload());
ProviderId providerId = event.providerId();
Timestamped<LinkDescription> linkDescription = event.linkDescription();
notifyDelegateIfNotNull(createOrUpdateLinkInternal(providerId, linkDescription));
}
}
private class InternalLinkRemovedEventListener implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
log.info("Received link removed event from peer: {}", message.sender());
InternalLinkRemovedEvent event = (InternalLinkRemovedEvent) SERIALIZER.decode(message.payload());
LinkKey linkKey = event.linkKey();
Timestamp timestamp = event.timestamp();
notifyDelegateIfNotNull(removeLinkInternal(linkKey, timestamp));
}
}
private final class InternalLinkAntiEntropyAdvertisementListener implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
log.info("Received Link Anti-Entropy advertisement from peer: {}", message.sender());
LinkAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
handleAntiEntropyAdvertisement(advertisement);
}
}
}
package org.onlab.onos.store.link.impl;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
/**
* MessageSubjects used by GossipLinkStore peer-peer communication.
*/
public final class GossipLinkStoreMessageSubjects {
private GossipLinkStoreMessageSubjects() {}
public static final MessageSubject LINK_UPDATE =
new MessageSubject("peer-link-update");
public static final MessageSubject LINK_REMOVED =
new MessageSubject("peer-link-removed");
public static final MessageSubject LINK_ANTI_ENTROPY_ADVERTISEMENT =
new MessageSubject("link-enti-entropy-advertisement");
}
package org.onlab.onos.store.link.impl;
import com.google.common.base.MoreObjects;
import org.onlab.onos.net.link.LinkDescription;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.common.impl.Timestamped;
/**
* Information published by GossipDeviceStore to notify peers of a device
* change event.
*/
public class InternalLinkEvent {
private final ProviderId providerId;
private final Timestamped<LinkDescription> linkDescription;
protected InternalLinkEvent(
ProviderId providerId,
Timestamped<LinkDescription> linkDescription) {
this.providerId = providerId;
this.linkDescription = linkDescription;
}
public ProviderId providerId() {
return providerId;
}
public Timestamped<LinkDescription> linkDescription() {
return linkDescription;
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("providerId", providerId)
.add("linkDescription", linkDescription)
.toString();
}
// for serializer
protected InternalLinkEvent() {
this.providerId = null;
this.linkDescription = null;
}
}
package org.onlab.onos.store.link.impl;
import org.onlab.onos.net.LinkKey;
import org.onlab.onos.store.Timestamp;
import com.google.common.base.MoreObjects;
/**
* Information published by GossipLinkStore to notify peers of a link
* being removed.
*/
public class InternalLinkRemovedEvent {
private final LinkKey linkKey;
private final Timestamp timestamp;
/**
* Creates a InternalLinkRemovedEvent.
* @param linkKey identifier of the removed link.
* @param timestamp timestamp of when the link was removed.
*/
public InternalLinkRemovedEvent(LinkKey linkKey, Timestamp timestamp) {
this.linkKey = linkKey;
this.timestamp = timestamp;
}
public LinkKey linkKey() {
return linkKey;
}
public Timestamp timestamp() {
return timestamp;
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("linkKey", linkKey)
.add("timestamp", timestamp)
.toString();
}
// for serializer
@SuppressWarnings("unused")
private InternalLinkRemovedEvent() {
linkKey = null;
timestamp = null;
}
}
\ No newline at end of file
package org.onlab.onos.store.link.impl;
import static com.google.common.base.Preconditions.checkNotNull;
import java.util.Map;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.net.LinkKey;
import org.onlab.onos.store.Timestamp;
/**
* Link AE Advertisement message.
*/
public class LinkAntiEntropyAdvertisement {
private final NodeId sender;
private final Map<LinkFragmentId, Timestamp> linkTimestamps;
private final Map<LinkKey, Timestamp> linkTombstones;
public LinkAntiEntropyAdvertisement(NodeId sender,
Map<LinkFragmentId, Timestamp> linkTimestamps,
Map<LinkKey, Timestamp> linkTombstones) {
this.sender = checkNotNull(sender);
this.linkTimestamps = checkNotNull(linkTimestamps);
this.linkTombstones = checkNotNull(linkTombstones);
}
public NodeId sender() {
return sender;
}
public Map<LinkFragmentId, Timestamp> linkTimestamps() {
return linkTimestamps;
}
public Map<LinkKey, Timestamp> linkTombstones() {
return linkTombstones;
}
// For serializer
@SuppressWarnings("unused")
private LinkAntiEntropyAdvertisement() {
this.sender = null;
this.linkTimestamps = null;
this.linkTombstones = null;
}
}
package org.onlab.onos.store.link.impl;
import java.util.Objects;
import org.onlab.onos.net.LinkKey;
import org.onlab.onos.net.provider.ProviderId;
import com.google.common.base.MoreObjects;
/**
* Identifier for LinkDescription from a Provider.
*/
public final class LinkFragmentId {
public final ProviderId providerId;
public final LinkKey linkKey;
public LinkFragmentId(LinkKey linkKey, ProviderId providerId) {
this.providerId = providerId;
this.linkKey = linkKey;
}
public LinkKey linkKey() {
return linkKey;
}
public ProviderId providerId() {
return providerId;
}
@Override
public int hashCode() {
return Objects.hash(providerId, linkKey);
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (!(obj instanceof LinkFragmentId)) {
return false;
}
LinkFragmentId that = (LinkFragmentId) obj;
return Objects.equals(this.linkKey, that.linkKey) &&
Objects.equals(this.providerId, that.providerId);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("providerId", providerId)
.add("linkKey", linkKey)
.toString();
}
// for serializer
@SuppressWarnings("unused")
private LinkFragmentId() {
this.providerId = null;
this.linkKey = null;
}
}
package org.onlab.onos.store.link.impl;
import static org.onlab.onos.net.Link.Type.DIRECT;
import static org.onlab.onos.net.Link.Type.INDIRECT;
import static org.onlab.onos.net.link.LinkEvent.Type.LINK_ADDED;
import static org.onlab.onos.net.link.LinkEvent.Type.LINK_REMOVED;
import static org.onlab.onos.net.link.LinkEvent.Type.LINK_UPDATED;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.net.ConnectPoint;
import org.onlab.onos.net.DefaultLink;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.Link;
import org.onlab.onos.net.LinkKey;
import org.onlab.onos.net.link.LinkDescription;
import org.onlab.onos.net.link.LinkEvent;
import org.onlab.onos.net.link.LinkStore;
import org.onlab.onos.net.link.LinkStoreDelegate;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.AbstractStore;
import org.onlab.onos.store.ClockService;
import org.onlab.onos.store.Timestamp;
import org.slf4j.Logger;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
import com.google.common.collect.ImmutableSet.Builder;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
//TODO: Add support for multiple provider and annotations
/**
* Manages inventory of infrastructure links using a protocol that takes into consideration
* the order in which events occur.
*/
// FIXME: This does not yet implement the full protocol.
// The full protocol requires the sender of LLDP message to include the
// version information of src device/port and the receiver to
// take that into account when figuring out if a more recent src
// device/port down event renders the link discovery obsolete.
@Component(immediate = true)
@Service
public class OnosDistributedLinkStore
extends AbstractStore<LinkEvent, LinkStoreDelegate>
implements LinkStore {
private final Logger log = getLogger(getClass());
// Link inventory
private ConcurrentMap<LinkKey, VersionedValue<Link>> links;
public static final String LINK_NOT_FOUND = "Link between %s and %s not found";
// TODO synchronize?
// Egress and ingress link sets
private final Multimap<DeviceId, VersionedValue<Link>> srcLinks = HashMultimap.create();
private final Multimap<DeviceId, VersionedValue<Link>> dstLinks = HashMultimap.create();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClockService clockService;
@Activate
public void activate() {
links = new ConcurrentHashMap<>();
log.info("Started");
}
@Deactivate
public void deactivate() {
log.info("Stopped");
}
@Override
public int getLinkCount() {
return links.size();
}
@Override
public Iterable<Link> getLinks() {
Builder<Link> builder = ImmutableSet.builder();
synchronized (this) {
for (VersionedValue<Link> link : links.values()) {
builder.add(link.entity());
}
return builder.build();
}
}
@Override
public Set<Link> getDeviceEgressLinks(DeviceId deviceId) {
Set<VersionedValue<Link>> egressLinks = ImmutableSet.copyOf(srcLinks.get(deviceId));
Set<Link> rawEgressLinks = new HashSet<>();
for (VersionedValue<Link> link : egressLinks) {
rawEgressLinks.add(link.entity());
}
return rawEgressLinks;
}
@Override
public Set<Link> getDeviceIngressLinks(DeviceId deviceId) {
Set<VersionedValue<Link>> ingressLinks = ImmutableSet.copyOf(dstLinks.get(deviceId));
Set<Link> rawIngressLinks = new HashSet<>();
for (VersionedValue<Link> link : ingressLinks) {
rawIngressLinks.add(link.entity());
}
return rawIngressLinks;
}
@Override
public Link getLink(ConnectPoint src, ConnectPoint dst) {
VersionedValue<Link> link = links.get(new LinkKey(src, dst));
checkArgument(link != null, "LINK_NOT_FOUND", src, dst);
return link.entity();
}
@Override
public Set<Link> getEgressLinks(ConnectPoint src) {
Set<Link> egressLinks = new HashSet<>();
for (VersionedValue<Link> link : srcLinks.get(src.deviceId())) {
if (link.entity().src().equals(src)) {
egressLinks.add(link.entity());
}
}
return egressLinks;
}
@Override
public Set<Link> getIngressLinks(ConnectPoint dst) {
Set<Link> ingressLinks = new HashSet<>();
for (VersionedValue<Link> link : dstLinks.get(dst.deviceId())) {
if (link.entity().dst().equals(dst)) {
ingressLinks.add(link.entity());
}
}
return ingressLinks;
}
@Override
public LinkEvent createOrUpdateLink(ProviderId providerId,
LinkDescription linkDescription) {
final DeviceId destinationDeviceId = linkDescription.dst().deviceId();
final Timestamp newTimestamp = clockService.getTimestamp(destinationDeviceId);
LinkKey key = new LinkKey(linkDescription.src(), linkDescription.dst());
VersionedValue<Link> link = links.get(key);
if (link == null) {
return createLink(providerId, key, linkDescription, newTimestamp);
}
checkState(newTimestamp.compareTo(link.timestamp()) > 0,
"Existing Link has a timestamp in the future!");
return updateLink(providerId, link, key, linkDescription, newTimestamp);
}
// Creates and stores the link and returns the appropriate event.
private LinkEvent createLink(ProviderId providerId, LinkKey key,
LinkDescription linkDescription, Timestamp timestamp) {
VersionedValue<Link> link = new VersionedValue<Link>(new DefaultLink(providerId, key.src(), key.dst(),
linkDescription.type()), true, timestamp);
synchronized (this) {
links.put(key, link);
addNewLink(link, timestamp);
}
// FIXME: notify peers.
return new LinkEvent(LINK_ADDED, link.entity());
}
// update Egress and ingress link sets
private void addNewLink(VersionedValue<Link> link, Timestamp timestamp) {
Link rawLink = link.entity();
synchronized (this) {
srcLinks.put(rawLink.src().deviceId(), link);
dstLinks.put(rawLink.dst().deviceId(), link);
}
}
// Updates, if necessary the specified link and returns the appropriate event.
private LinkEvent updateLink(ProviderId providerId, VersionedValue<Link> existingLink,
LinkKey key, LinkDescription linkDescription, Timestamp timestamp) {
// FIXME confirm Link update condition is OK
if (existingLink.entity().type() == INDIRECT && linkDescription.type() == DIRECT) {
synchronized (this) {
VersionedValue<Link> updatedLink = new VersionedValue<Link>(
new DefaultLink(providerId, existingLink.entity().src(), existingLink.entity().dst(),
linkDescription.type()), true, timestamp);
links.replace(key, existingLink, updatedLink);
replaceLink(existingLink, updatedLink);
// FIXME: notify peers.
return new LinkEvent(LINK_UPDATED, updatedLink.entity());
}
}
return null;
}
// update Egress and ingress link sets
private void replaceLink(VersionedValue<Link> current, VersionedValue<Link> updated) {
synchronized (this) {
srcLinks.remove(current.entity().src().deviceId(), current);
dstLinks.remove(current.entity().dst().deviceId(), current);
srcLinks.put(current.entity().src().deviceId(), updated);
dstLinks.put(current.entity().dst().deviceId(), updated);
}
}
@Override
public LinkEvent removeLink(ConnectPoint src, ConnectPoint dst) {
synchronized (this) {
LinkKey key = new LinkKey(src, dst);
VersionedValue<Link> link = links.remove(key);
if (link != null) {
removeLink(link);
// notify peers
return new LinkEvent(LINK_REMOVED, link.entity());
}
return null;
}
}
// update Egress and ingress link sets
private void removeLink(VersionedValue<Link> link) {
synchronized (this) {
srcLinks.remove(link.entity().src().deviceId(), link);
dstLinks.remove(link.entity().dst().deviceId(), link);
}
}
}
package org.onlab.onos.store.link.impl;
import java.util.Objects;
import org.onlab.onos.store.Timestamp;
// TODO: remove once we stop using this
/**
* Wrapper class for a entity that is versioned
* and can either be up or down.
*
* @param <T> type of the value.
*/
public class VersionedValue<T> {
private final T entity;
private final Timestamp timestamp;
private final boolean isUp;
public VersionedValue(T entity, boolean isUp, Timestamp timestamp) {
this.entity = entity;
this.isUp = isUp;
this.timestamp = timestamp;
}
/**
* Returns the value.
* @return value.
*/
public T entity() {
return entity;
}
/**
* Tells whether the entity is up or down.
* @return true if up, false otherwise.
*/
public boolean isUp() {
return isUp;
}
/**
* Returns the timestamp (version) associated with this entity.
* @return timestamp.
*/
public Timestamp timestamp() {
return timestamp;
}
@Override
public int hashCode() {
return Objects.hash(entity, timestamp, isUp);
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
@SuppressWarnings("unchecked")
VersionedValue<T> that = (VersionedValue<T>) obj;
return Objects.equals(this.entity, that.entity) &&
Objects.equals(this.timestamp, that.timestamp) &&
Objects.equals(this.isUp, that.isUp);
}
// Default constructor for serializer
protected VersionedValue() {
this.entity = null;
this.isUp = false;
this.timestamp = null;
}
}
package org.onlab.onos.store.device.impl.peermsg;
import static org.onlab.onos.net.DeviceId.deviceId;
import org.junit.Test;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.provider.ProviderId;
import com.google.common.testing.EqualsTester;
public class DeviceFragmentIdTest {
private static final ProviderId PID = new ProviderId("of", "foo");
private static final ProviderId PIDA = new ProviderId("of", "bar", true);
private static final DeviceId DID1 = deviceId("of:foo");
private static final DeviceId DID2 = deviceId("of:bar");
@Test
public final void testEquals() {
new EqualsTester()
.addEqualityGroup(new DeviceFragmentId(DID1, PID),
new DeviceFragmentId(DID1, PID))
.addEqualityGroup(new DeviceFragmentId(DID2, PID),
new DeviceFragmentId(DID2, PID))
.addEqualityGroup(new DeviceFragmentId(DID1, PIDA),
new DeviceFragmentId(DID1, PIDA))
.addEqualityGroup(new DeviceFragmentId(DID2, PIDA),
new DeviceFragmentId(DID2, PIDA))
.testEquals();
}
}
package org.onlab.onos.store.device.impl.peermsg;
import static org.onlab.onos.net.DeviceId.deviceId;
import org.junit.Test;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.PortNumber;
import org.onlab.onos.net.provider.ProviderId;
import com.google.common.testing.EqualsTester;
public class PortFragmentIdTest {
private static final ProviderId PID = new ProviderId("of", "foo");
private static final ProviderId PIDA = new ProviderId("of", "bar", true);
private static final DeviceId DID1 = deviceId("of:foo");
private static final DeviceId DID2 = deviceId("of:bar");
private static final PortNumber PN1 = PortNumber.portNumber(1);
private static final PortNumber PN2 = PortNumber.portNumber(2);
@Test
public final void testEquals() {
new EqualsTester()
.addEqualityGroup(new PortFragmentId(DID1, PID, PN1),
new PortFragmentId(DID1, PID, PN1))
.addEqualityGroup(new PortFragmentId(DID2, PID, PN1),
new PortFragmentId(DID2, PID, PN1))
.addEqualityGroup(new PortFragmentId(DID1, PIDA, PN1),
new PortFragmentId(DID1, PIDA, PN1))
.addEqualityGroup(new PortFragmentId(DID2, PIDA, PN1),
new PortFragmentId(DID2, PIDA, PN1))
.addEqualityGroup(new PortFragmentId(DID1, PID, PN2),
new PortFragmentId(DID1, PID, PN2))
.addEqualityGroup(new PortFragmentId(DID2, PID, PN2),
new PortFragmentId(DID2, PID, PN2))
.addEqualityGroup(new PortFragmentId(DID1, PIDA, PN2),
new PortFragmentId(DID1, PIDA, PN2))
.addEqualityGroup(new PortFragmentId(DID2, PIDA, PN2),
new PortFragmentId(DID2, PIDA, PN2))
.testEquals();
}
}
......@@ -46,10 +46,6 @@
<groupId>com.hazelcast</groupId>
<artifactId>hazelcast</artifactId>
</dependency>
<dependency>
<groupId>de.javakaffee</groupId>
<artifactId>kryo-serializers</artifactId>
</dependency>
</dependencies>
<build>
......
......@@ -35,8 +35,8 @@
<artifactId>hazelcast</artifactId>
</dependency>
<dependency>
<groupId>de.javakaffee</groupId>
<artifactId>kryo-serializers</artifactId>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
</dependencies>
......
......@@ -23,11 +23,6 @@
</dependency>
<dependency>
<groupId>org.onlab.onos</groupId>
<artifactId>onos-core-serializers</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.onlab.onos</groupId>
<artifactId>onos-core-hz-common</artifactId>
<version>${project.version}</version>
</dependency>
......@@ -46,10 +41,6 @@
<groupId>com.hazelcast</groupId>
<artifactId>hazelcast</artifactId>
</dependency>
<dependency>
<groupId>de.javakaffee</groupId>
<artifactId>kryo-serializers</artifactId>
</dependency>
</dependencies>
<build>
......
......@@ -26,8 +26,13 @@
<artifactId>org.apache.felix.scr.annotations</artifactId>
</dependency>
<dependency>
<groupId>de.javakaffee</groupId>
<artifactId>kryo-serializers</artifactId>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava-testlib</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
......
......@@ -24,14 +24,13 @@ import org.onlab.onos.net.Port;
import org.onlab.onos.net.PortNumber;
import org.onlab.onos.net.device.DefaultDeviceDescription;
import org.onlab.onos.net.device.DefaultPortDescription;
import org.onlab.onos.net.link.DefaultLinkDescription;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.Timestamp;
import org.onlab.packet.IpAddress;
import org.onlab.packet.IpPrefix;
import org.onlab.util.KryoPool;
import de.javakaffee.kryoserializers.URISerializer;
public final class KryoPoolUtil {
/**
......@@ -60,6 +59,7 @@ public final class KryoPoolUtil {
DefaultControllerNode.class,
DefaultDevice.class,
DefaultDeviceDescription.class,
DefaultLinkDescription.class,
MastershipRole.class,
Port.class,
DefaultPortDescription.class,
......
package org.onlab.onos.store.serializers;
import java.net.URI;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
/**
* Serializer for {@link URI}.
*/
public class URISerializer extends Serializer<URI> {
/**
* Creates {@link URI} serializer instance.
*/
public URISerializer() {
super(false);
}
@Override
public void write(Kryo kryo, Output output, URI object) {
output.writeString(object.toString());
}
@Override
public URI read(Kryo kryo, Input input, Class<URI> type) {
return URI.create(input.readString());
}
}
......@@ -20,10 +20,11 @@
<bundle>mvn:io.dropwizard.metrics/metrics-core/3.1.0</bundle>
<bundle>mvn:com.eclipsesource.minimal-json/minimal-json/0.9.1</bundle>
<bundle>mvn:com.esotericsoftware.kryo/kryo/2.24.0</bundle>
<bundle>mvn:com.esotericsoftware/kryo/3.0.0</bundle>
<bundle>mvn:com.esotericsoftware/reflectasm/1.10.0</bundle>
<bundle>mvn:org.ow2.asm/asm/4.2</bundle>
<bundle>mvn:com.esotericsoftware/minlog/1.3.0</bundle>
<bundle>mvn:org.objenesis/objenesis/2.1</bundle>
<bundle>mvn:de.javakaffee/kryo-serializers/0.27</bundle>
<bundle>mvn:org.onlab.onos/onlab-nio/1.0.0-SNAPSHOT</bundle>
......
......@@ -193,9 +193,20 @@
<version>0.9.1</version>
</dependency>
<dependency>
<groupId>com.esotericsoftware.kryo</groupId>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo</artifactId>
<version>2.24.0</version>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>reflectasm</artifactId>
<version>1.10.0</version>
<type>bundle</type>
</dependency>
<dependency>
<groupId>org.ow2.asm</groupId>
<artifactId>asm</artifactId>
<version>4.2</version>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>
......@@ -207,11 +218,6 @@
<artifactId>objenesis</artifactId>
<version>2.1</version>
</dependency>
<dependency>
<groupId>de.javakaffee</groupId>
<artifactId>kryo-serializers</artifactId>
<version>0.27</version>
</dependency>
<!-- ONOS related -->
<dependency>
......@@ -284,6 +290,10 @@
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-jdk14</artifactId>
</dependency>
</dependencies>
......@@ -434,9 +444,6 @@
<version>3.2</version>
<configuration>
<excludes>
<exclude>**/datastore/serializers/**</exclude>
<exclude>**/edu/stanford/**</exclude>
<exclude>**/net/floodlightcontroller/**</exclude>
</excludes>
<rulesets>
<ruleset>onos/pmd.xml</ruleset>
......@@ -545,9 +552,6 @@
<version>3.2</version>
<configuration>
<excludes>
<exclude>**/datastore/serializers/**</exclude>
<exclude>**/edu/stanford/**</exclude>
<exclude>**/net/floodlightcontroller/**</exclude>
</excludes>
<rulesets>
<ruleset>onos/pmd.xml</ruleset>
......
#!/bin/bash
#------------------------------------------------------------------------------
# Echoes project-level directory if a Java file within is newer than the
# target directory.
......
......@@ -44,16 +44,8 @@
<artifactId>minimal-json</artifactId>
</dependency>
<dependency>
<groupId>com.esotericsoftware.kryo</groupId>
<artifactId>kryo</artifactId>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>minlog</artifactId>
</dependency>
<dependency>
<groupId>org.objenesis</groupId>
<artifactId>objenesis</artifactId>
<artifactId>kryo</artifactId>
</dependency>
<dependency>
<groupId>io.dropwizard.metrics</groupId>
......
......@@ -32,10 +32,6 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>de.javakaffee</groupId>
<artifactId>kryo-serializers</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
......