Madan Jampani

Added support for replicating device offline topology events

...@@ -119,8 +119,10 @@ public class GossipDeviceStore ...@@ -119,8 +119,10 @@ public class GossipDeviceStore
119 serializerPool = KryoPool.newBuilder() 119 serializerPool = KryoPool.newBuilder()
120 .register(KryoPoolUtil.API) 120 .register(KryoPoolUtil.API)
121 .register(InternalDeviceEvent.class, new InternalDeviceEventSerializer()) 121 .register(InternalDeviceEvent.class, new InternalDeviceEventSerializer())
122 + .register(InternalDeviceOfflineEvent.class, new InternalDeviceOfflineEventSerializer())
122 .register(InternalPortEvent.class, new InternalPortEventSerializer()) 123 .register(InternalPortEvent.class, new InternalPortEventSerializer())
123 .register(InternalPortStatusEvent.class, new InternalPortStatusEventSerializer()) 124 .register(InternalPortStatusEvent.class, new InternalPortStatusEventSerializer())
125 + .register(Timestamp.class)
124 .register(Timestamped.class) 126 .register(Timestamped.class)
125 .register(MastershipBasedTimestamp.class, new MastershipBasedTimestampSerializer()) 127 .register(MastershipBasedTimestamp.class, new MastershipBasedTimestampSerializer())
126 .build() 128 .build()
...@@ -134,6 +136,8 @@ public class GossipDeviceStore ...@@ -134,6 +136,8 @@ public class GossipDeviceStore
134 clusterCommunicator.addSubscriber( 136 clusterCommunicator.addSubscriber(
135 GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, new InternalDeviceEventListener()); 137 GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, new InternalDeviceEventListener());
136 clusterCommunicator.addSubscriber( 138 clusterCommunicator.addSubscriber(
139 + GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, new InternalDeviceOfflineEventListener());
140 + clusterCommunicator.addSubscriber(
137 GossipDeviceStoreMessageSubjects.PORT_UPDATE, new InternalPortEventListener()); 141 GossipDeviceStoreMessageSubjects.PORT_UPDATE, new InternalPortEventListener());
138 clusterCommunicator.addSubscriber( 142 clusterCommunicator.addSubscriber(
139 GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, new InternalPortStatusEventListener()); 143 GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, new InternalPortStatusEventListener());
...@@ -177,7 +181,7 @@ public class GossipDeviceStore ...@@ -177,7 +181,7 @@ public class GossipDeviceStore
177 try { 181 try {
178 notifyPeers(new InternalDeviceEvent(providerId, deviceId, deltaDesc)); 182 notifyPeers(new InternalDeviceEvent(providerId, deviceId, deltaDesc));
179 } catch (IOException e) { 183 } catch (IOException e) {
180 - log.error("Failed to notify peers of a device update topology event or providerId: " 184 + log.error("Failed to notify peers of a device update topology event for providerId: "
181 + providerId + " and deviceId: " + deviceId, e); 185 + providerId + " and deviceId: " + deviceId, e);
182 } 186 }
183 } 187 }
...@@ -280,7 +284,18 @@ public class GossipDeviceStore ...@@ -280,7 +284,18 @@ public class GossipDeviceStore
280 @Override 284 @Override
281 public DeviceEvent markOffline(DeviceId deviceId) { 285 public DeviceEvent markOffline(DeviceId deviceId) {
282 Timestamp timestamp = clockService.getTimestamp(deviceId); 286 Timestamp timestamp = clockService.getTimestamp(deviceId);
283 - return markOfflineInternal(deviceId, timestamp); 287 + DeviceEvent event = markOfflineInternal(deviceId, timestamp);
288 + if (event != null) {
289 + log.info("Notifying peers of a device offline topology event for deviceId: {}",
290 + deviceId);
291 + try {
292 + notifyPeers(new InternalDeviceOfflineEvent(deviceId, timestamp));
293 + } catch (IOException e) {
294 + log.error("Failed to notify peers of a device offline topology event for deviceId: {}",
295 + deviceId);
296 + }
297 + }
298 + return event;
284 } 299 }
285 300
286 private DeviceEvent markOfflineInternal(DeviceId deviceId, Timestamp timestamp) { 301 private DeviceEvent markOfflineInternal(DeviceId deviceId, Timestamp timestamp) {
...@@ -811,6 +826,14 @@ public class GossipDeviceStore ...@@ -811,6 +826,14 @@ public class GossipDeviceStore
811 clusterCommunicator.broadcast(message); 826 clusterCommunicator.broadcast(message);
812 } 827 }
813 828
829 + private void notifyPeers(InternalDeviceOfflineEvent event) throws IOException {
830 + ClusterMessage message = new ClusterMessage(
831 + clusterService.getLocalNode().id(),
832 + GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE,
833 + SERIALIZER.encode(event));
834 + clusterCommunicator.broadcast(message);
835 + }
836 +
814 private void notifyPeers(InternalPortEvent event) throws IOException { 837 private void notifyPeers(InternalPortEvent event) throws IOException {
815 ClusterMessage message = new ClusterMessage( 838 ClusterMessage message = new ClusterMessage(
816 clusterService.getLocalNode().id(), 839 clusterService.getLocalNode().id(),
...@@ -830,15 +853,32 @@ public class GossipDeviceStore ...@@ -830,15 +853,32 @@ public class GossipDeviceStore
830 private class InternalDeviceEventListener implements ClusterMessageHandler { 853 private class InternalDeviceEventListener implements ClusterMessageHandler {
831 @Override 854 @Override
832 public void handle(ClusterMessage message) { 855 public void handle(ClusterMessage message) {
856 +
833 log.info("Received device update event from peer: {}", message.sender()); 857 log.info("Received device update event from peer: {}", message.sender());
834 InternalDeviceEvent event = (InternalDeviceEvent) SERIALIZER.decode(message.payload()); 858 InternalDeviceEvent event = (InternalDeviceEvent) SERIALIZER.decode(message.payload());
859 +
835 ProviderId providerId = event.providerId(); 860 ProviderId providerId = event.providerId();
836 DeviceId deviceId = event.deviceId(); 861 DeviceId deviceId = event.deviceId();
837 Timestamped<DeviceDescription> deviceDescription = event.deviceDescription(); 862 Timestamped<DeviceDescription> deviceDescription = event.deviceDescription();
863 +
838 createOrUpdateDeviceInternal(providerId, deviceId, deviceDescription); 864 createOrUpdateDeviceInternal(providerId, deviceId, deviceDescription);
839 } 865 }
840 } 866 }
841 867
868 + private class InternalDeviceOfflineEventListener implements ClusterMessageHandler {
869 + @Override
870 + public void handle(ClusterMessage message) {
871 +
872 + log.info("Received device offline event from peer: {}", message.sender());
873 + InternalDeviceOfflineEvent event = (InternalDeviceOfflineEvent) SERIALIZER.decode(message.payload());
874 +
875 + DeviceId deviceId = event.deviceId();
876 + Timestamp timestamp = event.timestamp();
877 +
878 + markOfflineInternal(deviceId, timestamp);
879 + }
880 + }
881 +
842 private class InternalPortEventListener implements ClusterMessageHandler { 882 private class InternalPortEventListener implements ClusterMessageHandler {
843 @Override 883 @Override
844 public void handle(ClusterMessage message) { 884 public void handle(ClusterMessage message) {
......
...@@ -3,13 +3,14 @@ package org.onlab.onos.store.device.impl; ...@@ -3,13 +3,14 @@ package org.onlab.onos.store.device.impl;
3 import org.onlab.onos.store.cluster.messaging.MessageSubject; 3 import org.onlab.onos.store.cluster.messaging.MessageSubject;
4 4
5 /** 5 /**
6 - * MessageSubjects used by GossipDeviceStore. 6 + * MessageSubjects used by GossipDeviceStore peer-peer communication.
7 */ 7 */
8 public final class GossipDeviceStoreMessageSubjects { 8 public final class GossipDeviceStoreMessageSubjects {
9 9
10 private GossipDeviceStoreMessageSubjects() {} 10 private GossipDeviceStoreMessageSubjects() {}
11 11
12 public static final MessageSubject DEVICE_UPDATE = new MessageSubject("peer-device-update"); 12 public static final MessageSubject DEVICE_UPDATE = new MessageSubject("peer-device-update");
13 + public static final MessageSubject DEVICE_OFFLINE = new MessageSubject("peer-device-offline");
13 public static final MessageSubject PORT_UPDATE = new MessageSubject("peer-port-update"); 14 public static final MessageSubject PORT_UPDATE = new MessageSubject("peer-port-update");
14 public static final MessageSubject PORT_STATUS_UPDATE = new MessageSubject("peer-port-status-update"); 15 public static final MessageSubject PORT_STATUS_UPDATE = new MessageSubject("peer-port-status-update");
15 } 16 }
......