tom

Merge remote-tracking branch 'origin/master'

...@@ -119,8 +119,10 @@ public class GossipDeviceStore ...@@ -119,8 +119,10 @@ public class GossipDeviceStore
119 serializerPool = KryoPool.newBuilder() 119 serializerPool = KryoPool.newBuilder()
120 .register(KryoPoolUtil.API) 120 .register(KryoPoolUtil.API)
121 .register(InternalDeviceEvent.class, new InternalDeviceEventSerializer()) 121 .register(InternalDeviceEvent.class, new InternalDeviceEventSerializer())
122 + .register(InternalDeviceOfflineEvent.class, new InternalDeviceOfflineEventSerializer())
122 .register(InternalPortEvent.class, new InternalPortEventSerializer()) 123 .register(InternalPortEvent.class, new InternalPortEventSerializer())
123 .register(InternalPortStatusEvent.class, new InternalPortStatusEventSerializer()) 124 .register(InternalPortStatusEvent.class, new InternalPortStatusEventSerializer())
125 + .register(Timestamp.class)
124 .register(Timestamped.class) 126 .register(Timestamped.class)
125 .register(MastershipBasedTimestamp.class, new MastershipBasedTimestampSerializer()) 127 .register(MastershipBasedTimestamp.class, new MastershipBasedTimestampSerializer())
126 .build() 128 .build()
...@@ -134,6 +136,8 @@ public class GossipDeviceStore ...@@ -134,6 +136,8 @@ public class GossipDeviceStore
134 clusterCommunicator.addSubscriber( 136 clusterCommunicator.addSubscriber(
135 GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, new InternalDeviceEventListener()); 137 GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, new InternalDeviceEventListener());
136 clusterCommunicator.addSubscriber( 138 clusterCommunicator.addSubscriber(
139 + GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, new InternalDeviceOfflineEventListener());
140 + clusterCommunicator.addSubscriber(
137 GossipDeviceStoreMessageSubjects.PORT_UPDATE, new InternalPortEventListener()); 141 GossipDeviceStoreMessageSubjects.PORT_UPDATE, new InternalPortEventListener());
138 clusterCommunicator.addSubscriber( 142 clusterCommunicator.addSubscriber(
139 GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, new InternalPortStatusEventListener()); 143 GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, new InternalPortStatusEventListener());
...@@ -177,7 +181,7 @@ public class GossipDeviceStore ...@@ -177,7 +181,7 @@ public class GossipDeviceStore
177 try { 181 try {
178 notifyPeers(new InternalDeviceEvent(providerId, deviceId, deltaDesc)); 182 notifyPeers(new InternalDeviceEvent(providerId, deviceId, deltaDesc));
179 } catch (IOException e) { 183 } catch (IOException e) {
180 - log.error("Failed to notify peers of a device update topology event or providerId: " 184 + log.error("Failed to notify peers of a device update topology event for providerId: "
181 + providerId + " and deviceId: " + deviceId, e); 185 + providerId + " and deviceId: " + deviceId, e);
182 } 186 }
183 } 187 }
...@@ -280,7 +284,18 @@ public class GossipDeviceStore ...@@ -280,7 +284,18 @@ public class GossipDeviceStore
280 @Override 284 @Override
281 public DeviceEvent markOffline(DeviceId deviceId) { 285 public DeviceEvent markOffline(DeviceId deviceId) {
282 Timestamp timestamp = clockService.getTimestamp(deviceId); 286 Timestamp timestamp = clockService.getTimestamp(deviceId);
283 - return markOfflineInternal(deviceId, timestamp); 287 + DeviceEvent event = markOfflineInternal(deviceId, timestamp);
288 + if (event != null) {
289 + log.info("Notifying peers of a device offline topology event for deviceId: {}",
290 + deviceId);
291 + try {
292 + notifyPeers(new InternalDeviceOfflineEvent(deviceId, timestamp));
293 + } catch (IOException e) {
294 + log.error("Failed to notify peers of a device offline topology event for deviceId: {}",
295 + deviceId);
296 + }
297 + }
298 + return event;
284 } 299 }
285 300
286 private DeviceEvent markOfflineInternal(DeviceId deviceId, Timestamp timestamp) { 301 private DeviceEvent markOfflineInternal(DeviceId deviceId, Timestamp timestamp) {
...@@ -811,6 +826,14 @@ public class GossipDeviceStore ...@@ -811,6 +826,14 @@ public class GossipDeviceStore
811 clusterCommunicator.broadcast(message); 826 clusterCommunicator.broadcast(message);
812 } 827 }
813 828
829 + private void notifyPeers(InternalDeviceOfflineEvent event) throws IOException {
830 + ClusterMessage message = new ClusterMessage(
831 + clusterService.getLocalNode().id(),
832 + GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE,
833 + SERIALIZER.encode(event));
834 + clusterCommunicator.broadcast(message);
835 + }
836 +
814 private void notifyPeers(InternalPortEvent event) throws IOException { 837 private void notifyPeers(InternalPortEvent event) throws IOException {
815 ClusterMessage message = new ClusterMessage( 838 ClusterMessage message = new ClusterMessage(
816 clusterService.getLocalNode().id(), 839 clusterService.getLocalNode().id(),
...@@ -830,15 +853,32 @@ public class GossipDeviceStore ...@@ -830,15 +853,32 @@ public class GossipDeviceStore
830 private class InternalDeviceEventListener implements ClusterMessageHandler { 853 private class InternalDeviceEventListener implements ClusterMessageHandler {
831 @Override 854 @Override
832 public void handle(ClusterMessage message) { 855 public void handle(ClusterMessage message) {
856 +
833 log.info("Received device update event from peer: {}", message.sender()); 857 log.info("Received device update event from peer: {}", message.sender());
834 InternalDeviceEvent event = (InternalDeviceEvent) SERIALIZER.decode(message.payload()); 858 InternalDeviceEvent event = (InternalDeviceEvent) SERIALIZER.decode(message.payload());
859 +
835 ProviderId providerId = event.providerId(); 860 ProviderId providerId = event.providerId();
836 DeviceId deviceId = event.deviceId(); 861 DeviceId deviceId = event.deviceId();
837 Timestamped<DeviceDescription> deviceDescription = event.deviceDescription(); 862 Timestamped<DeviceDescription> deviceDescription = event.deviceDescription();
863 +
838 createOrUpdateDeviceInternal(providerId, deviceId, deviceDescription); 864 createOrUpdateDeviceInternal(providerId, deviceId, deviceDescription);
839 } 865 }
840 } 866 }
841 867
868 + private class InternalDeviceOfflineEventListener implements ClusterMessageHandler {
869 + @Override
870 + public void handle(ClusterMessage message) {
871 +
872 + log.info("Received device offline event from peer: {}", message.sender());
873 + InternalDeviceOfflineEvent event = (InternalDeviceOfflineEvent) SERIALIZER.decode(message.payload());
874 +
875 + DeviceId deviceId = event.deviceId();
876 + Timestamp timestamp = event.timestamp();
877 +
878 + markOfflineInternal(deviceId, timestamp);
879 + }
880 + }
881 +
842 private class InternalPortEventListener implements ClusterMessageHandler { 882 private class InternalPortEventListener implements ClusterMessageHandler {
843 @Override 883 @Override
844 public void handle(ClusterMessage message) { 884 public void handle(ClusterMessage message) {
......
...@@ -3,13 +3,14 @@ package org.onlab.onos.store.device.impl; ...@@ -3,13 +3,14 @@ package org.onlab.onos.store.device.impl;
3 import org.onlab.onos.store.cluster.messaging.MessageSubject; 3 import org.onlab.onos.store.cluster.messaging.MessageSubject;
4 4
5 /** 5 /**
6 - * MessageSubjects used by GossipDeviceStore. 6 + * MessageSubjects used by GossipDeviceStore peer-peer communication.
7 */ 7 */
8 public final class GossipDeviceStoreMessageSubjects { 8 public final class GossipDeviceStoreMessageSubjects {
9 9
10 private GossipDeviceStoreMessageSubjects() {} 10 private GossipDeviceStoreMessageSubjects() {}
11 11
12 public static final MessageSubject DEVICE_UPDATE = new MessageSubject("peer-device-update"); 12 public static final MessageSubject DEVICE_UPDATE = new MessageSubject("peer-device-update");
13 + public static final MessageSubject DEVICE_OFFLINE = new MessageSubject("peer-device-offline");
13 public static final MessageSubject PORT_UPDATE = new MessageSubject("peer-port-update"); 14 public static final MessageSubject PORT_UPDATE = new MessageSubject("peer-port-update");
14 public static final MessageSubject PORT_STATUS_UPDATE = new MessageSubject("peer-port-status-update"); 15 public static final MessageSubject PORT_STATUS_UPDATE = new MessageSubject("peer-port-status-update");
15 } 16 }
......
1 +package org.onlab.onos.store.device.impl;
2 +
3 +import org.onlab.onos.net.DeviceId;
4 +import org.onlab.onos.store.Timestamp;
5 +
6 +/**
7 + * Information published by GossipDeviceStore to notify peers of a device
8 + * going offline.
9 + */
10 +public class InternalDeviceOfflineEvent {
11 +
12 + private final DeviceId deviceId;
13 + private final Timestamp timestamp;
14 +
15 + /**
16 + * Creates a InternalDeviceOfflineEvent.
17 + * @param deviceId identifier of device going offline.
18 + * @param timestamp timestamp of when the device went offline.
19 + */
20 + public InternalDeviceOfflineEvent(DeviceId deviceId, Timestamp timestamp) {
21 + this.deviceId = deviceId;
22 + this.timestamp = timestamp;
23 + }
24 +
25 + public DeviceId deviceId() {
26 + return deviceId;
27 + }
28 +
29 + public Timestamp timestamp() {
30 + return timestamp;
31 + }
32 +
33 + // for serializer
34 + @SuppressWarnings("unused")
35 + private InternalDeviceOfflineEvent() {
36 + deviceId = null;
37 + timestamp = null;
38 + }
39 +}
1 +package org.onlab.onos.store.device.impl;
2 +
3 +import org.onlab.onos.net.DeviceId;
4 +import org.onlab.onos.store.Timestamp;
5 +
6 +import com.esotericsoftware.kryo.Kryo;
7 +import com.esotericsoftware.kryo.Serializer;
8 +import com.esotericsoftware.kryo.io.Input;
9 +import com.esotericsoftware.kryo.io.Output;
10 +
11 +/**
12 + * Kryo Serializer for {@link InternalDeviceOfflineEvent}.
13 + */
14 +public class InternalDeviceOfflineEventSerializer extends Serializer<InternalDeviceOfflineEvent> {
15 +
16 + /**
17 + * Creates a serializer for {@link InternalDeviceOfflineEvent}.
18 + */
19 + public InternalDeviceOfflineEventSerializer() {
20 + // does not accept null
21 + super(false);
22 + }
23 +
24 + @Override
25 + public void write(Kryo kryo, Output output, InternalDeviceOfflineEvent event) {
26 + kryo.writeClassAndObject(output, event.deviceId());
27 + kryo.writeClassAndObject(output, event.timestamp());
28 + }
29 +
30 + @Override
31 + public InternalDeviceOfflineEvent read(Kryo kryo, Input input,
32 + Class<InternalDeviceOfflineEvent> type) {
33 + DeviceId deviceId = (DeviceId) kryo.readClassAndObject(input);
34 + Timestamp timestamp = (Timestamp) kryo.readClassAndObject(input);
35 +
36 + return new InternalDeviceOfflineEvent(deviceId, timestamp);
37 + }
38 +}