Yuta HIGUCHI

DistributedFlowRuleStore related fixes

- handle no master for Device case
- Changed failed item type to FlowRule

Change-Id: If6c85751759cf6ba9ab0ed0384cbe1bf08a5d572
...@@ -19,13 +19,13 @@ import java.util.Set; ...@@ -19,13 +19,13 @@ import java.util.Set;
19 19
20 import com.google.common.collect.ImmutableSet; 20 import com.google.common.collect.ImmutableSet;
21 21
22 -public class CompletedBatchOperation implements BatchOperationResult<FlowEntry> { 22 +public class CompletedBatchOperation implements BatchOperationResult<FlowRule> {
23 23
24 24
25 private final boolean success; 25 private final boolean success;
26 - private final Set<FlowEntry> failures; 26 + private final Set<FlowRule> failures;
27 27
28 - public CompletedBatchOperation(boolean success, Set<FlowEntry> failures) { 28 + public CompletedBatchOperation(boolean success, Set<? extends FlowRule> failures) {
29 this.success = success; 29 this.success = success;
30 this.failures = ImmutableSet.copyOf(failures); 30 this.failures = ImmutableSet.copyOf(failures);
31 } 31 }
...@@ -36,7 +36,7 @@ public class CompletedBatchOperation implements BatchOperationResult<FlowEntry> ...@@ -36,7 +36,7 @@ public class CompletedBatchOperation implements BatchOperationResult<FlowEntry>
36 } 36 }
37 37
38 @Override 38 @Override
39 - public Set<FlowEntry> failedItems() { 39 + public Set<FlowRule> failedItems() {
40 return failures; 40 return failures;
41 } 41 }
42 42
......
...@@ -395,7 +395,7 @@ public class FlowRuleManager ...@@ -395,7 +395,7 @@ public class FlowRuleManager
395 395
396 break; 396 break;
397 case BATCH_OPERATION_COMPLETED: 397 case BATCH_OPERATION_COMPLETED:
398 - Set<FlowEntry> failedItems = event.result().failedItems(); 398 + Set<FlowRule> failedItems = event.result().failedItems();
399 for (FlowEntry entry : request.toAdd()) { 399 for (FlowEntry entry : request.toAdd()) {
400 if (!failedItems.contains(entry)) { 400 if (!failedItems.contains(entry)) {
401 eventDispatcher.post(new FlowRuleEvent(FlowRuleEvent.Type.RULE_ADDED, entry)); 401 eventDispatcher.post(new FlowRuleEvent(FlowRuleEvent.Type.RULE_ADDED, entry));
...@@ -463,7 +463,7 @@ public class FlowRuleManager ...@@ -463,7 +463,7 @@ public class FlowRuleManager
463 } 463 }
464 464
465 boolean success = true; 465 boolean success = true;
466 - Set<FlowEntry> failed = Sets.newHashSet(); 466 + Set<FlowRule> failed = Sets.newHashSet();
467 CompletedBatchOperation completed; 467 CompletedBatchOperation completed;
468 for (Future<CompletedBatchOperation> future : futures) { 468 for (Future<CompletedBatchOperation> future : futures) {
469 completed = future.get(); 469 completed = future.get();
...@@ -483,7 +483,7 @@ public class FlowRuleManager ...@@ -483,7 +483,7 @@ public class FlowRuleManager
483 return overall; 483 return overall;
484 } 484 }
485 boolean success = true; 485 boolean success = true;
486 - Set<FlowEntry> failed = Sets.newHashSet(); 486 + Set<FlowRule> failed = Sets.newHashSet();
487 CompletedBatchOperation completed; 487 CompletedBatchOperation completed;
488 long start = System.nanoTime(); 488 long start = System.nanoTime();
489 long end = start + unit.toNanos(timeout); 489 long end = start + unit.toNanos(timeout);
...@@ -497,7 +497,7 @@ public class FlowRuleManager ...@@ -497,7 +497,7 @@ public class FlowRuleManager
497 return finalizeBatchOperation(success, failed); 497 return finalizeBatchOperation(success, failed);
498 } 498 }
499 499
500 - private boolean validateBatchOperation(Set<FlowEntry> failed, 500 + private boolean validateBatchOperation(Set<FlowRule> failed,
501 CompletedBatchOperation completed) { 501 CompletedBatchOperation completed) {
502 502
503 if (isCancelled()) { 503 if (isCancelled()) {
...@@ -519,7 +519,7 @@ public class FlowRuleManager ...@@ -519,7 +519,7 @@ public class FlowRuleManager
519 } 519 }
520 520
521 private CompletedBatchOperation finalizeBatchOperation(boolean success, 521 private CompletedBatchOperation finalizeBatchOperation(boolean success,
522 - Set<FlowEntry> failed) { 522 + Set<FlowRule> failed) {
523 synchronized (this) { 523 synchronized (this) {
524 if (!state.compareAndSet(BatchState.STARTED, BatchState.FINISHED)) { 524 if (!state.compareAndSet(BatchState.STARTED, BatchState.FINISHED)) {
525 if (state.get() == BatchState.FINISHED) { 525 if (state.get() == BatchState.FINISHED) {
......
...@@ -555,7 +555,7 @@ public class FlowRuleManagerTest { ...@@ -555,7 +555,7 @@ public class FlowRuleManagerTest {
555 @Override 555 @Override
556 public CompletedBatchOperation get() 556 public CompletedBatchOperation get()
557 throws InterruptedException, ExecutionException { 557 throws InterruptedException, ExecutionException {
558 - return new CompletedBatchOperation(true, Collections.<FlowEntry>emptySet()); 558 + return new CompletedBatchOperation(true, Collections.<FlowRule>emptySet());
559 } 559 }
560 560
561 @Override 561 @Override
......
...@@ -26,6 +26,7 @@ import java.util.ArrayList; ...@@ -26,6 +26,7 @@ import java.util.ArrayList;
26 import java.util.Arrays; 26 import java.util.Arrays;
27 import java.util.Collection; 27 import java.util.Collection;
28 import java.util.Collections; 28 import java.util.Collections;
29 +import java.util.HashSet;
29 import java.util.Map.Entry; 30 import java.util.Map.Entry;
30 import java.util.Set; 31 import java.util.Set;
31 import java.util.concurrent.ExecutionException; 32 import java.util.concurrent.ExecutionException;
...@@ -172,12 +173,36 @@ public class DistributedFlowRuleStore ...@@ -172,12 +173,36 @@ public class DistributedFlowRuleStore
172 .softValues() 173 .softValues()
173 .build(new SMapLoader()); 174 .build(new SMapLoader());
174 175
176 + final NodeId local = clusterService.getLocalNode().id();
177 +
175 clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new ClusterMessageHandler() { 178 clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new ClusterMessageHandler() {
176 179
177 @Override 180 @Override
178 public void handle(final ClusterMessage message) { 181 public void handle(final ClusterMessage message) {
179 FlowRuleBatchOperation operation = SERIALIZER.decode(message.payload()); 182 FlowRuleBatchOperation operation = SERIALIZER.decode(message.payload());
180 log.info("received batch request {}", operation); 183 log.info("received batch request {}", operation);
184 +
185 + final DeviceId deviceId = operation.getOperations().get(0).getTarget().deviceId();
186 + ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
187 + if (!local.equals(replicaInfo.master().orNull())) {
188 +
189 + Set<FlowRule> failures = new HashSet<>(operation.size());
190 + for (FlowRuleBatchEntry op : operation.getOperations()) {
191 + failures.add(op.getTarget());
192 + }
193 + CompletedBatchOperation allFailed = new CompletedBatchOperation(false, failures);
194 + // This node is no longer the master, respond as all failed.
195 + // TODO: we might want to wrap response in envelope
196 + // to distinguish sw programming failure and hand over
197 + // it make sense in the latter case to retry immediately.
198 + try {
199 + message.respond(SERIALIZER.encode(allFailed));
200 + } catch (IOException e) {
201 + log.error("Failed to respond back", e);
202 + }
203 + return;
204 + }
205 +
181 final ListenableFuture<CompletedBatchOperation> f = storeBatchInternal(operation); 206 final ListenableFuture<CompletedBatchOperation> f = storeBatchInternal(operation);
182 207
183 f.addListener(new Runnable() { 208 f.addListener(new Runnable() {
...@@ -256,6 +281,14 @@ public class DistributedFlowRuleStore ...@@ -256,6 +281,14 @@ public class DistributedFlowRuleStore
256 @Override 281 @Override
257 public synchronized FlowEntry getFlowEntry(FlowRule rule) { 282 public synchronized FlowEntry getFlowEntry(FlowRule rule) {
258 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId()); 283 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
284 +
285 + if (!replicaInfo.master().isPresent()) {
286 + log.warn("No master for {}", rule);
287 + // TODO: revisit if this should be returning null.
288 + // FIXME: throw a FlowStoreException
289 + throw new RuntimeException("No master for " + rule);
290 + }
291 +
259 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) { 292 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
260 return getFlowEntryInternal(rule); 293 return getFlowEntryInternal(rule);
261 } 294 }
...@@ -290,6 +323,14 @@ public class DistributedFlowRuleStore ...@@ -290,6 +323,14 @@ public class DistributedFlowRuleStore
290 public synchronized Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) { 323 public synchronized Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
291 324
292 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId); 325 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
326 +
327 + if (!replicaInfo.master().isPresent()) {
328 + log.warn("No master for {}", deviceId);
329 + // TODO: revisit if this should be returning empty collection.
330 + // FIXME: throw a FlowStoreException
331 + throw new RuntimeException("No master for " + deviceId);
332 + }
333 +
293 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) { 334 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
294 return getFlowEntriesInternal(deviceId); 335 return getFlowEntriesInternal(deviceId);
295 } 336 }
...@@ -329,14 +370,22 @@ public class DistributedFlowRuleStore ...@@ -329,14 +370,22 @@ public class DistributedFlowRuleStore
329 public Future<CompletedBatchOperation> storeBatch(FlowRuleBatchOperation operation) { 370 public Future<CompletedBatchOperation> storeBatch(FlowRuleBatchOperation operation) {
330 371
331 if (operation.getOperations().isEmpty()) { 372 if (operation.getOperations().isEmpty()) {
332 - return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowEntry>emptySet())); 373 + return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowRule>emptySet()));
333 } 374 }
334 375
335 DeviceId deviceId = operation.getOperations().get(0).getTarget().deviceId(); 376 DeviceId deviceId = operation.getOperations().get(0).getTarget().deviceId();
336 377
337 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId); 378 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
338 379
339 - if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) { 380 + if (!replicaInfo.master().isPresent()) {
381 + log.warn("No master for {}", deviceId);
382 + // TODO: revisit if this should be "success" from Future point of view
383 + // with every FlowEntry failed
384 + return Futures.immediateFailedFuture(new IOException("No master to forward to"));
385 + }
386 +
387 + final NodeId local = clusterService.getLocalNode().id();
388 + if (replicaInfo.master().get().equals(local)) {
340 return storeBatchInternal(operation); 389 return storeBatchInternal(operation);
341 } 390 }
342 391
...@@ -344,7 +393,7 @@ public class DistributedFlowRuleStore ...@@ -344,7 +393,7 @@ public class DistributedFlowRuleStore
344 replicaInfo.master().orNull(), deviceId); 393 replicaInfo.master().orNull(), deviceId);
345 394
346 ClusterMessage message = new ClusterMessage( 395 ClusterMessage message = new ClusterMessage(
347 - clusterService.getLocalNode().id(), 396 + local,
348 APPLY_BATCH_FLOWS, 397 APPLY_BATCH_FLOWS,
349 SERIALIZER.encode(operation)); 398 SERIALIZER.encode(operation));
350 399
...@@ -367,7 +416,6 @@ public class DistributedFlowRuleStore ...@@ -367,7 +416,6 @@ public class DistributedFlowRuleStore
367 final List<StoredFlowEntry> toAdd = new ArrayList<>(); 416 final List<StoredFlowEntry> toAdd = new ArrayList<>();
368 DeviceId did = null; 417 DeviceId did = null;
369 418
370 -
371 for (FlowRuleBatchEntry batchEntry : operation.getOperations()) { 419 for (FlowRuleBatchEntry batchEntry : operation.getOperations()) {
372 FlowRule flowRule = batchEntry.getTarget(); 420 FlowRule flowRule = batchEntry.getTarget();
373 FlowRuleOperation op = batchEntry.getOperator(); 421 FlowRuleOperation op = batchEntry.getOperator();
...@@ -390,7 +438,7 @@ public class DistributedFlowRuleStore ...@@ -390,7 +438,7 @@ public class DistributedFlowRuleStore
390 } 438 }
391 } 439 }
392 if (toAdd.isEmpty() && toRemove.isEmpty()) { 440 if (toAdd.isEmpty() && toRemove.isEmpty()) {
393 - return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowEntry>emptySet())); 441 + return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowRule>emptySet()));
394 } 442 }
395 443
396 // create remote backup copies 444 // create remote backup copies
...@@ -434,7 +482,8 @@ public class DistributedFlowRuleStore ...@@ -434,7 +482,8 @@ public class DistributedFlowRuleStore
434 @Override 482 @Override
435 public FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) { 483 public FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) {
436 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId()); 484 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
437 - if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) { 485 + final NodeId localId = clusterService.getLocalNode().id();
486 + if (localId.equals(replicaInfo.master().orNull())) {
438 return addOrUpdateFlowRuleInternal(rule); 487 return addOrUpdateFlowRuleInternal(rule);
439 } 488 }
440 489
...@@ -471,7 +520,9 @@ public class DistributedFlowRuleStore ...@@ -471,7 +520,9 @@ public class DistributedFlowRuleStore
471 @Override 520 @Override
472 public FlowRuleEvent removeFlowRule(FlowEntry rule) { 521 public FlowRuleEvent removeFlowRule(FlowEntry rule) {
473 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId()); 522 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
474 - if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) { 523 +
524 + final NodeId localId = clusterService.getLocalNode().id();
525 + if (localId.equals(replicaInfo.master().orNull())) {
475 // bypass and handle it locally 526 // bypass and handle it locally
476 return removeFlowRuleInternal(rule); 527 return removeFlowRuleInternal(rule);
477 } 528 }
......
...@@ -28,6 +28,7 @@ import org.apache.felix.scr.annotations.ReferenceCardinality; ...@@ -28,6 +28,7 @@ import org.apache.felix.scr.annotations.ReferenceCardinality;
28 import org.apache.felix.scr.annotations.Service; 28 import org.apache.felix.scr.annotations.Service;
29 import org.onlab.onos.cluster.ClusterService; 29 import org.onlab.onos.cluster.ClusterService;
30 import org.onlab.onos.net.ConnectPoint; 30 import org.onlab.onos.net.ConnectPoint;
31 +import org.onlab.onos.net.DeviceId;
31 import org.onlab.onos.net.PortNumber; 32 import org.onlab.onos.net.PortNumber;
32 import org.onlab.onos.net.flow.FlowEntry; 33 import org.onlab.onos.net.flow.FlowEntry;
33 import org.onlab.onos.net.flow.FlowRule; 34 import org.onlab.onos.net.flow.FlowRule;
...@@ -191,7 +192,14 @@ public class DistributedStatisticStore implements StatisticStore { ...@@ -191,7 +192,14 @@ public class DistributedStatisticStore implements StatisticStore {
191 192
192 @Override 193 @Override
193 public Set<FlowEntry> getCurrentStatistic(ConnectPoint connectPoint) { 194 public Set<FlowEntry> getCurrentStatistic(ConnectPoint connectPoint) {
194 - ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(connectPoint.deviceId()); 195 + final DeviceId deviceId = connectPoint.deviceId();
196 + ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
197 + if (!replicaInfo.master().isPresent()) {
198 + log.warn("No master for {}", deviceId);
199 + // TODO: revisit if this should be returning empty collection.
200 + // FIXME: throw a StatsStoreException
201 + throw new RuntimeException("No master for " + deviceId);
202 + }
195 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) { 203 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
196 return getCurrentStatisticInternal(connectPoint); 204 return getCurrentStatisticInternal(connectPoint);
197 } else { 205 } else {
...@@ -219,7 +227,14 @@ public class DistributedStatisticStore implements StatisticStore { ...@@ -219,7 +227,14 @@ public class DistributedStatisticStore implements StatisticStore {
219 227
220 @Override 228 @Override
221 public Set<FlowEntry> getPreviousStatistic(ConnectPoint connectPoint) { 229 public Set<FlowEntry> getPreviousStatistic(ConnectPoint connectPoint) {
222 - ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(connectPoint.deviceId()); 230 + final DeviceId deviceId = connectPoint.deviceId();
231 + ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
232 + if (!replicaInfo.master().isPresent()) {
233 + log.warn("No master for {}", deviceId);
234 + // TODO: revisit if this should be returning empty collection.
235 + // FIXME: throw a StatsStoreException
236 + throw new RuntimeException("No master for " + deviceId);
237 + }
223 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) { 238 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
224 return getPreviousStatisticInternal(connectPoint); 239 return getPreviousStatisticInternal(connectPoint);
225 } else { 240 } else {
......