Sho SHIMIZU
Committed by Gerrit Code Review

Reduce use of deprecated ClusterCommunicationService.addSubscriber()

Change-Id: I2e78e63340473b0334a1537f8049753a9f400849
...@@ -54,8 +54,6 @@ import org.onosproject.net.intent.Key; ...@@ -54,8 +54,6 @@ import org.onosproject.net.intent.Key;
54 import org.onosproject.net.intent.PartitionService; 54 import org.onosproject.net.intent.PartitionService;
55 import org.onosproject.net.intent.PointToPointIntent; 55 import org.onosproject.net.intent.PointToPointIntent;
56 import org.onosproject.store.cluster.messaging.ClusterCommunicationService; 56 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
57 -import org.onosproject.store.cluster.messaging.ClusterMessage;
58 -import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
59 import org.onosproject.store.cluster.messaging.MessageSubject; 57 import org.onosproject.store.cluster.messaging.MessageSubject;
60 import org.osgi.service.component.ComponentContext; 58 import org.osgi.service.component.ComponentContext;
61 import org.slf4j.Logger; 59 import org.slf4j.Logger;
...@@ -71,6 +69,7 @@ import java.util.TimerTask; ...@@ -71,6 +69,7 @@ import java.util.TimerTask;
71 import java.util.concurrent.ExecutorService; 69 import java.util.concurrent.ExecutorService;
72 import java.util.concurrent.Executors; 70 import java.util.concurrent.Executors;
73 import java.util.concurrent.TimeUnit; 71 import java.util.concurrent.TimeUnit;
72 +import java.util.function.Consumer;
74 import java.util.stream.Collectors; 73 import java.util.stream.Collectors;
75 74
76 import static com.google.common.base.Preconditions.checkState; 75 import static com.google.common.base.Preconditions.checkState;
...@@ -186,7 +185,7 @@ public class IntentPerfInstaller { ...@@ -186,7 +185,7 @@ public class IntentPerfInstaller {
186 messageHandlingExecutor = Executors.newSingleThreadExecutor( 185 messageHandlingExecutor = Executors.newSingleThreadExecutor(
187 groupedThreads("onos/perf", "command-handler")); 186 groupedThreads("onos/perf", "command-handler"));
188 187
189 - communicationService.addSubscriber(CONTROL, new InternalControl(), 188 + communicationService.addSubscriber(CONTROL, String::new, new InternalControl(),
190 messageHandlingExecutor); 189 messageHandlingExecutor);
191 190
192 listener = new Listener(); 191 listener = new Listener();
...@@ -572,10 +571,9 @@ public class IntentPerfInstaller { ...@@ -572,10 +571,9 @@ public class IntentPerfInstaller {
572 } 571 }
573 } 572 }
574 573
575 - private class InternalControl implements ClusterMessageHandler { 574 + private class InternalControl implements Consumer<String> {
576 @Override 575 @Override
577 - public void handle(ClusterMessage message) { 576 + public void accept(String cmd) {
578 - String cmd = new String(message.payload());
579 log.info("Received command {}", cmd); 577 log.info("Received command {}", cmd);
580 if (cmd.equals(START)) { 578 if (cmd.equals(START)) {
581 startTestRun(); 579 startTestRun();
......
1 /* 1 /*
2 - * Copyright 2015 Open Networking Laboratory 2 + * Copyright 2015-2016 Open Networking Laboratory
3 * 3 *
4 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License. 5 * you may not use this file except in compliance with the License.
...@@ -55,8 +55,6 @@ import org.onosproject.net.DeviceId; ...@@ -55,8 +55,6 @@ import org.onosproject.net.DeviceId;
55 import org.onosproject.net.device.DeviceService; 55 import org.onosproject.net.device.DeviceService;
56 import org.onosproject.store.AbstractStore; 56 import org.onosproject.store.AbstractStore;
57 import org.onosproject.store.cluster.messaging.ClusterCommunicationService; 57 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
58 -import org.onosproject.store.cluster.messaging.ClusterMessage;
59 -import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
60 import org.onosproject.store.serializers.KryoNamespaces; 58 import org.onosproject.store.serializers.KryoNamespaces;
61 import org.onosproject.store.service.ConsistentMap; 59 import org.onosproject.store.service.ConsistentMap;
62 import org.onosproject.store.service.Serializer; 60 import org.onosproject.store.service.Serializer;
...@@ -126,62 +124,42 @@ public class DistributedLabelResourceStore ...@@ -126,62 +124,42 @@ public class DistributedLabelResourceStore
126 "message-handlers")); 124 "message-handlers"));
127 clusterCommunicator 125 clusterCommunicator
128 .addSubscriber(LabelResourceMessageSubjects.LABEL_POOL_CREATED, 126 .addSubscriber(LabelResourceMessageSubjects.LABEL_POOL_CREATED,
129 - new ClusterMessageHandler() { 127 + SERIALIZER::<LabelResourcePool>decode,
130 - 128 + operation -> {
131 - @Override 129 + log.trace("received get flow entry request for {}", operation);
132 - public void handle(ClusterMessage message) { 130 + return internalCreate(operation);
133 - LabelResourcePool operation = SERIALIZER 131 + },
134 - .decode(message.payload()); 132 + SERIALIZER::<Boolean>encode,
135 - log.trace("received get flow entry request for {}", 133 + messageHandlingExecutor);
136 - operation);
137 - boolean b = internalCreate(operation);
138 - message.respond(SERIALIZER.encode(b));
139 - }
140 - }, messageHandlingExecutor);
141 clusterCommunicator 134 clusterCommunicator
142 .addSubscriber(LabelResourceMessageSubjects.LABEL_POOL_DESTROYED, 135 .addSubscriber(LabelResourceMessageSubjects.LABEL_POOL_DESTROYED,
143 - new ClusterMessageHandler() { 136 + SERIALIZER::<DeviceId>decode,
144 - 137 + deviceId -> {
145 - @Override 138 + log.trace("received get flow entry request for {}", deviceId);
146 - public void handle(ClusterMessage message) { 139 + return internalDestroy(deviceId);
147 - DeviceId deviceId = SERIALIZER 140 + },
148 - .decode(message.payload()); 141 + SERIALIZER::<Boolean>encode,
149 - log.trace("received get flow entry request for {}", 142 + messageHandlingExecutor);
150 - deviceId);
151 - boolean b = internalDestroy(deviceId);
152 - message.respond(SERIALIZER.encode(b));
153 - }
154 - }, messageHandlingExecutor);
155 clusterCommunicator 143 clusterCommunicator
156 .addSubscriber(LabelResourceMessageSubjects.LABEL_POOL_APPLY, 144 .addSubscriber(LabelResourceMessageSubjects.LABEL_POOL_APPLY,
157 - new ClusterMessageHandler() { 145 + SERIALIZER::<LabelResourceRequest>decode,
146 + request -> {
147 + log.trace("received get flow entry request for {}", request);
148 + return internalApply(request);
158 149
159 - @Override 150 + },
160 - public void handle(ClusterMessage message) { 151 + SERIALIZER::<Collection<LabelResource>>encode,
161 - LabelResourceRequest request = SERIALIZER 152 + messageHandlingExecutor);
162 - .decode(message.payload());
163 - log.trace("received get flow entry request for {}",
164 - request);
165 - final Collection<LabelResource> resource = internalApply(request);
166 - message.respond(SERIALIZER
167 - .encode(resource));
168 - }
169 - }, messageHandlingExecutor);
170 clusterCommunicator 153 clusterCommunicator
171 .addSubscriber(LabelResourceMessageSubjects.LABEL_POOL_RELEASE, 154 .addSubscriber(LabelResourceMessageSubjects.LABEL_POOL_RELEASE,
172 - new ClusterMessageHandler() { 155 + SERIALIZER::<LabelResourceRequest>decode,
173 - 156 + request -> {
174 - @Override
175 - public void handle(ClusterMessage message) {
176 - LabelResourceRequest request = SERIALIZER
177 - .decode(message.payload());
178 log.trace("received get flow entry request for {}", 157 log.trace("received get flow entry request for {}",
179 request); 158 request);
180 - final boolean isSuccess = internalRelease(request); 159 + return internalRelease(request);
181 - message.respond(SERIALIZER 160 + },
182 - .encode(isSuccess)); 161 + SERIALIZER::<Boolean>encode,
183 - } 162 + messageHandlingExecutor);
184 - }, messageHandlingExecutor);
185 log.info("Started"); 163 log.info("Started");
186 } 164 }
187 165
......