Madan Jampani

Added support for replicating device removed topology events

...@@ -120,6 +120,7 @@ public class GossipDeviceStore ...@@ -120,6 +120,7 @@ public class GossipDeviceStore
120 .register(KryoPoolUtil.API) 120 .register(KryoPoolUtil.API)
121 .register(InternalDeviceEvent.class, new InternalDeviceEventSerializer()) 121 .register(InternalDeviceEvent.class, new InternalDeviceEventSerializer())
122 .register(InternalDeviceOfflineEvent.class, new InternalDeviceOfflineEventSerializer()) 122 .register(InternalDeviceOfflineEvent.class, new InternalDeviceOfflineEventSerializer())
123 + .register(InternalDeviceRemovedEvent.class)
123 .register(InternalPortEvent.class, new InternalPortEventSerializer()) 124 .register(InternalPortEvent.class, new InternalPortEventSerializer())
124 .register(InternalPortStatusEvent.class, new InternalPortStatusEventSerializer()) 125 .register(InternalPortStatusEvent.class, new InternalPortStatusEventSerializer())
125 .register(Timestamp.class) 126 .register(Timestamp.class)
...@@ -138,6 +139,8 @@ public class GossipDeviceStore ...@@ -138,6 +139,8 @@ public class GossipDeviceStore
138 clusterCommunicator.addSubscriber( 139 clusterCommunicator.addSubscriber(
139 GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, new InternalDeviceOfflineEventListener()); 140 GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, new InternalDeviceOfflineEventListener());
140 clusterCommunicator.addSubscriber( 141 clusterCommunicator.addSubscriber(
142 + GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, new InternalDeviceRemovedEventListener());
143 + clusterCommunicator.addSubscriber(
141 GossipDeviceStoreMessageSubjects.PORT_UPDATE, new InternalPortEventListener()); 144 GossipDeviceStoreMessageSubjects.PORT_UPDATE, new InternalPortEventListener());
142 clusterCommunicator.addSubscriber( 145 clusterCommunicator.addSubscriber(
143 GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, new InternalPortStatusEventListener()); 146 GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, new InternalPortStatusEventListener());
...@@ -583,7 +586,16 @@ public class GossipDeviceStore ...@@ -583,7 +586,16 @@ public class GossipDeviceStore
583 public synchronized DeviceEvent removeDevice(DeviceId deviceId) { 586 public synchronized DeviceEvent removeDevice(DeviceId deviceId) {
584 Timestamp timestamp = clockService.getTimestamp(deviceId); 587 Timestamp timestamp = clockService.getTimestamp(deviceId);
585 DeviceEvent event = removeDeviceInternal(deviceId, timestamp); 588 DeviceEvent event = removeDeviceInternal(deviceId, timestamp);
586 - // TODO: broadcast removal event 589 + if (event != null) {
590 + log.info("Notifying peers of a device removed topology event for deviceId: {}",
591 + deviceId);
592 + try {
593 + notifyPeers(new InternalDeviceRemovedEvent(deviceId, timestamp));
594 + } catch (IOException e) {
595 + log.error("Failed to notify peers of a device removed topology event for deviceId: {}",
596 + deviceId);
597 + }
598 + }
587 return event; 599 return event;
588 } 600 }
589 601
...@@ -834,6 +846,14 @@ public class GossipDeviceStore ...@@ -834,6 +846,14 @@ public class GossipDeviceStore
834 clusterCommunicator.broadcast(message); 846 clusterCommunicator.broadcast(message);
835 } 847 }
836 848
849 + private void notifyPeers(InternalDeviceRemovedEvent event) throws IOException {
850 + ClusterMessage message = new ClusterMessage(
851 + clusterService.getLocalNode().id(),
852 + GossipDeviceStoreMessageSubjects.DEVICE_REMOVED,
853 + SERIALIZER.encode(event));
854 + clusterCommunicator.broadcast(message);
855 + }
856 +
837 private void notifyPeers(InternalPortEvent event) throws IOException { 857 private void notifyPeers(InternalPortEvent event) throws IOException {
838 ClusterMessage message = new ClusterMessage( 858 ClusterMessage message = new ClusterMessage(
839 clusterService.getLocalNode().id(), 859 clusterService.getLocalNode().id(),
...@@ -879,6 +899,20 @@ public class GossipDeviceStore ...@@ -879,6 +899,20 @@ public class GossipDeviceStore
879 } 899 }
880 } 900 }
881 901
902 + private class InternalDeviceRemovedEventListener implements ClusterMessageHandler {
903 + @Override
904 + public void handle(ClusterMessage message) {
905 +
906 + log.info("Received device removed event from peer: {}", message.sender());
907 + InternalDeviceRemovedEvent event = (InternalDeviceRemovedEvent) SERIALIZER.decode(message.payload());
908 +
909 + DeviceId deviceId = event.deviceId();
910 + Timestamp timestamp = event.timestamp();
911 +
912 + removeDeviceInternal(deviceId, timestamp);
913 + }
914 + }
915 +
882 private class InternalPortEventListener implements ClusterMessageHandler { 916 private class InternalPortEventListener implements ClusterMessageHandler {
883 @Override 917 @Override
884 public void handle(ClusterMessage message) { 918 public void handle(ClusterMessage message) {
......
...@@ -11,6 +11,7 @@ public final class GossipDeviceStoreMessageSubjects { ...@@ -11,6 +11,7 @@ public final class GossipDeviceStoreMessageSubjects {
11 11
12 public static final MessageSubject DEVICE_UPDATE = new MessageSubject("peer-device-update"); 12 public static final MessageSubject DEVICE_UPDATE = new MessageSubject("peer-device-update");
13 public static final MessageSubject DEVICE_OFFLINE = new MessageSubject("peer-device-offline"); 13 public static final MessageSubject DEVICE_OFFLINE = new MessageSubject("peer-device-offline");
14 + public static final MessageSubject DEVICE_REMOVED = new MessageSubject("peer-device-removed");
14 public static final MessageSubject PORT_UPDATE = new MessageSubject("peer-port-update"); 15 public static final MessageSubject PORT_UPDATE = new MessageSubject("peer-port-update");
15 public static final MessageSubject PORT_STATUS_UPDATE = new MessageSubject("peer-port-status-update"); 16 public static final MessageSubject PORT_STATUS_UPDATE = new MessageSubject("peer-port-status-update");
16 } 17 }
......
1 +package org.onlab.onos.store.device.impl;
2 +
3 +import org.onlab.onos.net.DeviceId;
4 +import org.onlab.onos.store.Timestamp;
5 +
6 +/**
7 + * Information published by GossipDeviceStore to notify peers of a device
8 + * being administratively removed.
9 + */
10 +public class InternalDeviceRemovedEvent {
11 +
12 + private final DeviceId deviceId;
13 + private final Timestamp timestamp;
14 +
15 + /**
16 + * Creates a InternalDeviceRemovedEvent.
17 + * @param deviceId identifier of the removed device.
18 + * @param timestamp timestamp of when the device was administratively removed.
19 + */
20 + public InternalDeviceRemovedEvent(DeviceId deviceId, Timestamp timestamp) {
21 + this.deviceId = deviceId;
22 + this.timestamp = timestamp;
23 + }
24 +
25 + public DeviceId deviceId() {
26 + return deviceId;
27 + }
28 +
29 + public Timestamp timestamp() {
30 + return timestamp;
31 + }
32 +
33 + // for serializer
34 + @SuppressWarnings("unused")
35 + private InternalDeviceRemovedEvent() {
36 + deviceId = null;
37 + timestamp = null;
38 + }
39 +}