Madan Jampani

Remove usage of deprecated ClusterCommunicationService API

Change-Id: I56deac9f5b6977096a680a6eb7198d09aeb4f924
...@@ -37,8 +37,6 @@ import org.onosproject.net.flow.instructions.Instruction; ...@@ -37,8 +37,6 @@ import org.onosproject.net.flow.instructions.Instruction;
37 import org.onosproject.net.flow.instructions.Instructions; 37 import org.onosproject.net.flow.instructions.Instructions;
38 import org.onosproject.net.statistic.StatisticStore; 38 import org.onosproject.net.statistic.StatisticStore;
39 import org.onosproject.store.cluster.messaging.ClusterCommunicationService; 39 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
40 -import org.onosproject.store.cluster.messaging.ClusterMessage;
41 -import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
42 import org.onosproject.store.serializers.KryoNamespaces; 40 import org.onosproject.store.serializers.KryoNamespaces;
43 import org.onosproject.store.serializers.KryoSerializer; 41 import org.onosproject.store.serializers.KryoSerializer;
44 import org.slf4j.Logger; 42 import org.slf4j.Logger;
...@@ -112,23 +110,18 @@ public class DistributedStatisticStore implements StatisticStore { ...@@ -112,23 +110,18 @@ public class DistributedStatisticStore implements StatisticStore {
112 MESSAGE_HANDLER_THREAD_POOL_SIZE, 110 MESSAGE_HANDLER_THREAD_POOL_SIZE,
113 groupedThreads("onos/store/statistic", "message-handlers")); 111 groupedThreads("onos/store/statistic", "message-handlers"));
114 112
115 - clusterCommunicator.addSubscriber(GET_CURRENT, new ClusterMessageHandler() { 113 + clusterCommunicator.<ConnectPoint, Set<FlowEntry>>addSubscriber(GET_CURRENT,
114 + SERIALIZER::decode,
115 + this::getCurrentStatisticInternal,
116 + SERIALIZER::encode,
117 + messageHandlingExecutor);
116 118
117 - @Override 119 + clusterCommunicator.<ConnectPoint, Set<FlowEntry>>addSubscriber(GET_PREVIOUS,
118 - public void handle(ClusterMessage message) { 120 + SERIALIZER::decode,
119 - ConnectPoint cp = SERIALIZER.decode(message.payload()); 121 + this::getPreviousStatisticInternal,
120 - message.respond(SERIALIZER.encode(getCurrentStatisticInternal(cp))); 122 + SERIALIZER::encode,
121 - } 123 + messageHandlingExecutor);
122 - }, messageHandlingExecutor);
123 -
124 - clusterCommunicator.addSubscriber(GET_PREVIOUS, new ClusterMessageHandler() {
125 124
126 - @Override
127 - public void handle(ClusterMessage message) {
128 - ConnectPoint cp = SERIALIZER.decode(message.payload());
129 - message.respond(SERIALIZER.encode(getPreviousStatisticInternal(cp)));
130 - }
131 - }, messageHandlingExecutor);
132 log.info("Started"); 125 log.info("Started");
133 } 126 }
134 127
......