Yuta HIGUCHI

convert anonymous class to nested class

Change-Id: I2d0770b80ca4806fabf31fd358ecb165d3e9f778
...@@ -74,12 +74,13 @@ import org.onlab.onos.store.flow.ReplicaInfoEventListener; ...@@ -74,12 +74,13 @@ import org.onlab.onos.store.flow.ReplicaInfoEventListener;
74 import org.onlab.onos.store.flow.ReplicaInfoService; 74 import org.onlab.onos.store.flow.ReplicaInfoService;
75 import org.onlab.onos.store.hz.AbstractHazelcastStore; 75 import org.onlab.onos.store.hz.AbstractHazelcastStore;
76 import org.onlab.onos.store.hz.SMap; 76 import org.onlab.onos.store.hz.SMap;
77 +import org.onlab.onos.store.serializers.DecodeTo;
77 import org.onlab.onos.store.serializers.DistributedStoreSerializers; 78 import org.onlab.onos.store.serializers.DistributedStoreSerializers;
78 import org.onlab.onos.store.serializers.KryoSerializer; 79 import org.onlab.onos.store.serializers.KryoSerializer;
80 +import org.onlab.onos.store.serializers.StoreSerializer;
79 import org.onlab.util.KryoNamespace; 81 import org.onlab.util.KryoNamespace;
80 import org.slf4j.Logger; 82 import org.slf4j.Logger;
81 83
82 -import com.google.common.base.Function;
83 import com.google.common.cache.Cache; 84 import com.google.common.cache.Cache;
84 import com.google.common.cache.CacheBuilder; 85 import com.google.common.cache.CacheBuilder;
85 import com.google.common.cache.CacheLoader; 86 import com.google.common.cache.CacheLoader;
...@@ -146,7 +147,7 @@ public class DistributedFlowRuleStore ...@@ -146,7 +147,7 @@ public class DistributedFlowRuleStore
146 // TODO make this configurable 147 // TODO make this configurable
147 private boolean syncBackup = false; 148 private boolean syncBackup = false;
148 149
149 - protected static final KryoSerializer SERIALIZER = new KryoSerializer() { 150 + protected static final StoreSerializer SERIALIZER = new KryoSerializer() {
150 @Override 151 @Override
151 protected void setupKryoPool() { 152 protected void setupKryoPool() {
152 serializerPool = KryoNamespace.newBuilder() 153 serializerPool = KryoNamespace.newBuilder()
...@@ -175,50 +176,7 @@ public class DistributedFlowRuleStore ...@@ -175,50 +176,7 @@ public class DistributedFlowRuleStore
175 176
176 final NodeId local = clusterService.getLocalNode().id(); 177 final NodeId local = clusterService.getLocalNode().id();
177 178
178 - clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new ClusterMessageHandler() { 179 + clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new OnStoreBatch(local));
179 -
180 - @Override
181 - public void handle(final ClusterMessage message) {
182 - FlowRuleBatchOperation operation = SERIALIZER.decode(message.payload());
183 - log.info("received batch request {}", operation);
184 -
185 - final DeviceId deviceId = operation.getOperations().get(0).getTarget().deviceId();
186 - ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
187 - if (!local.equals(replicaInfo.master().orNull())) {
188 -
189 - Set<FlowRule> failures = new HashSet<>(operation.size());
190 - for (FlowRuleBatchEntry op : operation.getOperations()) {
191 - failures.add(op.getTarget());
192 - }
193 - CompletedBatchOperation allFailed = new CompletedBatchOperation(false, failures);
194 - // This node is no longer the master, respond as all failed.
195 - // TODO: we might want to wrap response in envelope
196 - // to distinguish sw programming failure and hand over
197 - // it make sense in the latter case to retry immediately.
198 - try {
199 - message.respond(SERIALIZER.encode(allFailed));
200 - } catch (IOException e) {
201 - log.error("Failed to respond back", e);
202 - }
203 - return;
204 - }
205 -
206 - final ListenableFuture<CompletedBatchOperation> f = storeBatchInternal(operation);
207 -
208 - f.addListener(new Runnable() {
209 -
210 - @Override
211 - public void run() {
212 - CompletedBatchOperation result = Futures.getUnchecked(f);
213 - try {
214 - message.respond(SERIALIZER.encode(result));
215 - } catch (IOException e) {
216 - log.error("Failed to respond back", e);
217 - }
218 - }
219 - }, futureListeners);
220 - }
221 - });
222 180
223 clusterCommunicator.addSubscriber(GET_FLOW_ENTRY, new ClusterMessageHandler() { 181 clusterCommunicator.addSubscriber(GET_FLOW_ENTRY, new ClusterMessageHandler() {
224 182
...@@ -400,12 +358,7 @@ public class DistributedFlowRuleStore ...@@ -400,12 +358,7 @@ public class DistributedFlowRuleStore
400 try { 358 try {
401 ListenableFuture<byte[]> responseFuture = 359 ListenableFuture<byte[]> responseFuture =
402 clusterCommunicator.sendAndReceive(message, replicaInfo.master().get()); 360 clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
403 - return Futures.transform(responseFuture, new Function<byte[], CompletedBatchOperation>() { 361 + return Futures.transform(responseFuture, new DecodeTo<CompletedBatchOperation>(SERIALIZER));
404 - @Override
405 - public CompletedBatchOperation apply(byte[] input) {
406 - return SERIALIZER.decode(input);
407 - }
408 - });
409 } catch (IOException e) { 362 } catch (IOException e) {
410 return Futures.immediateFailedFuture(e); 363 return Futures.immediateFailedFuture(e);
411 } 364 }
...@@ -583,6 +536,56 @@ public class DistributedFlowRuleStore ...@@ -583,6 +536,56 @@ public class DistributedFlowRuleStore
583 log.debug("removedFromPrimary {}", removed); 536 log.debug("removedFromPrimary {}", removed);
584 } 537 }
585 538
539 + private final class OnStoreBatch implements ClusterMessageHandler {
540 + private final NodeId local;
541 +
542 + private OnStoreBatch(NodeId local) {
543 + this.local = local;
544 + }
545 +
546 + @Override
547 + public void handle(final ClusterMessage message) {
548 + FlowRuleBatchOperation operation = SERIALIZER.decode(message.payload());
549 + log.info("received batch request {}", operation);
550 +
551 + final DeviceId deviceId = operation.getOperations().get(0).getTarget().deviceId();
552 + ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
553 + if (!local.equals(replicaInfo.master().orNull())) {
554 +
555 + Set<FlowRule> failures = new HashSet<>(operation.size());
556 + for (FlowRuleBatchEntry op : operation.getOperations()) {
557 + failures.add(op.getTarget());
558 + }
559 + CompletedBatchOperation allFailed = new CompletedBatchOperation(false, failures);
560 + // This node is no longer the master, respond as all failed.
561 + // TODO: we might want to wrap response in envelope
562 + // to distinguish sw programming failure and hand over
563 + // it make sense in the latter case to retry immediately.
564 + try {
565 + message.respond(SERIALIZER.encode(allFailed));
566 + } catch (IOException e) {
567 + log.error("Failed to respond back", e);
568 + }
569 + return;
570 + }
571 +
572 + final ListenableFuture<CompletedBatchOperation> f = storeBatchInternal(operation);
573 +
574 + f.addListener(new Runnable() {
575 +
576 + @Override
577 + public void run() {
578 + CompletedBatchOperation result = Futures.getUnchecked(f);
579 + try {
580 + message.respond(SERIALIZER.encode(result));
581 + } catch (IOException e) {
582 + log.error("Failed to respond back", e);
583 + }
584 + }
585 + }, futureListeners);
586 + }
587 + }
588 +
586 private final class SMapLoader 589 private final class SMapLoader
587 extends CacheLoader<DeviceId, SMap<FlowId, ImmutableList<StoredFlowEntry>>> { 590 extends CacheLoader<DeviceId, SMap<FlowId, ImmutableList<StoredFlowEntry>>> {
588 591
......
1 +package org.onlab.onos.store.serializers;
2 +
3 +import static com.google.common.base.Preconditions.checkNotNull;
4 +
5 +import com.google.common.base.Function;
6 +
7 +/**
8 + * Function to convert byte[] into {@code T}.
9 + *
10 + * @param <T> Type after decoding
11 + */
12 +public final class DecodeTo<T> implements Function<byte[], T> {
13 +
14 + private StoreSerializer serializer;
15 +
16 + public DecodeTo(StoreSerializer serializer) {
17 + this.serializer = checkNotNull(serializer);
18 + }
19 +
20 + @Override
21 + public T apply(byte[] input) {
22 + return serializer.decode(input);
23 + }
24 +}
...\ No newline at end of file ...\ No newline at end of file