Marc De Leenheer
Committed by Gerrit Code Review

Injecting topology through JSON ConfigProvider works for multi-instance (ONOS-490).

Change-Id: Ib977f4cf9a59ddec360072891fd803c6f9ee84f1

Injecting optical device annotations and ports works for multi-instance (ONOS-870).

Change-Id: Icdde16ef72fc4e47eec7213250b04902083f0537
...@@ -15,10 +15,10 @@ ...@@ -15,10 +15,10 @@
15 */ 15 */
16 package org.onosproject.event; 16 package org.onosproject.event;
17 17
18 -import static com.google.common.base.MoreObjects.toStringHelper;
19 -
20 import org.joda.time.LocalDateTime; 18 import org.joda.time.LocalDateTime;
21 19
20 +import static com.google.common.base.MoreObjects.toStringHelper;
21 +
22 /** 22 /**
23 * Base event implementation. 23 * Base event implementation.
24 */ 24 */
...@@ -75,5 +75,4 @@ public class AbstractEvent<T extends Enum, S> implements Event<T, S> { ...@@ -75,5 +75,4 @@ public class AbstractEvent<T extends Enum, S> implements Event<T, S> {
75 .add("subject", subject()) 75 .add("subject", subject())
76 .toString(); 76 .toString();
77 } 77 }
78 -
79 } 78 }
......
...@@ -47,7 +47,7 @@ public class DefaultDeviceDescription extends AbstractDescription ...@@ -47,7 +47,7 @@ public class DefaultDeviceDescription extends AbstractDescription
47 * @param hwVersion device HW version 47 * @param hwVersion device HW version
48 * @param swVersion device SW version 48 * @param swVersion device SW version
49 * @param serialNumber device serial number 49 * @param serialNumber device serial number
50 - * @param chassis chasis id 50 + * @param chassis chassis id
51 * @param annotations optional key/value annotations map 51 * @param annotations optional key/value annotations map
52 */ 52 */
53 public DefaultDeviceDescription(URI uri, Type type, String manufacturer, 53 public DefaultDeviceDescription(URI uri, Type type, String manufacturer,
......
...@@ -306,18 +306,16 @@ public class DeviceManager ...@@ -306,18 +306,16 @@ public class DeviceManager
306 // TODO: Do we need to explicitly tell the Provider that 306 // TODO: Do we need to explicitly tell the Provider that
307 // this instance is not the MASTER 307 // this instance is not the MASTER
308 applyRole(deviceId, MastershipRole.STANDBY); 308 applyRole(deviceId, MastershipRole.STANDBY);
309 - return; 309 + } else {
310 - }
311 log.info("Role of this node is MASTER for {}", deviceId); 310 log.info("Role of this node is MASTER for {}", deviceId);
312 -
313 // tell clock provider if this instance is the master 311 // tell clock provider if this instance is the master
314 deviceClockProviderService.setMastershipTerm(deviceId, term); 312 deviceClockProviderService.setMastershipTerm(deviceId, term);
313 + applyRole(deviceId, MastershipRole.MASTER);
314 + }
315 315
316 DeviceEvent event = store.createOrUpdateDevice(provider().id(), 316 DeviceEvent event = store.createOrUpdateDevice(provider().id(),
317 deviceId, deviceDescription); 317 deviceId, deviceDescription);
318 318
319 - applyRole(deviceId, MastershipRole.MASTER);
320 -
321 // If there was a change of any kind, tell the provider 319 // If there was a change of any kind, tell the provider
322 // that this instance is the master. 320 // that this instance is the master.
323 if (event != null) { 321 if (event != null) {
......
1 +package org.onosproject.store.device.impl;
2 +
3 +import com.google.common.base.MoreObjects;
4 +import org.onosproject.net.DeviceId;
5 +import org.onosproject.net.device.DeviceDescription;
6 +import org.onosproject.net.provider.ProviderId;
7 +
8 +public class DeviceInjectedEvent {
9 + private final ProviderId providerId;
10 + private final DeviceId deviceId;
11 + private final DeviceDescription deviceDescription;
12 +
13 + protected DeviceInjectedEvent(
14 + ProviderId providerId,
15 + DeviceId deviceId,
16 + DeviceDescription deviceDescription) {
17 + this.providerId = providerId;
18 + this.deviceId = deviceId;
19 + this.deviceDescription = deviceDescription;
20 + }
21 +
22 + public DeviceId deviceId() {
23 + return deviceId;
24 + }
25 +
26 + public ProviderId providerId() {
27 + return providerId;
28 + }
29 +
30 + public DeviceDescription deviceDescription() {
31 + return deviceDescription;
32 + }
33 +
34 + @Override
35 + public String toString() {
36 + return MoreObjects.toStringHelper(getClass())
37 + .add("providerId", providerId)
38 + .add("deviceId", deviceId)
39 + .add("deviceDescription", deviceDescription)
40 + .toString();
41 + }
42 +
43 + // for serializer
44 + protected DeviceInjectedEvent() {
45 + this.providerId = null;
46 + this.deviceId = null;
47 + this.deviceDescription = null;
48 + }
49 +}
...@@ -21,7 +21,6 @@ import com.google.common.collect.FluentIterable; ...@@ -21,7 +21,6 @@ import com.google.common.collect.FluentIterable;
21 import com.google.common.collect.ImmutableList; 21 import com.google.common.collect.ImmutableList;
22 import com.google.common.collect.Maps; 22 import com.google.common.collect.Maps;
23 import com.google.common.collect.Sets; 23 import com.google.common.collect.Sets;
24 -
25 import org.apache.commons.lang3.RandomUtils; 24 import org.apache.commons.lang3.RandomUtils;
26 import org.apache.felix.scr.annotations.Activate; 25 import org.apache.felix.scr.annotations.Activate;
27 import org.apache.felix.scr.annotations.Component; 26 import org.apache.felix.scr.annotations.Component;
...@@ -29,6 +28,9 @@ import org.apache.felix.scr.annotations.Deactivate; ...@@ -29,6 +28,9 @@ import org.apache.felix.scr.annotations.Deactivate;
29 import org.apache.felix.scr.annotations.Reference; 28 import org.apache.felix.scr.annotations.Reference;
30 import org.apache.felix.scr.annotations.ReferenceCardinality; 29 import org.apache.felix.scr.annotations.ReferenceCardinality;
31 import org.apache.felix.scr.annotations.Service; 30 import org.apache.felix.scr.annotations.Service;
31 +import org.onlab.packet.ChassisId;
32 +import org.onlab.util.KryoNamespace;
33 +import org.onlab.util.NewConcurrentHashMap;
32 import org.onosproject.cluster.ClusterService; 34 import org.onosproject.cluster.ClusterService;
33 import org.onosproject.cluster.ControllerNode; 35 import org.onosproject.cluster.ControllerNode;
34 import org.onosproject.cluster.NodeId; 36 import org.onosproject.cluster.NodeId;
...@@ -61,9 +63,6 @@ import org.onosproject.store.cluster.messaging.MessageSubject; ...@@ -61,9 +63,6 @@ import org.onosproject.store.cluster.messaging.MessageSubject;
61 import org.onosproject.store.impl.Timestamped; 63 import org.onosproject.store.impl.Timestamped;
62 import org.onosproject.store.serializers.KryoSerializer; 64 import org.onosproject.store.serializers.KryoSerializer;
63 import org.onosproject.store.serializers.impl.DistributedStoreSerializers; 65 import org.onosproject.store.serializers.impl.DistributedStoreSerializers;
64 -import org.onlab.packet.ChassisId;
65 -import org.onlab.util.KryoNamespace;
66 -import org.onlab.util.NewConcurrentHashMap;
67 import org.slf4j.Logger; 66 import org.slf4j.Logger;
68 67
69 import java.io.IOException; 68 import java.io.IOException;
...@@ -86,17 +85,17 @@ import java.util.concurrent.TimeUnit; ...@@ -86,17 +85,17 @@ import java.util.concurrent.TimeUnit;
86 85
87 import static com.google.common.base.Preconditions.checkArgument; 86 import static com.google.common.base.Preconditions.checkArgument;
88 import static com.google.common.base.Predicates.notNull; 87 import static com.google.common.base.Predicates.notNull;
89 -import static org.onosproject.cluster.ControllerNodeToNodeId.toNodeId;
90 -import static org.onosproject.net.device.DeviceEvent.Type.*;
91 -import static org.slf4j.LoggerFactory.getLogger;
92 -import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
93 -import static org.onosproject.net.DefaultAnnotations.merge;
94 import static com.google.common.base.Verify.verify; 88 import static com.google.common.base.Verify.verify;
89 +import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
90 +import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
95 import static org.onlab.util.Tools.minPriority; 91 import static org.onlab.util.Tools.minPriority;
96 import static org.onlab.util.Tools.namedThreads; 92 import static org.onlab.util.Tools.namedThreads;
97 -import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor; 93 +import static org.onosproject.cluster.ControllerNodeToNodeId.toNodeId;
98 -import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE; 94 +import static org.onosproject.net.DefaultAnnotations.merge;
99 -import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_REMOVE_REQ; 95 +import static org.onosproject.net.device.DeviceEvent.Type.*;
96 +import static org.onosproject.net.device.DeviceEvent.Type.DEVICE_REMOVED;
97 +import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.*;
98 +import static org.slf4j.LoggerFactory.getLogger;
100 99
101 /** 100 /**
102 * Manages inventory of infrastructure devices using gossip protocol to distribute 101 * Manages inventory of infrastructure devices using gossip protocol to distribute
...@@ -111,6 +110,8 @@ public class GossipDeviceStore ...@@ -111,6 +110,8 @@ public class GossipDeviceStore
111 private final Logger log = getLogger(getClass()); 110 private final Logger log = getLogger(getClass());
112 111
113 private static final String DEVICE_NOT_FOUND = "Device with ID %s not found"; 112 private static final String DEVICE_NOT_FOUND = "Device with ID %s not found";
113 + // Timeout in milliseconds to process device or ports on remote master node
114 + private static final int REMOTE_MASTER_TIMEOUT = 1000;
114 115
115 // innerMap is used to lock a Device, thus instance should never be replaced. 116 // innerMap is used to lock a Device, thus instance should never be replaced.
116 // collection of Description given from various providers 117 // collection of Description given from various providers
...@@ -158,6 +159,8 @@ public class GossipDeviceStore ...@@ -158,6 +159,8 @@ public class GossipDeviceStore
158 .register(DeviceAntiEntropyAdvertisement.class) 159 .register(DeviceAntiEntropyAdvertisement.class)
159 .register(DeviceFragmentId.class) 160 .register(DeviceFragmentId.class)
160 .register(PortFragmentId.class) 161 .register(PortFragmentId.class)
162 + .register(DeviceInjectedEvent.class)
163 + .register(PortInjectedEvent.class)
161 .build(); 164 .build();
162 } 165 }
163 }; 166 };
...@@ -186,6 +189,10 @@ public class GossipDeviceStore ...@@ -186,6 +189,10 @@ public class GossipDeviceStore
186 GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, new InternalPortStatusEventListener()); 189 GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, new InternalPortStatusEventListener());
187 clusterCommunicator.addSubscriber( 190 clusterCommunicator.addSubscriber(
188 GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE, new InternalDeviceAdvertisementListener()); 191 GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE, new InternalDeviceAdvertisementListener());
192 + clusterCommunicator.addSubscriber(
193 + GossipDeviceStoreMessageSubjects.DEVICE_INJECTED, new DeviceInjectedEventListener());
194 + clusterCommunicator.addSubscriber(
195 + GossipDeviceStoreMessageSubjects.PORT_INJECTED, new PortInjectedEventListener());
189 196
190 executor = Executors.newCachedThreadPool(namedThreads("onos-device-fg-%d")); 197 executor = Executors.newCachedThreadPool(namedThreads("onos-device-fg-%d"));
191 198
...@@ -251,21 +258,48 @@ public class GossipDeviceStore ...@@ -251,21 +258,48 @@ public class GossipDeviceStore
251 public synchronized DeviceEvent createOrUpdateDevice(ProviderId providerId, 258 public synchronized DeviceEvent createOrUpdateDevice(ProviderId providerId,
252 DeviceId deviceId, 259 DeviceId deviceId,
253 DeviceDescription deviceDescription) { 260 DeviceDescription deviceDescription) {
261 + NodeId localNode = clusterService.getLocalNode().id();
262 + NodeId deviceNode = mastershipService.getMasterFor(deviceId);
263 +
264 + // Process device update only if we're the master,
265 + // otherwise signal the actual master.
266 + DeviceEvent deviceEvent = null;
267 + if (localNode.equals(deviceNode)) {
268 +
254 final Timestamp newTimestamp = deviceClockService.getTimestamp(deviceId); 269 final Timestamp newTimestamp = deviceClockService.getTimestamp(deviceId);
255 final Timestamped<DeviceDescription> deltaDesc = new Timestamped<>(deviceDescription, newTimestamp); 270 final Timestamped<DeviceDescription> deltaDesc = new Timestamped<>(deviceDescription, newTimestamp);
256 - final DeviceEvent event;
257 final Timestamped<DeviceDescription> mergedDesc; 271 final Timestamped<DeviceDescription> mergedDesc;
258 final Map<ProviderId, DeviceDescriptions> device = getOrCreateDeviceDescriptionsMap(deviceId); 272 final Map<ProviderId, DeviceDescriptions> device = getOrCreateDeviceDescriptionsMap(deviceId);
273 +
259 synchronized (device) { 274 synchronized (device) {
260 - event = createOrUpdateDeviceInternal(providerId, deviceId, deltaDesc); 275 + deviceEvent = createOrUpdateDeviceInternal(providerId, deviceId, deltaDesc);
261 mergedDesc = device.get(providerId).getDeviceDesc(); 276 mergedDesc = device.get(providerId).getDeviceDesc();
262 } 277 }
263 - if (event != null) { 278 +
279 + if (deviceEvent != null) {
264 log.info("Notifying peers of a device update topology event for providerId: {} and deviceId: {}", 280 log.info("Notifying peers of a device update topology event for providerId: {} and deviceId: {}",
265 providerId, deviceId); 281 providerId, deviceId);
266 notifyPeers(new InternalDeviceEvent(providerId, deviceId, mergedDesc)); 282 notifyPeers(new InternalDeviceEvent(providerId, deviceId, mergedDesc));
267 } 283 }
268 - return event; 284 +
285 + } else {
286 +
287 + DeviceInjectedEvent deviceInjectedEvent = new DeviceInjectedEvent(
288 + providerId, deviceId, deviceDescription);
289 + ClusterMessage clusterMessage = new ClusterMessage(localNode, DEVICE_INJECTED,
290 + SERIALIZER.encode(deviceInjectedEvent));
291 +
292 + try {
293 + clusterCommunicator.unicast(clusterMessage, deviceNode);
294 + } catch (IOException e) {
295 + log.warn("Failed to process injected device id: {} desc: {} " +
296 + "(cluster messaging failed: {})",
297 + deviceId, deviceDescription, e);
298 + }
299 +
300 + }
301 +
302 + return deviceEvent;
269 } 303 }
270 304
271 private DeviceEvent createOrUpdateDeviceInternal(ProviderId providerId, 305 private DeviceEvent createOrUpdateDeviceInternal(ProviderId providerId,
...@@ -434,6 +468,19 @@ public class GossipDeviceStore ...@@ -434,6 +468,19 @@ public class GossipDeviceStore
434 DeviceId deviceId, 468 DeviceId deviceId,
435 List<PortDescription> portDescriptions) { 469 List<PortDescription> portDescriptions) {
436 470
471 + NodeId localNode = clusterService.getLocalNode().id();
472 + // TODO: It might be negligible, but this will have negative impact to topology discovery performance,
473 + // since it will trigger distributed store read.
474 + // Also, it'll probably be better if side-way communication happened on ConfigurationProvider, etc.
475 + // outside Device subsystem. so that we don't have to modify both Device and Link stores.
476 + // If we don't care much about topology performance, then it might be OK.
477 + NodeId deviceNode = mastershipService.getMasterFor(deviceId);
478 +
479 + // Process port update only if we're the master of the device,
480 + // otherwise signal the actual master.
481 + List<DeviceEvent> deviceEvents = null;
482 + if (localNode.equals(deviceNode)) {
483 +
437 final Timestamp newTimestamp; 484 final Timestamp newTimestamp;
438 try { 485 try {
439 newTimestamp = deviceClockService.getTimestamp(deviceId); 486 newTimestamp = deviceClockService.getTimestamp(deviceId);
...@@ -456,12 +503,12 @@ public class GossipDeviceStore ...@@ -456,12 +503,12 @@ public class GossipDeviceStore
456 503
457 final Timestamped<List<PortDescription>> timestampedInput 504 final Timestamped<List<PortDescription>> timestampedInput
458 = new Timestamped<>(portDescriptions, newTimestamp); 505 = new Timestamped<>(portDescriptions, newTimestamp);
459 - final List<DeviceEvent> events;
460 final Timestamped<List<PortDescription>> merged; 506 final Timestamped<List<PortDescription>> merged;
461 507
462 final Map<ProviderId, DeviceDescriptions> device = getOrCreateDeviceDescriptionsMap(deviceId); 508 final Map<ProviderId, DeviceDescriptions> device = getOrCreateDeviceDescriptionsMap(deviceId);
509 +
463 synchronized (device) { 510 synchronized (device) {
464 - events = updatePortsInternal(providerId, deviceId, timestampedInput); 511 + deviceEvents = updatePortsInternal(providerId, deviceId, timestampedInput);
465 final DeviceDescriptions descs = device.get(providerId); 512 final DeviceDescriptions descs = device.get(providerId);
466 List<PortDescription> mergedList = 513 List<PortDescription> mergedList =
467 FluentIterable.from(portDescriptions) 514 FluentIterable.from(portDescriptions)
...@@ -474,12 +521,28 @@ public class GossipDeviceStore ...@@ -474,12 +521,28 @@ public class GossipDeviceStore
474 }).toList(); 521 }).toList();
475 merged = new Timestamped<List<PortDescription>>(mergedList, newTimestamp); 522 merged = new Timestamped<List<PortDescription>>(mergedList, newTimestamp);
476 } 523 }
477 - if (!events.isEmpty()) { 524 +
525 + if (!deviceEvents.isEmpty()) {
478 log.info("Notifying peers of a ports update topology event for providerId: {} and deviceId: {}", 526 log.info("Notifying peers of a ports update topology event for providerId: {} and deviceId: {}",
479 providerId, deviceId); 527 providerId, deviceId);
480 notifyPeers(new InternalPortEvent(providerId, deviceId, merged)); 528 notifyPeers(new InternalPortEvent(providerId, deviceId, merged));
481 } 529 }
482 - return events; 530 +
531 + } else {
532 +
533 + PortInjectedEvent portInjectedEvent = new PortInjectedEvent(providerId, deviceId, portDescriptions);
534 + ClusterMessage clusterMessage = new ClusterMessage(
535 + localNode, PORT_INJECTED, SERIALIZER.encode(portInjectedEvent));
536 + try {
537 + clusterCommunicator.unicast(clusterMessage, deviceNode);
538 + } catch (IOException e) {
539 + log.warn("Failed to process injected ports of device id: {} " +
540 + "(cluster messaging failed: {})",
541 + deviceId, e);
542 + }
543 + }
544 +
545 + return deviceEvents;
483 } 546 }
484 547
485 private List<DeviceEvent> updatePortsInternal(ProviderId providerId, 548 private List<DeviceEvent> updatePortsInternal(ProviderId providerId,
...@@ -1431,4 +1494,48 @@ public class GossipDeviceStore ...@@ -1431,4 +1494,48 @@ public class GossipDeviceStore
1431 }); 1494 });
1432 } 1495 }
1433 } 1496 }
1497 +
1498 + private final class DeviceInjectedEventListener
1499 + implements ClusterMessageHandler {
1500 + @Override
1501 + public void handle(ClusterMessage message) {
1502 +
1503 + log.debug("Received injected device event from peer: {}", message.sender());
1504 + DeviceInjectedEvent event = SERIALIZER.decode(message.payload());
1505 +
1506 + ProviderId providerId = event.providerId();
1507 + DeviceId deviceId = event.deviceId();
1508 + DeviceDescription deviceDescription = event.deviceDescription();
1509 +
1510 + executor.submit(new Runnable() {
1511 +
1512 + @Override
1513 + public void run() {
1514 + createOrUpdateDevice(providerId, deviceId, deviceDescription);
1515 + }
1516 + });
1517 + }
1518 + }
1519 +
1520 + private final class PortInjectedEventListener
1521 + implements ClusterMessageHandler {
1522 + @Override
1523 + public void handle(ClusterMessage message) {
1524 +
1525 + log.debug("Received injected port event from peer: {}", message.sender());
1526 + PortInjectedEvent event = SERIALIZER.decode(message.payload());
1527 +
1528 + ProviderId providerId = event.providerId();
1529 + DeviceId deviceId = event.deviceId();
1530 + List<PortDescription> portDescriptions = event.portDescriptions();
1531 +
1532 + executor.submit(new Runnable() {
1533 +
1534 + @Override
1535 + public void run() {
1536 + updatePorts(providerId, deviceId, portDescriptions);
1537 + }
1538 + });
1539 + }
1540 + }
1434 } 1541 }
......
...@@ -34,4 +34,8 @@ public final class GossipDeviceStoreMessageSubjects { ...@@ -34,4 +34,8 @@ public final class GossipDeviceStoreMessageSubjects {
34 public static final MessageSubject DEVICE_ADVERTISE = new MessageSubject("peer-device-advertisements"); 34 public static final MessageSubject DEVICE_ADVERTISE = new MessageSubject("peer-device-advertisements");
35 // to be used with 3-way anti-entropy process 35 // to be used with 3-way anti-entropy process
36 public static final MessageSubject DEVICE_REQUEST = new MessageSubject("peer-device-request"); 36 public static final MessageSubject DEVICE_REQUEST = new MessageSubject("peer-device-request");
37 +
38 + // Network elements injected (not discovered) by ConfigProvider
39 + public static final MessageSubject DEVICE_INJECTED = new MessageSubject("peer-device-injected");
40 + public static final MessageSubject PORT_INJECTED = new MessageSubject("peer-port-injected");
37 } 41 }
......
1 +package org.onosproject.store.device.impl;
2 +
3 +import com.google.common.base.MoreObjects;
4 +import org.onosproject.net.DeviceId;
5 +import org.onosproject.net.device.PortDescription;
6 +import org.onosproject.net.provider.ProviderId;
7 +
8 +import java.util.List;
9 +
10 +public class PortInjectedEvent {
11 +
12 + private ProviderId providerId;
13 + private DeviceId deviceId;
14 + private List<PortDescription> portDescriptions;
15 +
16 + protected PortInjectedEvent(ProviderId providerId, DeviceId deviceId, List<PortDescription> portDescriptions) {
17 + this.providerId = providerId;
18 + this.deviceId = deviceId;
19 + this.portDescriptions = portDescriptions;
20 + }
21 +
22 + public DeviceId deviceId() {
23 + return deviceId;
24 + }
25 +
26 + public ProviderId providerId() {
27 + return providerId;
28 + }
29 +
30 + public List<PortDescription> portDescriptions() {
31 + return portDescriptions;
32 + }
33 +
34 + @Override
35 + public String toString() {
36 + return MoreObjects.toStringHelper(getClass())
37 + .add("providerId", providerId)
38 + .add("deviceId", deviceId)
39 + .add("portDescriptions", portDescriptions)
40 + .toString();
41 + }
42 +
43 + // for serializer
44 + protected PortInjectedEvent() {
45 + this.providerId = null;
46 + this.deviceId = null;
47 + this.portDescriptions = null;
48 + }
49 +
50 +}
...@@ -32,6 +32,7 @@ import org.onlab.util.KryoNamespace; ...@@ -32,6 +32,7 @@ import org.onlab.util.KryoNamespace;
32 import org.onosproject.cluster.ClusterService; 32 import org.onosproject.cluster.ClusterService;
33 import org.onosproject.cluster.ControllerNode; 33 import org.onosproject.cluster.ControllerNode;
34 import org.onosproject.cluster.NodeId; 34 import org.onosproject.cluster.NodeId;
35 +import org.onosproject.mastership.MastershipService;
35 import org.onosproject.net.AnnotationKeys; 36 import org.onosproject.net.AnnotationKeys;
36 import org.onosproject.net.AnnotationsUtil; 37 import org.onosproject.net.AnnotationsUtil;
37 import org.onosproject.net.ConnectPoint; 38 import org.onosproject.net.ConnectPoint;
...@@ -90,9 +91,7 @@ import static org.onosproject.net.Link.State.INACTIVE; ...@@ -90,9 +91,7 @@ import static org.onosproject.net.Link.State.INACTIVE;
90 import static org.onosproject.net.Link.Type.DIRECT; 91 import static org.onosproject.net.Link.Type.DIRECT;
91 import static org.onosproject.net.Link.Type.INDIRECT; 92 import static org.onosproject.net.Link.Type.INDIRECT;
92 import static org.onosproject.net.LinkKey.linkKey; 93 import static org.onosproject.net.LinkKey.linkKey;
93 -import static org.onosproject.net.link.LinkEvent.Type.LINK_ADDED; 94 +import static org.onosproject.net.link.LinkEvent.Type.*;
94 -import static org.onosproject.net.link.LinkEvent.Type.LINK_REMOVED;
95 -import static org.onosproject.net.link.LinkEvent.Type.LINK_UPDATED;
96 import static org.onosproject.store.link.impl.GossipLinkStoreMessageSubjects.LINK_ANTI_ENTROPY_ADVERTISEMENT; 95 import static org.onosproject.store.link.impl.GossipLinkStoreMessageSubjects.LINK_ANTI_ENTROPY_ADVERTISEMENT;
97 import static org.slf4j.LoggerFactory.getLogger; 96 import static org.slf4j.LoggerFactory.getLogger;
98 97
...@@ -106,6 +105,9 @@ public class GossipLinkStore ...@@ -106,6 +105,9 @@ public class GossipLinkStore
106 extends AbstractStore<LinkEvent, LinkStoreDelegate> 105 extends AbstractStore<LinkEvent, LinkStoreDelegate>
107 implements LinkStore { 106 implements LinkStore {
108 107
108 + // Timeout in milliseconds to process links on remote master node
109 + private static final int REMOTE_MASTER_TIMEOUT = 1000;
110 +
109 private final Logger log = getLogger(getClass()); 111 private final Logger log = getLogger(getClass());
110 112
111 // Link inventory 113 // Link inventory
...@@ -131,6 +133,9 @@ public class GossipLinkStore ...@@ -131,6 +133,9 @@ public class GossipLinkStore
131 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) 133 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
132 protected ClusterService clusterService; 134 protected ClusterService clusterService;
133 135
136 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
137 + protected MastershipService mastershipService;
138 +
134 protected static final KryoSerializer SERIALIZER = new KryoSerializer() { 139 protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
135 @Override 140 @Override
136 protected void setupKryoPool() { 141 protected void setupKryoPool() {
...@@ -141,6 +146,7 @@ public class GossipLinkStore ...@@ -141,6 +146,7 @@ public class GossipLinkStore
141 .register(InternalLinkRemovedEvent.class) 146 .register(InternalLinkRemovedEvent.class)
142 .register(LinkAntiEntropyAdvertisement.class) 147 .register(LinkAntiEntropyAdvertisement.class)
143 .register(LinkFragmentId.class) 148 .register(LinkFragmentId.class)
149 + .register(LinkInjectedEvent.class)
144 .build(); 150 .build();
145 } 151 }
146 }; 152 };
...@@ -161,6 +167,9 @@ public class GossipLinkStore ...@@ -161,6 +167,9 @@ public class GossipLinkStore
161 clusterCommunicator.addSubscriber( 167 clusterCommunicator.addSubscriber(
162 GossipLinkStoreMessageSubjects.LINK_ANTI_ENTROPY_ADVERTISEMENT, 168 GossipLinkStoreMessageSubjects.LINK_ANTI_ENTROPY_ADVERTISEMENT,
163 new InternalLinkAntiEntropyAdvertisementListener()); 169 new InternalLinkAntiEntropyAdvertisementListener());
170 + clusterCommunicator.addSubscriber(
171 + GossipLinkStoreMessageSubjects.LINK_INJECTED,
172 + new LinkInjectedEventListener());
164 173
165 executor = Executors.newCachedThreadPool(namedThreads("onos-link-fg-%d")); 174 executor = Executors.newCachedThreadPool(namedThreads("onos-link-fg-%d"));
166 175
...@@ -270,27 +279,52 @@ public class GossipLinkStore ...@@ -270,27 +279,52 @@ public class GossipLinkStore
270 public LinkEvent createOrUpdateLink(ProviderId providerId, 279 public LinkEvent createOrUpdateLink(ProviderId providerId,
271 LinkDescription linkDescription) { 280 LinkDescription linkDescription) {
272 281
273 - DeviceId dstDeviceId = linkDescription.dst().deviceId(); 282 + final DeviceId dstDeviceId = linkDescription.dst().deviceId();
283 + final NodeId localNode = clusterService.getLocalNode().id();
284 + final NodeId dstNode = mastershipService.getMasterFor(dstDeviceId);
285 +
286 + // Process link update only if we're the master of the destination node,
287 + // otherwise signal the actual master.
288 + LinkEvent linkEvent = null;
289 + if (localNode.equals(dstNode)) {
290 +
274 Timestamp newTimestamp = deviceClockService.getTimestamp(dstDeviceId); 291 Timestamp newTimestamp = deviceClockService.getTimestamp(dstDeviceId);
275 292
276 final Timestamped<LinkDescription> deltaDesc = new Timestamped<>(linkDescription, newTimestamp); 293 final Timestamped<LinkDescription> deltaDesc = new Timestamped<>(linkDescription, newTimestamp);
277 294
278 LinkKey key = linkKey(linkDescription.src(), linkDescription.dst()); 295 LinkKey key = linkKey(linkDescription.src(), linkDescription.dst());
279 - final LinkEvent event;
280 final Timestamped<LinkDescription> mergedDesc; 296 final Timestamped<LinkDescription> mergedDesc;
281 Map<ProviderId, Timestamped<LinkDescription>> map = getOrCreateLinkDescriptions(key); 297 Map<ProviderId, Timestamped<LinkDescription>> map = getOrCreateLinkDescriptions(key);
298 +
282 synchronized (map) { 299 synchronized (map) {
283 - event = createOrUpdateLinkInternal(providerId, deltaDesc); 300 + linkEvent = createOrUpdateLinkInternal(providerId, deltaDesc);
284 mergedDesc = map.get(providerId); 301 mergedDesc = map.get(providerId);
285 } 302 }
286 303
287 - if (event != null) { 304 + if (linkEvent != null) {
288 log.info("Notifying peers of a link update topology event from providerId: " 305 log.info("Notifying peers of a link update topology event from providerId: "
289 + "{} between src: {} and dst: {}", 306 + "{} between src: {} and dst: {}",
290 providerId, linkDescription.src(), linkDescription.dst()); 307 providerId, linkDescription.src(), linkDescription.dst());
291 notifyPeers(new InternalLinkEvent(providerId, mergedDesc)); 308 notifyPeers(new InternalLinkEvent(providerId, mergedDesc));
292 } 309 }
293 - return event; 310 +
311 + } else {
312 +
313 + LinkInjectedEvent linkInjectedEvent = new LinkInjectedEvent(providerId, linkDescription);
314 + ClusterMessage linkInjectedMessage = new ClusterMessage(localNode,
315 + GossipLinkStoreMessageSubjects.LINK_INJECTED, SERIALIZER.encode(linkInjectedEvent));
316 +
317 + try {
318 + clusterCommunicator.unicast(linkInjectedMessage, dstNode);
319 + } catch (IOException e) {
320 + log.warn("Failed to process link update between src: {} and dst: {} " +
321 + "(cluster messaging failed: {})",
322 + linkDescription.src(), linkDescription.dst(), e);
323 + }
324 +
325 + }
326 +
327 + return linkEvent;
294 } 328 }
295 329
296 @Override 330 @Override
...@@ -397,7 +431,7 @@ public class GossipLinkStore ...@@ -397,7 +431,7 @@ public class GossipLinkStore
397 !AnnotationsUtil.isEqual(oldLink.annotations(), newLink.annotations())) { 431 !AnnotationsUtil.isEqual(oldLink.annotations(), newLink.annotations())) {
398 432
399 links.put(key, newLink); 433 links.put(key, newLink);
400 - // strictly speaking following can be ommitted 434 + // strictly speaking following can be omitted
401 srcLinks.put(oldLink.src().deviceId(), key); 435 srcLinks.put(oldLink.src().deviceId(), key);
402 dstLinks.put(oldLink.dst().deviceId(), key); 436 dstLinks.put(oldLink.dst().deviceId(), key);
403 return new LinkEvent(LINK_UPDATED, newLink); 437 return new LinkEvent(LINK_UPDATED, newLink);
...@@ -848,4 +882,25 @@ public class GossipLinkStore ...@@ -848,4 +882,25 @@ public class GossipLinkStore
848 }); 882 });
849 } 883 }
850 } 884 }
885 +
886 + private final class LinkInjectedEventListener
887 + implements ClusterMessageHandler {
888 + @Override
889 + public void handle(ClusterMessage message) {
890 +
891 + log.trace("Received injected link event from peer: {}", message.sender());
892 + LinkInjectedEvent linkInjectedEvent = SERIALIZER.decode(message.payload());
893 +
894 + ProviderId providerId = linkInjectedEvent.providerId();
895 + LinkDescription linkDescription = linkInjectedEvent.linkDescription();
896 +
897 + executor.submit(new Runnable() {
898 +
899 + @Override
900 + public void run() {
901 + createOrUpdateLink(providerId, linkDescription);
902 + }
903 + });
904 + }
905 + }
851 } 906 }
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
15 */ 15 */
16 package org.onosproject.store.link.impl; 16 package org.onosproject.store.link.impl;
17 17
18 -import org.onosproject.store.cluster.messaging.MessageSubject; 18 + import org.onosproject.store.cluster.messaging.MessageSubject;
19 19
20 /** 20 /**
21 * MessageSubjects used by GossipLinkStore peer-peer communication. 21 * MessageSubjects used by GossipLinkStore peer-peer communication.
...@@ -30,4 +30,6 @@ public final class GossipLinkStoreMessageSubjects { ...@@ -30,4 +30,6 @@ public final class GossipLinkStoreMessageSubjects {
30 new MessageSubject("peer-link-removed"); 30 new MessageSubject("peer-link-removed");
31 public static final MessageSubject LINK_ANTI_ENTROPY_ADVERTISEMENT = 31 public static final MessageSubject LINK_ANTI_ENTROPY_ADVERTISEMENT =
32 new MessageSubject("link-enti-entropy-advertisement"); 32 new MessageSubject("link-enti-entropy-advertisement");
33 + public static final MessageSubject LINK_INJECTED =
34 + new MessageSubject("peer-link-injected");
33 } 35 }
......
1 +package org.onosproject.store.link.impl;
2 +
3 +import com.google.common.base.MoreObjects;
4 +import org.onosproject.net.link.LinkDescription;
5 +import org.onosproject.net.provider.ProviderId;
6 +
7 +public class LinkInjectedEvent {
8 +
9 + ProviderId providerId;
10 + LinkDescription linkDescription;
11 +
12 + public LinkInjectedEvent(ProviderId providerId, LinkDescription linkDescription) {
13 + this.providerId = providerId;
14 + this.linkDescription = linkDescription;
15 + }
16 +
17 + public ProviderId providerId() {
18 + return providerId;
19 + }
20 +
21 + public LinkDescription linkDescription() {
22 + return linkDescription;
23 + }
24 +
25 + @Override
26 + public String toString() {
27 + return MoreObjects.toStringHelper(getClass())
28 + .add("providerId", providerId)
29 + .add("linkDescription", linkDescription)
30 + .toString();
31 + }
32 +
33 + // for serializer
34 + protected LinkInjectedEvent() {
35 + this.providerId = null;
36 + this.linkDescription = null;
37 + }
38 +}
...@@ -27,6 +27,8 @@ import org.onlab.packet.IpAddress; ...@@ -27,6 +27,8 @@ import org.onlab.packet.IpAddress;
27 import org.onosproject.cluster.ControllerNode; 27 import org.onosproject.cluster.ControllerNode;
28 import org.onosproject.cluster.DefaultControllerNode; 28 import org.onosproject.cluster.DefaultControllerNode;
29 import org.onosproject.cluster.NodeId; 29 import org.onosproject.cluster.NodeId;
30 +import org.onosproject.mastership.MastershipService;
31 +import org.onosproject.mastership.MastershipServiceAdapter;
30 import org.onosproject.mastership.MastershipTerm; 32 import org.onosproject.mastership.MastershipTerm;
31 import org.onosproject.net.ConnectPoint; 33 import org.onosproject.net.ConnectPoint;
32 import org.onosproject.net.DefaultAnnotations; 34 import org.onosproject.net.DefaultAnnotations;
...@@ -115,7 +117,7 @@ public class GossipLinkStoreTest { ...@@ -115,7 +117,7 @@ public class GossipLinkStoreTest {
115 private DeviceClockManager deviceClockManager; 117 private DeviceClockManager deviceClockManager;
116 private DeviceClockService deviceClockService; 118 private DeviceClockService deviceClockService;
117 private ClusterCommunicationService clusterCommunicator; 119 private ClusterCommunicationService clusterCommunicator;
118 - 120 + private MastershipService mastershipService;
119 121
120 @BeforeClass 122 @BeforeClass
121 public static void setUpBeforeClass() throws Exception { 123 public static void setUpBeforeClass() throws Exception {
...@@ -146,11 +148,13 @@ public class GossipLinkStoreTest { ...@@ -146,11 +148,13 @@ public class GossipLinkStoreTest {
146 linkStoreImpl.deviceClockService = deviceClockService; 148 linkStoreImpl.deviceClockService = deviceClockService;
147 linkStoreImpl.clusterCommunicator = clusterCommunicator; 149 linkStoreImpl.clusterCommunicator = clusterCommunicator;
148 linkStoreImpl.clusterService = new TestClusterService(); 150 linkStoreImpl.clusterService = new TestClusterService();
151 + linkStoreImpl.mastershipService = new TestMastershipService();
149 linkStoreImpl.activate(); 152 linkStoreImpl.activate();
150 linkStore = linkStoreImpl; 153 linkStore = linkStoreImpl;
151 154
152 verify(clusterCommunicator); 155 verify(clusterCommunicator);
153 reset(clusterCommunicator); 156 reset(clusterCommunicator);
157 +
154 } 158 }
155 159
156 @After 160 @After
...@@ -602,4 +606,11 @@ public class GossipLinkStoreTest { ...@@ -602,4 +606,11 @@ public class GossipLinkStoreTest {
602 nodeStates.put(NID2, ACTIVE); 606 nodeStates.put(NID2, ACTIVE);
603 } 607 }
604 } 608 }
609 +
610 + private final class TestMastershipService extends MastershipServiceAdapter {
611 + @Override
612 + public NodeId getMasterFor(DeviceId deviceId) {
613 + return NID1;
614 + }
615 + }
605 } 616 }
......