Madan Jampani

GossipHostStore: Added listeners for handling peer messages

...@@ -30,6 +30,7 @@ import org.onlab.onos.store.AbstractStore; ...@@ -30,6 +30,7 @@ import org.onlab.onos.store.AbstractStore;
30 import org.onlab.onos.store.Timestamp; 30 import org.onlab.onos.store.Timestamp;
31 import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService; 31 import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
32 import org.onlab.onos.store.cluster.messaging.ClusterMessage; 32 import org.onlab.onos.store.cluster.messaging.ClusterMessage;
33 +import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
33 import org.onlab.onos.store.cluster.messaging.MessageSubject; 34 import org.onlab.onos.store.cluster.messaging.MessageSubject;
34 import org.onlab.onos.store.common.impl.Timestamped; 35 import org.onlab.onos.store.common.impl.Timestamped;
35 import org.onlab.onos.store.serializers.DistributedStoreSerializers; 36 import org.onlab.onos.store.serializers.DistributedStoreSerializers;
...@@ -95,6 +96,11 @@ public class GossipHostStore ...@@ -95,6 +96,11 @@ public class GossipHostStore
95 96
96 @Activate 97 @Activate
97 public void activate() { 98 public void activate() {
99 + clusterCommunicator.addSubscriber(
100 + GossipHostStoreMessageSubjects.HOST_UPDATED, new InternalHostEventListener());
101 + clusterCommunicator.addSubscriber(
102 + GossipHostStoreMessageSubjects.HOST_REMOVED, new InternalHostRemovedEventListener());
103 +
98 log.info("Started"); 104 log.info("Started");
99 } 105 }
100 106
...@@ -392,4 +398,40 @@ public class GossipHostStore ...@@ -392,4 +398,40 @@ public class GossipHostStore
392 SERIALIZER.encode(event)); 398 SERIALIZER.encode(event));
393 clusterCommunicator.broadcast(message); 399 clusterCommunicator.broadcast(message);
394 } 400 }
401 +
402 + private void notifyDelegateIfNotNull(HostEvent event) {
403 + if (event != null) {
404 + notifyDelegate(event);
405 + }
406 + }
407 +
408 + private class InternalHostEventListener implements ClusterMessageHandler {
409 + @Override
410 + public void handle(ClusterMessage message) {
411 +
412 + log.info("Received host update event from peer: {}", message.sender());
413 + InternalHostEvent event = (InternalHostEvent) SERIALIZER.decode(message.payload());
414 +
415 + ProviderId providerId = event.providerId();
416 + HostId hostId = event.hostId();
417 + HostDescription hostDescription = event.hostDescription();
418 + Timestamp timestamp = event.timestamp();
419 +
420 + notifyDelegateIfNotNull(createOrUpdateHostInternal(providerId, hostId, hostDescription, timestamp));
421 + }
422 + }
423 +
424 + private class InternalHostRemovedEventListener implements ClusterMessageHandler {
425 + @Override
426 + public void handle(ClusterMessage message) {
427 +
428 + log.info("Received host removed event from peer: {}", message.sender());
429 + InternalHostRemovedEvent event = (InternalHostRemovedEvent) SERIALIZER.decode(message.payload());
430 +
431 + HostId hostId = event.hostId();
432 + Timestamp timestamp = event.timestamp();
433 +
434 + notifyDelegateIfNotNull(removeHostInternal(hostId, timestamp));
435 + }
436 + }
395 } 437 }
......