Yuta HIGUCHI

SimpleLinkStore with annotation and multi-provider support

Change-Id: I98a35c4497363c6115fd0c61d140dfe7790e6cee
1 +package org.onlab.onos.net;
2 +
3 +public final class AnnotationsUtil {
4 +
5 + public static boolean isEqual(Annotations lhs, Annotations rhs) {
6 + if (lhs == rhs) {
7 + return true;
8 + }
9 + if (lhs == null || rhs == null) {
10 + return false;
11 + }
12 +
13 + if (!lhs.keys().equals(rhs.keys())) {
14 + return false;
15 + }
16 +
17 + for (String key : lhs.keys()) {
18 + if (!lhs.value(key).equals(rhs.value(key))) {
19 + return false;
20 + }
21 + }
22 + return true;
23 + }
24 +
25 + // not to be instantiated
26 + private AnnotationsUtil() {}
27 +}
1 package org.onlab.onos.net.link; 1 package org.onlab.onos.net.link;
2 2
3 import org.onlab.onos.net.ConnectPoint; 3 import org.onlab.onos.net.ConnectPoint;
4 +import org.onlab.onos.net.Description;
4 import org.onlab.onos.net.Link; 5 import org.onlab.onos.net.Link;
5 6
6 /** 7 /**
7 * Describes an infrastructure link. 8 * Describes an infrastructure link.
8 */ 9 */
9 -public interface LinkDescription { 10 +public interface LinkDescription extends Description {
10 11
11 /** 12 /**
12 * Returns the link source. 13 * Returns the link source.
......
...@@ -11,7 +11,7 @@ import org.apache.felix.scr.annotations.Deactivate; ...@@ -11,7 +11,7 @@ import org.apache.felix.scr.annotations.Deactivate;
11 import org.apache.felix.scr.annotations.Reference; 11 import org.apache.felix.scr.annotations.Reference;
12 import org.apache.felix.scr.annotations.ReferenceCardinality; 12 import org.apache.felix.scr.annotations.ReferenceCardinality;
13 import org.apache.felix.scr.annotations.Service; 13 import org.apache.felix.scr.annotations.Service;
14 -import org.onlab.onos.net.Annotations; 14 +import org.onlab.onos.net.AnnotationsUtil;
15 import org.onlab.onos.net.DefaultAnnotations; 15 import org.onlab.onos.net.DefaultAnnotations;
16 import org.onlab.onos.net.DefaultDevice; 16 import org.onlab.onos.net.DefaultDevice;
17 import org.onlab.onos.net.DefaultPort; 17 import org.onlab.onos.net.DefaultPort;
...@@ -196,7 +196,7 @@ public class GossipDeviceStore ...@@ -196,7 +196,7 @@ public class GossipDeviceStore
196 // We allow only certain attributes to trigger update 196 // We allow only certain attributes to trigger update
197 if (!Objects.equals(oldDevice.hwVersion(), newDevice.hwVersion()) || 197 if (!Objects.equals(oldDevice.hwVersion(), newDevice.hwVersion()) ||
198 !Objects.equals(oldDevice.swVersion(), newDevice.swVersion()) || 198 !Objects.equals(oldDevice.swVersion(), newDevice.swVersion()) ||
199 - !isAnnotationsEqual(oldDevice.annotations(), newDevice.annotations())) { 199 + !AnnotationsUtil.isEqual(oldDevice.annotations(), newDevice.annotations())) {
200 200
201 boolean replaced = devices.replace(newDevice.id(), oldDevice, newDevice); 201 boolean replaced = devices.replace(newDevice.id(), oldDevice, newDevice);
202 if (!replaced) { 202 if (!replaced) {
...@@ -327,7 +327,7 @@ public class GossipDeviceStore ...@@ -327,7 +327,7 @@ public class GossipDeviceStore
327 Port newPort, 327 Port newPort,
328 Map<PortNumber, Port> ports) { 328 Map<PortNumber, Port> ports) {
329 if (oldPort.isEnabled() != newPort.isEnabled() || 329 if (oldPort.isEnabled() != newPort.isEnabled() ||
330 - !isAnnotationsEqual(oldPort.annotations(), newPort.annotations())) { 330 + !AnnotationsUtil.isEqual(oldPort.annotations(), newPort.annotations())) {
331 331
332 ports.put(oldPort.number(), newPort); 332 ports.put(oldPort.number(), newPort);
333 return new DeviceEvent(PORT_UPDATED, device, newPort); 333 return new DeviceEvent(PORT_UPDATED, device, newPort);
...@@ -438,32 +438,11 @@ public class GossipDeviceStore ...@@ -438,32 +438,11 @@ public class GossipDeviceStore
438 438
439 @Override 439 @Override
440 public DeviceEvent removeDevice(DeviceId deviceId) { 440 public DeviceEvent removeDevice(DeviceId deviceId) {
441 - synchronized (this) {
442 Device device = devices.remove(deviceId); 441 Device device = devices.remove(deviceId);
442 + // FIXME: should we be removing deviceDescs also?
443 return device == null ? null : 443 return device == null ? null :
444 new DeviceEvent(DEVICE_REMOVED, device, null); 444 new DeviceEvent(DEVICE_REMOVED, device, null);
445 } 445 }
446 - }
447 -
448 - private static boolean isAnnotationsEqual(Annotations lhs, Annotations rhs) {
449 - if (lhs == rhs) {
450 - return true;
451 - }
452 - if (lhs == null || rhs == null) {
453 - return false;
454 - }
455 -
456 - if (!lhs.keys().equals(rhs.keys())) {
457 - return false;
458 - }
459 -
460 - for (String key : lhs.keys()) {
461 - if (!lhs.value(key).equals(rhs.value(key))) {
462 - return false;
463 - }
464 - }
465 - return true;
466 - }
467 446
468 /** 447 /**
469 * Returns a Device, merging description given from multiple Providers. 448 * Returns a Device, merging description given from multiple Providers.
......
...@@ -9,7 +9,7 @@ import org.apache.felix.scr.annotations.Activate; ...@@ -9,7 +9,7 @@ import org.apache.felix.scr.annotations.Activate;
9 import org.apache.felix.scr.annotations.Component; 9 import org.apache.felix.scr.annotations.Component;
10 import org.apache.felix.scr.annotations.Deactivate; 10 import org.apache.felix.scr.annotations.Deactivate;
11 import org.apache.felix.scr.annotations.Service; 11 import org.apache.felix.scr.annotations.Service;
12 -import org.onlab.onos.net.Annotations; 12 +import org.onlab.onos.net.AnnotationsUtil;
13 import org.onlab.onos.net.DefaultAnnotations; 13 import org.onlab.onos.net.DefaultAnnotations;
14 import org.onlab.onos.net.DefaultDevice; 14 import org.onlab.onos.net.DefaultDevice;
15 import org.onlab.onos.net.DefaultPort; 15 import org.onlab.onos.net.DefaultPort;
...@@ -28,6 +28,7 @@ import org.onlab.onos.net.device.DeviceStoreDelegate; ...@@ -28,6 +28,7 @@ import org.onlab.onos.net.device.DeviceStoreDelegate;
28 import org.onlab.onos.net.device.PortDescription; 28 import org.onlab.onos.net.device.PortDescription;
29 import org.onlab.onos.net.provider.ProviderId; 29 import org.onlab.onos.net.provider.ProviderId;
30 import org.onlab.onos.store.AbstractStore; 30 import org.onlab.onos.store.AbstractStore;
31 +import org.onlab.util.NewConcurrentHashMap;
31 import org.slf4j.Logger; 32 import org.slf4j.Logger;
32 33
33 import java.util.ArrayList; 34 import java.util.ArrayList;
...@@ -109,8 +110,7 @@ public class SimpleDeviceStore ...@@ -109,8 +110,7 @@ public class SimpleDeviceStore
109 public synchronized DeviceEvent createOrUpdateDevice(ProviderId providerId, DeviceId deviceId, 110 public synchronized DeviceEvent createOrUpdateDevice(ProviderId providerId, DeviceId deviceId,
110 DeviceDescription deviceDescription) { 111 DeviceDescription deviceDescription) {
111 ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs 112 ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs
112 - = createIfAbsentUnchecked(deviceDescs, deviceId, 113 + = getDeviceDescriptions(deviceId);
113 - new InitConcurrentHashMap<ProviderId, DeviceDescriptions>());
114 114
115 Device oldDevice = devices.get(deviceId); 115 Device oldDevice = devices.get(deviceId);
116 116
...@@ -151,7 +151,7 @@ public class SimpleDeviceStore ...@@ -151,7 +151,7 @@ public class SimpleDeviceStore
151 // We allow only certain attributes to trigger update 151 // We allow only certain attributes to trigger update
152 if (!Objects.equals(oldDevice.hwVersion(), newDevice.hwVersion()) || 152 if (!Objects.equals(oldDevice.hwVersion(), newDevice.hwVersion()) ||
153 !Objects.equals(oldDevice.swVersion(), newDevice.swVersion()) || 153 !Objects.equals(oldDevice.swVersion(), newDevice.swVersion()) ||
154 - !isAnnotationsEqual(oldDevice.annotations(), newDevice.annotations())) { 154 + !AnnotationsUtil.isEqual(oldDevice.annotations(), newDevice.annotations())) {
155 155
156 synchronized (this) { 156 synchronized (this) {
157 devices.replace(newDevice.id(), oldDevice, newDevice); 157 devices.replace(newDevice.id(), oldDevice, newDevice);
...@@ -238,7 +238,7 @@ public class SimpleDeviceStore ...@@ -238,7 +238,7 @@ public class SimpleDeviceStore
238 Port newPort, 238 Port newPort,
239 ConcurrentMap<PortNumber, Port> ports) { 239 ConcurrentMap<PortNumber, Port> ports) {
240 if (oldPort.isEnabled() != newPort.isEnabled() || 240 if (oldPort.isEnabled() != newPort.isEnabled() ||
241 - !isAnnotationsEqual(oldPort.annotations(), newPort.annotations())) { 241 + !AnnotationsUtil.isEqual(oldPort.annotations(), newPort.annotations())) {
242 242
243 ports.put(oldPort.number(), newPort); 243 ports.put(oldPort.number(), newPort);
244 return new DeviceEvent(PORT_UPDATED, device, newPort); 244 return new DeviceEvent(PORT_UPDATED, device, newPort);
...@@ -264,11 +264,17 @@ public class SimpleDeviceStore ...@@ -264,11 +264,17 @@ public class SimpleDeviceStore
264 return events; 264 return events;
265 } 265 }
266 266
267 + private ConcurrentMap<ProviderId, DeviceDescriptions> getDeviceDescriptions(
268 + DeviceId deviceId) {
269 + return createIfAbsentUnchecked(deviceDescs, deviceId,
270 + NewConcurrentHashMap.<ProviderId, DeviceDescriptions>ifNeeded());
271 + }
272 +
267 // Gets the map of ports for the specified device; if one does not already 273 // Gets the map of ports for the specified device; if one does not already
268 // exist, it creates and registers a new one. 274 // exist, it creates and registers a new one.
269 private ConcurrentMap<PortNumber, Port> getPortMap(DeviceId deviceId) { 275 private ConcurrentMap<PortNumber, Port> getPortMap(DeviceId deviceId) {
270 return createIfAbsentUnchecked(devicePorts, deviceId, 276 return createIfAbsentUnchecked(devicePorts, deviceId,
271 - new InitConcurrentHashMap<PortNumber, Port>()); 277 + NewConcurrentHashMap.<PortNumber, Port>ifNeeded());
272 } 278 }
273 279
274 @Override 280 @Override
...@@ -325,31 +331,12 @@ public class SimpleDeviceStore ...@@ -325,31 +331,12 @@ public class SimpleDeviceStore
325 public DeviceEvent removeDevice(DeviceId deviceId) { 331 public DeviceEvent removeDevice(DeviceId deviceId) {
326 synchronized (this) { 332 synchronized (this) {
327 Device device = devices.remove(deviceId); 333 Device device = devices.remove(deviceId);
334 + // FIXME: should we be removing deviceDescs also?
328 return device == null ? null : 335 return device == null ? null :
329 new DeviceEvent(DEVICE_REMOVED, device, null); 336 new DeviceEvent(DEVICE_REMOVED, device, null);
330 } 337 }
331 } 338 }
332 339
333 - private static boolean isAnnotationsEqual(Annotations lhs, Annotations rhs) {
334 - if (lhs == rhs) {
335 - return true;
336 - }
337 - if (lhs == null || rhs == null) {
338 - return false;
339 - }
340 -
341 - if (!lhs.keys().equals(rhs.keys())) {
342 - return false;
343 - }
344 -
345 - for (String key : lhs.keys()) {
346 - if (!lhs.value(key).equals(rhs.value(key))) {
347 - return false;
348 - }
349 - }
350 - return true;
351 - }
352 -
353 /** 340 /**
354 * Returns a Device, merging description given from multiple Providers. 341 * Returns a Device, merging description given from multiple Providers.
355 * 342 *
...@@ -445,15 +432,6 @@ public class SimpleDeviceStore ...@@ -445,15 +432,6 @@ public class SimpleDeviceStore
445 return fallBackPrimary; 432 return fallBackPrimary;
446 } 433 }
447 434
448 - // TODO: can be made generic
449 - private static final class InitConcurrentHashMap<K, V> implements
450 - ConcurrentInitializer<ConcurrentMap<K, V>> {
451 - @Override
452 - public ConcurrentMap<K, V> get() throws ConcurrentException {
453 - return new ConcurrentHashMap<>();
454 - }
455 - }
456 -
457 public static final class InitDeviceDescs 435 public static final class InitDeviceDescs
458 implements ConcurrentInitializer<DeviceDescriptions> { 436 implements ConcurrentInitializer<DeviceDescriptions> {
459 private final DeviceDescription deviceDesc; 437 private final DeviceDescription deviceDesc;
......
1 package org.onlab.onos.store.trivial.impl; 1 package org.onlab.onos.store.trivial.impl;
2 2
3 +import com.google.common.base.Function;
4 +import com.google.common.base.Predicate;
5 +import com.google.common.collect.FluentIterable;
3 import com.google.common.collect.HashMultimap; 6 import com.google.common.collect.HashMultimap;
4 -import com.google.common.collect.ImmutableSet; 7 +import com.google.common.collect.SetMultimap;
5 -import com.google.common.collect.Multimap;
6 8
9 +import org.apache.commons.lang3.concurrent.ConcurrentUtils;
7 import org.apache.felix.scr.annotations.Activate; 10 import org.apache.felix.scr.annotations.Activate;
8 import org.apache.felix.scr.annotations.Component; 11 import org.apache.felix.scr.annotations.Component;
9 import org.apache.felix.scr.annotations.Deactivate; 12 import org.apache.felix.scr.annotations.Deactivate;
10 import org.apache.felix.scr.annotations.Service; 13 import org.apache.felix.scr.annotations.Service;
14 +import org.onlab.onos.net.Annotations;
15 +import org.onlab.onos.net.AnnotationsUtil;
11 import org.onlab.onos.net.ConnectPoint; 16 import org.onlab.onos.net.ConnectPoint;
17 +import org.onlab.onos.net.DefaultAnnotations;
12 import org.onlab.onos.net.DefaultLink; 18 import org.onlab.onos.net.DefaultLink;
13 import org.onlab.onos.net.DeviceId; 19 import org.onlab.onos.net.DeviceId;
14 import org.onlab.onos.net.Link; 20 import org.onlab.onos.net.Link;
21 +import org.onlab.onos.net.SparseAnnotations;
22 +import org.onlab.onos.net.Link.Type;
15 import org.onlab.onos.net.LinkKey; 23 import org.onlab.onos.net.LinkKey;
24 +import org.onlab.onos.net.Provided;
25 +import org.onlab.onos.net.link.DefaultLinkDescription;
16 import org.onlab.onos.net.link.LinkDescription; 26 import org.onlab.onos.net.link.LinkDescription;
17 import org.onlab.onos.net.link.LinkEvent; 27 import org.onlab.onos.net.link.LinkEvent;
18 import org.onlab.onos.net.link.LinkStore; 28 import org.onlab.onos.net.link.LinkStore;
19 import org.onlab.onos.net.link.LinkStoreDelegate; 29 import org.onlab.onos.net.link.LinkStoreDelegate;
20 import org.onlab.onos.net.provider.ProviderId; 30 import org.onlab.onos.net.provider.ProviderId;
21 import org.onlab.onos.store.AbstractStore; 31 import org.onlab.onos.store.AbstractStore;
32 +import org.onlab.util.NewConcurrentHashMap;
22 import org.slf4j.Logger; 33 import org.slf4j.Logger;
23 34
24 import java.util.Collections; 35 import java.util.Collections;
25 import java.util.HashSet; 36 import java.util.HashSet;
26 -import java.util.Map;
27 import java.util.Set; 37 import java.util.Set;
38 +import java.util.Map.Entry;
28 import java.util.concurrent.ConcurrentHashMap; 39 import java.util.concurrent.ConcurrentHashMap;
40 +import java.util.concurrent.ConcurrentMap;
29 41
42 +import static org.onlab.onos.net.DefaultAnnotations.merge;
30 import static org.onlab.onos.net.Link.Type.DIRECT; 43 import static org.onlab.onos.net.Link.Type.DIRECT;
31 import static org.onlab.onos.net.Link.Type.INDIRECT; 44 import static org.onlab.onos.net.Link.Type.INDIRECT;
32 import static org.onlab.onos.net.link.LinkEvent.Type.*; 45 import static org.onlab.onos.net.link.LinkEvent.Type.*;
33 import static org.slf4j.LoggerFactory.getLogger; 46 import static org.slf4j.LoggerFactory.getLogger;
47 +import static com.google.common.collect.Multimaps.synchronizedSetMultimap;
48 +import static com.google.common.base.Predicates.notNull;
34 49
35 // TODO: Add support for multiple provider and annotations 50 // TODO: Add support for multiple provider and annotations
36 /** 51 /**
...@@ -46,11 +61,17 @@ public class SimpleLinkStore ...@@ -46,11 +61,17 @@ public class SimpleLinkStore
46 private final Logger log = getLogger(getClass()); 61 private final Logger log = getLogger(getClass());
47 62
48 // Link inventory 63 // Link inventory
49 - private final Map<LinkKey, DefaultLink> links = new ConcurrentHashMap<>(); 64 + private final ConcurrentMap<LinkKey,
65 + ConcurrentMap<ProviderId, LinkDescription>>
66 + linkDescs = new ConcurrentHashMap<>();
67 +
68 + // Link instance cache
69 + private final ConcurrentMap<LinkKey, Link> links = new ConcurrentHashMap<>();
50 70
51 // Egress and ingress link sets 71 // Egress and ingress link sets
52 - private final Multimap<DeviceId, Link> srcLinks = HashMultimap.create(); 72 + private final SetMultimap<DeviceId, LinkKey> srcLinks = createSynchronizedHashMultiMap();
53 - private final Multimap<DeviceId, Link> dstLinks = HashMultimap.create(); 73 + private final SetMultimap<DeviceId, LinkKey> dstLinks = createSynchronizedHashMultiMap();
74 +
54 75
55 @Activate 76 @Activate
56 public void activate() { 77 public void activate() {
...@@ -59,6 +80,10 @@ public class SimpleLinkStore ...@@ -59,6 +80,10 @@ public class SimpleLinkStore
59 80
60 @Deactivate 81 @Deactivate
61 public void deactivate() { 82 public void deactivate() {
83 + linkDescs.clear();
84 + links.clear();
85 + srcLinks.clear();
86 + dstLinks.clear();
62 log.info("Stopped"); 87 log.info("Stopped");
63 } 88 }
64 89
...@@ -69,17 +94,29 @@ public class SimpleLinkStore ...@@ -69,17 +94,29 @@ public class SimpleLinkStore
69 94
70 @Override 95 @Override
71 public Iterable<Link> getLinks() { 96 public Iterable<Link> getLinks() {
72 - return Collections.unmodifiableSet(new HashSet<Link>(links.values())); 97 + return Collections.unmodifiableCollection(links.values());
73 } 98 }
74 99
75 @Override 100 @Override
76 public Set<Link> getDeviceEgressLinks(DeviceId deviceId) { 101 public Set<Link> getDeviceEgressLinks(DeviceId deviceId) {
77 - return ImmutableSet.copyOf(srcLinks.get(deviceId)); 102 + // lock for iteration
103 + synchronized (srcLinks) {
104 + return FluentIterable.from(srcLinks.get(deviceId))
105 + .transform(lookupLink())
106 + .filter(notNull())
107 + .toSet();
108 + }
78 } 109 }
79 110
80 @Override 111 @Override
81 public Set<Link> getDeviceIngressLinks(DeviceId deviceId) { 112 public Set<Link> getDeviceIngressLinks(DeviceId deviceId) {
82 - return ImmutableSet.copyOf(dstLinks.get(deviceId)); 113 + // lock for iteration
114 + synchronized (dstLinks) {
115 + return FluentIterable.from(dstLinks.get(deviceId))
116 + .transform(lookupLink())
117 + .filter(notNull())
118 + .toSet();
119 + }
83 } 120 }
84 121
85 @Override 122 @Override
...@@ -90,9 +127,9 @@ public class SimpleLinkStore ...@@ -90,9 +127,9 @@ public class SimpleLinkStore
90 @Override 127 @Override
91 public Set<Link> getEgressLinks(ConnectPoint src) { 128 public Set<Link> getEgressLinks(ConnectPoint src) {
92 Set<Link> egress = new HashSet<>(); 129 Set<Link> egress = new HashSet<>();
93 - for (Link link : srcLinks.get(src.deviceId())) { 130 + for (LinkKey linkKey : srcLinks.get(src.deviceId())) {
94 - if (link.src().equals(src)) { 131 + if (linkKey.src().equals(src)) {
95 - egress.add(link); 132 + egress.add(links.get(linkKey));
96 } 133 }
97 } 134 }
98 return egress; 135 return egress;
...@@ -101,9 +138,9 @@ public class SimpleLinkStore ...@@ -101,9 +138,9 @@ public class SimpleLinkStore
101 @Override 138 @Override
102 public Set<Link> getIngressLinks(ConnectPoint dst) { 139 public Set<Link> getIngressLinks(ConnectPoint dst) {
103 Set<Link> ingress = new HashSet<>(); 140 Set<Link> ingress = new HashSet<>();
104 - for (Link link : dstLinks.get(dst.deviceId())) { 141 + for (LinkKey linkKey : dstLinks.get(dst.deviceId())) {
105 - if (link.dst().equals(dst)) { 142 + if (linkKey.dst().equals(dst)) {
106 - ingress.add(link); 143 + ingress.add(links.get(linkKey));
107 } 144 }
108 } 145 }
109 return ingress; 146 return ingress;
...@@ -113,56 +150,172 @@ public class SimpleLinkStore ...@@ -113,56 +150,172 @@ public class SimpleLinkStore
113 public LinkEvent createOrUpdateLink(ProviderId providerId, 150 public LinkEvent createOrUpdateLink(ProviderId providerId,
114 LinkDescription linkDescription) { 151 LinkDescription linkDescription) {
115 LinkKey key = new LinkKey(linkDescription.src(), linkDescription.dst()); 152 LinkKey key = new LinkKey(linkDescription.src(), linkDescription.dst());
116 - DefaultLink link = links.get(key); 153 +
117 - if (link == null) { 154 + ConcurrentMap<ProviderId, LinkDescription> descs = getLinkDescriptions(key);
118 - return createLink(providerId, key, linkDescription); 155 + synchronized (descs) {
156 + final Link oldLink = links.get(key);
157 + // update description
158 + createOrUpdateLinkDescription(descs, providerId, linkDescription);
159 + final Link newLink = composeLink(descs);
160 + if (oldLink == null) {
161 + return createLink(key, newLink);
162 + }
163 + return updateLink(key, oldLink, newLink);
119 } 164 }
120 - return updateLink(providerId, link, key, linkDescription);
121 } 165 }
122 166
123 - // Creates and stores the link and returns the appropriate event. 167 + // Guarded by linkDescs value (=locking each Link)
124 - private LinkEvent createLink(ProviderId providerId, LinkKey key, 168 + private LinkDescription createOrUpdateLinkDescription(
169 + ConcurrentMap<ProviderId, LinkDescription> descs,
170 + ProviderId providerId,
125 LinkDescription linkDescription) { 171 LinkDescription linkDescription) {
126 - DefaultLink link = new DefaultLink(providerId, key.src(), key.dst(), 172 +
127 - linkDescription.type()); 173 + // merge existing attributes and merge
128 - synchronized (this) { 174 + LinkDescription oldDesc = descs.get(providerId);
129 - links.put(key, link); 175 + LinkDescription newDesc = linkDescription;
130 - srcLinks.put(link.src().deviceId(), link); 176 + if (oldDesc != null) {
131 - dstLinks.put(link.dst().deviceId(), link); 177 + SparseAnnotations merged = merge(oldDesc.annotations(),
178 + linkDescription.annotations());
179 + newDesc = new DefaultLinkDescription(
180 + linkDescription.src(),
181 + linkDescription.dst(),
182 + linkDescription.type(), merged);
183 + }
184 + return descs.put(providerId, newDesc);
132 } 185 }
133 - return new LinkEvent(LINK_ADDED, link); 186 +
187 + // Creates and stores the link and returns the appropriate event.
188 + // Guarded by linkDescs value (=locking each Link)
189 + private LinkEvent createLink(LinkKey key, Link newLink) {
190 +
191 + if (newLink.providerId().isAncillary()) {
192 + // TODO: revisit ancillary only Link handling
193 +
194 + // currently treating ancillary only as down (not visible outside)
195 + return null;
196 + }
197 +
198 + links.put(key, newLink);
199 + srcLinks.put(newLink.src().deviceId(), key);
200 + dstLinks.put(newLink.dst().deviceId(), key);
201 + return new LinkEvent(LINK_ADDED, newLink);
134 } 202 }
135 203
136 // Updates, if necessary the specified link and returns the appropriate event. 204 // Updates, if necessary the specified link and returns the appropriate event.
137 - private LinkEvent updateLink(ProviderId providerId, DefaultLink link, 205 + // Guarded by linkDescs value (=locking each Link)
138 - LinkKey key, LinkDescription linkDescription) { 206 + private LinkEvent updateLink(LinkKey key, Link oldLink, Link newLink) {
139 - if (link.type() == INDIRECT && linkDescription.type() == DIRECT) { 207 +
140 - synchronized (this) { 208 + if (newLink.providerId().isAncillary()) {
141 - srcLinks.remove(link.src().deviceId(), link); 209 + // TODO: revisit ancillary only Link handling
142 - dstLinks.remove(link.dst().deviceId(), link); 210 +
143 - 211 + // currently treating ancillary only as down (not visible outside)
144 - DefaultLink updated = 212 + return null;
145 - new DefaultLink(providerId, link.src(), link.dst(),
146 - linkDescription.type());
147 - links.put(key, updated);
148 - srcLinks.put(link.src().deviceId(), updated);
149 - dstLinks.put(link.dst().deviceId(), updated);
150 - return new LinkEvent(LINK_UPDATED, updated);
151 } 213 }
214 +
215 + if ((oldLink.type() == INDIRECT && newLink.type() == DIRECT) ||
216 + !AnnotationsUtil.isEqual(oldLink.annotations(), newLink.annotations())) {
217 +
218 + links.put(key, newLink);
219 + // strictly speaking following can be ommitted
220 + srcLinks.put(oldLink.src().deviceId(), key);
221 + dstLinks.put(oldLink.dst().deviceId(), key);
222 + return new LinkEvent(LINK_UPDATED, newLink);
152 } 223 }
153 return null; 224 return null;
154 } 225 }
155 226
156 @Override 227 @Override
157 public LinkEvent removeLink(ConnectPoint src, ConnectPoint dst) { 228 public LinkEvent removeLink(ConnectPoint src, ConnectPoint dst) {
158 - synchronized (this) { 229 + final LinkKey key = new LinkKey(src, dst);
159 - Link link = links.remove(new LinkKey(src, dst)); 230 + ConcurrentMap<ProviderId, LinkDescription> descs = getLinkDescriptions(key);
231 + synchronized (descs) {
232 + Link link = links.remove(key);
233 + // FIXME: should we be removing deviceDescs also?
160 if (link != null) { 234 if (link != null) {
161 - srcLinks.remove(link.src().deviceId(), link); 235 + srcLinks.remove(link.src().deviceId(), key);
162 - dstLinks.remove(link.dst().deviceId(), link); 236 + dstLinks.remove(link.dst().deviceId(), key);
163 return new LinkEvent(LINK_REMOVED, link); 237 return new LinkEvent(LINK_REMOVED, link);
164 } 238 }
165 return null; 239 return null;
166 } 240 }
167 } 241 }
242 +
243 + private static <K, V> SetMultimap<K, V> createSynchronizedHashMultiMap() {
244 + return synchronizedSetMultimap(HashMultimap.<K, V>create());
245 + }
246 +
247 + /**
248 + * @return primary ProviderID, or randomly chosen one if none exists
249 + */
250 + private ProviderId pickPrimaryPID(
251 + ConcurrentMap<ProviderId, LinkDescription> providerDescs) {
252 +
253 + ProviderId fallBackPrimary = null;
254 + for (Entry<ProviderId, LinkDescription> e : providerDescs.entrySet()) {
255 + if (!e.getKey().isAncillary()) {
256 + return e.getKey();
257 + } else if (fallBackPrimary == null) {
258 + // pick randomly as a fallback in case there is no primary
259 + fallBackPrimary = e.getKey();
260 + }
261 + }
262 + return fallBackPrimary;
263 + }
264 +
265 + private Link composeLink(ConcurrentMap<ProviderId, LinkDescription> descs) {
266 + ProviderId primary = pickPrimaryPID(descs);
267 + LinkDescription base = descs.get(primary);
268 +
269 + ConnectPoint src = base.src();
270 + ConnectPoint dst = base.dst();
271 + Type type = base.type();
272 + Annotations annotations = DefaultAnnotations.builder().build();
273 + annotations = merge(annotations, base.annotations());
274 +
275 + for (Entry<ProviderId, LinkDescription> e : descs.entrySet()) {
276 + if (primary.equals(e.getKey())) {
277 + continue;
278 + }
279 +
280 + // TODO: should keep track of Description timestamp
281 + // and only merge conflicting keys when timestamp is newer
282 + // Currently assuming there will never be a key conflict between
283 + // providers
284 +
285 + // annotation merging. not so efficient, should revisit later
286 + annotations = merge(annotations, e.getValue().annotations());
287 + }
288 +
289 + return new DefaultLink(primary , src, dst, type, annotations);
290 + }
291 +
292 + private ConcurrentMap<ProviderId, LinkDescription> getLinkDescriptions(LinkKey key) {
293 + return ConcurrentUtils.createIfAbsentUnchecked(linkDescs, key,
294 + NewConcurrentHashMap.<ProviderId, LinkDescription>ifNeeded());
295 + }
296 +
297 + private final Function<LinkKey, Link> lookupLink = new LookupLink();
298 + private Function<LinkKey, Link> lookupLink() {
299 + return lookupLink;
300 + }
301 +
302 + private final class LookupLink implements Function<LinkKey, Link> {
303 + @Override
304 + public Link apply(LinkKey input) {
305 + return links.get(input);
306 + }
307 + }
308 +
309 + private static final Predicate<Provided> IS_PRIMARY = new IsPrimary();
310 + private static final Predicate<Provided> isPrimary() {
311 + return IS_PRIMARY;
312 + }
313 +
314 + private static final class IsPrimary implements Predicate<Provided> {
315 +
316 + @Override
317 + public boolean apply(Provided input) {
318 + return !input.providerId().isAncillary();
319 + }
320 + }
168 } 321 }
......
...@@ -126,6 +126,7 @@ public class SimpleDeviceStoreTest { ...@@ -126,6 +126,7 @@ public class SimpleDeviceStoreTest {
126 assertEquals(SN, device.serialNumber()); 126 assertEquals(SN, device.serialNumber());
127 } 127 }
128 128
129 + // TODO slice this out somewhere
129 /** 130 /**
130 * Verifies that Annotations created by merging {@code annotations} is 131 * Verifies that Annotations created by merging {@code annotations} is
131 * equal to actual Annotations. 132 * equal to actual Annotations.
...@@ -133,7 +134,7 @@ public class SimpleDeviceStoreTest { ...@@ -133,7 +134,7 @@ public class SimpleDeviceStoreTest {
133 * @param actual Annotations to check 134 * @param actual Annotations to check
134 * @param annotations 135 * @param annotations
135 */ 136 */
136 - private static void assertAnnotationsEquals(Annotations actual, SparseAnnotations... annotations) { 137 + public static void assertAnnotationsEquals(Annotations actual, SparseAnnotations... annotations) {
137 DefaultAnnotations expected = DefaultAnnotations.builder().build(); 138 DefaultAnnotations expected = DefaultAnnotations.builder().build();
138 for (SparseAnnotations a : annotations) { 139 for (SparseAnnotations a : annotations) {
139 expected = DefaultAnnotations.merge(expected, a); 140 expected = DefaultAnnotations.merge(expected, a);
...@@ -347,6 +348,7 @@ public class SimpleDeviceStoreTest { ...@@ -347,6 +348,7 @@ public class SimpleDeviceStoreTest {
347 assertFalse("Port is disabled", event.port().isEnabled()); 348 assertFalse("Port is disabled", event.port().isEnabled());
348 349
349 } 350 }
351 +
350 @Test 352 @Test
351 public final void testUpdatePortStatusAncillary() { 353 public final void testUpdatePortStatusAncillary() {
352 putDeviceAncillary(DID1, SW1); 354 putDeviceAncillary(DID1, SW1);
......
...@@ -4,7 +4,9 @@ import static org.junit.Assert.*; ...@@ -4,7 +4,9 @@ import static org.junit.Assert.*;
4 import static org.onlab.onos.net.DeviceId.deviceId; 4 import static org.onlab.onos.net.DeviceId.deviceId;
5 import static org.onlab.onos.net.Link.Type.*; 5 import static org.onlab.onos.net.Link.Type.*;
6 import static org.onlab.onos.net.link.LinkEvent.Type.*; 6 import static org.onlab.onos.net.link.LinkEvent.Type.*;
7 +import static org.onlab.onos.store.trivial.impl.SimpleDeviceStoreTest.assertAnnotationsEquals;
7 8
9 +import java.util.Collections;
8 import java.util.HashMap; 10 import java.util.HashMap;
9 import java.util.Map; 11 import java.util.Map;
10 import java.util.Set; 12 import java.util.Set;
...@@ -18,10 +20,12 @@ import org.junit.BeforeClass; ...@@ -18,10 +20,12 @@ import org.junit.BeforeClass;
18 import org.junit.Ignore; 20 import org.junit.Ignore;
19 import org.junit.Test; 21 import org.junit.Test;
20 import org.onlab.onos.net.ConnectPoint; 22 import org.onlab.onos.net.ConnectPoint;
23 +import org.onlab.onos.net.DefaultAnnotations;
21 import org.onlab.onos.net.DeviceId; 24 import org.onlab.onos.net.DeviceId;
22 import org.onlab.onos.net.Link; 25 import org.onlab.onos.net.Link;
23 import org.onlab.onos.net.LinkKey; 26 import org.onlab.onos.net.LinkKey;
24 import org.onlab.onos.net.PortNumber; 27 import org.onlab.onos.net.PortNumber;
28 +import org.onlab.onos.net.SparseAnnotations;
25 import org.onlab.onos.net.Link.Type; 29 import org.onlab.onos.net.Link.Type;
26 import org.onlab.onos.net.link.DefaultLinkDescription; 30 import org.onlab.onos.net.link.DefaultLinkDescription;
27 import org.onlab.onos.net.link.LinkEvent; 31 import org.onlab.onos.net.link.LinkEvent;
...@@ -37,6 +41,7 @@ import com.google.common.collect.Iterables; ...@@ -37,6 +41,7 @@ import com.google.common.collect.Iterables;
37 public class SimpleLinkStoreTest { 41 public class SimpleLinkStoreTest {
38 42
39 private static final ProviderId PID = new ProviderId("of", "foo"); 43 private static final ProviderId PID = new ProviderId("of", "foo");
44 + private static final ProviderId PIDA = new ProviderId("of", "bar", true);
40 private static final DeviceId DID1 = deviceId("of:foo"); 45 private static final DeviceId DID1 = deviceId("of:foo");
41 private static final DeviceId DID2 = deviceId("of:bar"); 46 private static final DeviceId DID2 = deviceId("of:bar");
42 47
...@@ -44,6 +49,23 @@ public class SimpleLinkStoreTest { ...@@ -44,6 +49,23 @@ public class SimpleLinkStoreTest {
44 private static final PortNumber P2 = PortNumber.portNumber(2); 49 private static final PortNumber P2 = PortNumber.portNumber(2);
45 private static final PortNumber P3 = PortNumber.portNumber(3); 50 private static final PortNumber P3 = PortNumber.portNumber(3);
46 51
52 + private static final SparseAnnotations A1 = DefaultAnnotations.builder()
53 + .set("A1", "a1")
54 + .set("B1", "b1")
55 + .build();
56 + private static final SparseAnnotations A1_2 = DefaultAnnotations.builder()
57 + .remove("A1")
58 + .set("B3", "b3")
59 + .build();
60 + private static final SparseAnnotations A2 = DefaultAnnotations.builder()
61 + .set("A2", "a2")
62 + .set("B2", "b2")
63 + .build();
64 + private static final SparseAnnotations A2_2 = DefaultAnnotations.builder()
65 + .remove("A2")
66 + .set("B4", "b4")
67 + .build();
68 +
47 69
48 private SimpleLinkStore simpleLinkStore; 70 private SimpleLinkStore simpleLinkStore;
49 private LinkStore linkStore; 71 private LinkStore linkStore;
...@@ -270,6 +292,59 @@ public class SimpleLinkStoreTest { ...@@ -270,6 +292,59 @@ public class SimpleLinkStoreTest {
270 } 292 }
271 293
272 @Test 294 @Test
295 + public final void testCreateOrUpdateLinkAncillary() {
296 + ConnectPoint src = new ConnectPoint(DID1, P1);
297 + ConnectPoint dst = new ConnectPoint(DID2, P2);
298 +
299 + // add Ancillary link
300 + LinkEvent event = linkStore.createOrUpdateLink(PIDA,
301 + new DefaultLinkDescription(src, dst, INDIRECT, A1));
302 +
303 + assertNull("Ancillary only link is ignored", event);
304 +
305 + // add Primary link
306 + LinkEvent event2 = linkStore.createOrUpdateLink(PID,
307 + new DefaultLinkDescription(src, dst, INDIRECT, A2));
308 +
309 + assertLink(DID1, P1, DID2, P2, INDIRECT, event2.subject());
310 + assertAnnotationsEquals(event2.subject().annotations(), A2, A1);
311 + assertEquals(LINK_ADDED, event2.type());
312 +
313 + // update link type
314 + LinkEvent event3 = linkStore.createOrUpdateLink(PID,
315 + new DefaultLinkDescription(src, dst, DIRECT, A2));
316 + assertLink(DID1, P1, DID2, P2, DIRECT, event3.subject());
317 + assertAnnotationsEquals(event3.subject().annotations(), A2, A1);
318 + assertEquals(LINK_UPDATED, event3.type());
319 +
320 +
321 + // no change
322 + LinkEvent event4 = linkStore.createOrUpdateLink(PID,
323 + new DefaultLinkDescription(src, dst, DIRECT));
324 + assertNull("No change event expected", event4);
325 +
326 + // update link annotation (Primary)
327 + LinkEvent event5 = linkStore.createOrUpdateLink(PID,
328 + new DefaultLinkDescription(src, dst, DIRECT, A2_2));
329 + assertLink(DID1, P1, DID2, P2, DIRECT, event5.subject());
330 + assertAnnotationsEquals(event5.subject().annotations(), A2, A2_2, A1);
331 + assertEquals(LINK_UPDATED, event5.type());
332 +
333 + // update link annotation (Ancillary)
334 + LinkEvent event6 = linkStore.createOrUpdateLink(PIDA,
335 + new DefaultLinkDescription(src, dst, DIRECT, A1_2));
336 + assertLink(DID1, P1, DID2, P2, DIRECT, event6.subject());
337 + assertAnnotationsEquals(event6.subject().annotations(), A2, A2_2, A1, A1_2);
338 + assertEquals(LINK_UPDATED, event6.type());
339 +
340 + // update link type (Ancillary) : ignored
341 + LinkEvent event7 = linkStore.createOrUpdateLink(PIDA,
342 + new DefaultLinkDescription(src, dst, EDGE));
343 + assertNull("Ancillary change other than annotation is ignored", event7);
344 + }
345 +
346 +
347 + @Test
273 public final void testRemoveLink() { 348 public final void testRemoveLink() {
274 final ConnectPoint d1P1 = new ConnectPoint(DID1, P1); 349 final ConnectPoint d1P1 = new ConnectPoint(DID1, P1);
275 final ConnectPoint d2P2 = new ConnectPoint(DID2, P2); 350 final ConnectPoint d2P2 = new ConnectPoint(DID2, P2);
...@@ -291,6 +366,30 @@ public class SimpleLinkStoreTest { ...@@ -291,6 +366,30 @@ public class SimpleLinkStoreTest {
291 assertLink(linkId2, DIRECT, linkStore.getLink(d2P2, d1P1)); 366 assertLink(linkId2, DIRECT, linkStore.getLink(d2P2, d1P1));
292 } 367 }
293 368
369 + @Test
370 + public final void testAncillaryOnlyNotVisible() {
371 + ConnectPoint src = new ConnectPoint(DID1, P1);
372 + ConnectPoint dst = new ConnectPoint(DID2, P2);
373 +
374 + // add Ancillary link
375 + linkStore.createOrUpdateLink(PIDA,
376 + new DefaultLinkDescription(src, dst, INDIRECT, A1));
377 +
378 + // Ancillary only link should not be visible
379 + assertEquals(0, linkStore.getLinkCount());
380 +
381 + assertTrue(Iterables.isEmpty(linkStore.getLinks()));
382 +
383 + assertNull(linkStore.getLink(src, dst));
384 +
385 + assertEquals(Collections.emptySet(), linkStore.getIngressLinks(dst));
386 +
387 + assertEquals(Collections.emptySet(), linkStore.getEgressLinks(src));
388 +
389 + assertEquals(Collections.emptySet(), linkStore.getDeviceEgressLinks(DID1));
390 + assertEquals(Collections.emptySet(), linkStore.getDeviceIngressLinks(DID2));
391 + }
392 +
294 // If Delegates should be called only on remote events, 393 // If Delegates should be called only on remote events,
295 // then Simple* should never call them, thus not test required. 394 // then Simple* should never call them, thus not test required.
296 @Ignore("Ignore until Delegate spec. is clear.") 395 @Ignore("Ignore until Delegate spec. is clear.")
......
1 +package org.onlab.util;
2 +
3 +import java.util.concurrent.ConcurrentHashMap;
4 +import java.util.concurrent.ConcurrentMap;
5 +
6 +import org.apache.commons.lang3.concurrent.ConcurrentException;
7 +import org.apache.commons.lang3.concurrent.ConcurrentInitializer;
8 +
9 +/**
10 + * Creates an instance of new ConcurrentHashMap on each {@link #get()} call.
11 + * <p>
12 + * To be used with
13 + * {@link org.apache.commons.lang3.concurrent.ConcurrentUtils#createIfAbsent()
14 + * ConcurrentUtils#createIfAbsent}
15 + *
16 + * @param <K> ConcurrentHashMap key type
17 + * @param <V> ConcurrentHashMap value type
18 + */
19 +public final class NewConcurrentHashMap<K, V>
20 + implements ConcurrentInitializer<ConcurrentMap<K, V>> {
21 +
22 + public static final NewConcurrentHashMap<?, ?> INSTANCE = new NewConcurrentHashMap<>();
23 +
24 + @SuppressWarnings("unchecked")
25 + public static <K, V> NewConcurrentHashMap<K, V> ifNeeded() {
26 + return (NewConcurrentHashMap<K, V>) INSTANCE;
27 + }
28 +
29 + @Override
30 + public ConcurrentMap<K, V> get() throws ConcurrentException {
31 + return new ConcurrentHashMap<>();
32 + }
33 +}