Yuta HIGUCHI

FlowRule subsystem bugfixes

- RULE_ADDED will be posted when the Flow was confirmed by stats,
  even if they were installed as a batch
- Properly handle batch in Simple store

Change-Id: I0a0e15b29ff9c0d56d5a646e0751511d73c8f552
...@@ -27,11 +27,14 @@ public final class FlowRuleBatchEvent extends AbstractEvent<FlowRuleBatchEvent.T ...@@ -27,11 +27,14 @@ public final class FlowRuleBatchEvent extends AbstractEvent<FlowRuleBatchEvent.T
27 */ 27 */
28 public enum Type { 28 public enum Type {
29 29
30 + // Request has been forwarded to MASTER Node
30 /** 31 /**
31 * Signifies that a batch operation has been initiated. 32 * Signifies that a batch operation has been initiated.
32 */ 33 */
33 BATCH_OPERATION_REQUESTED, 34 BATCH_OPERATION_REQUESTED,
34 35
36 + // MASTER Node has pushed the batch down to the Device
37 + // (e.g., Received barrier reply)
35 /** 38 /**
36 * Signifies that a batch operation has completed. 39 * Signifies that a batch operation has completed.
37 */ 40 */
......
...@@ -25,29 +25,29 @@ import com.google.common.collect.Lists; ...@@ -25,29 +25,29 @@ import com.google.common.collect.Lists;
25 public class FlowRuleBatchRequest { 25 public class FlowRuleBatchRequest {
26 26
27 private final int batchId; 27 private final int batchId;
28 - private final List<FlowEntry> toAdd; 28 + private final List<FlowRule> toAdd;
29 - private final List<FlowEntry> toRemove; 29 + private final List<FlowRule> toRemove;
30 30
31 - public FlowRuleBatchRequest(int batchId, List<? extends FlowEntry> toAdd, List<? extends FlowEntry> toRemove) { 31 + public FlowRuleBatchRequest(int batchId, List<? extends FlowRule> toAdd, List<? extends FlowRule> toRemove) {
32 this.batchId = batchId; 32 this.batchId = batchId;
33 this.toAdd = Collections.unmodifiableList(toAdd); 33 this.toAdd = Collections.unmodifiableList(toAdd);
34 this.toRemove = Collections.unmodifiableList(toRemove); 34 this.toRemove = Collections.unmodifiableList(toRemove);
35 } 35 }
36 36
37 - public List<FlowEntry> toAdd() { 37 + public List<FlowRule> toAdd() {
38 return toAdd; 38 return toAdd;
39 } 39 }
40 40
41 - public List<FlowEntry> toRemove() { 41 + public List<FlowRule> toRemove() {
42 return toRemove; 42 return toRemove;
43 } 43 }
44 44
45 public FlowRuleBatchOperation asBatchOperation() { 45 public FlowRuleBatchOperation asBatchOperation() {
46 List<FlowRuleBatchEntry> entries = Lists.newArrayList(); 46 List<FlowRuleBatchEntry> entries = Lists.newArrayList();
47 - for (FlowEntry e : toAdd) { 47 + for (FlowRule e : toAdd) {
48 entries.add(new FlowRuleBatchEntry(FlowRuleOperation.ADD, e)); 48 entries.add(new FlowRuleBatchEntry(FlowRuleOperation.ADD, e));
49 } 49 }
50 - for (FlowEntry e : toRemove) { 50 + for (FlowRule e : toRemove) {
51 entries.add(new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, e)); 51 entries.add(new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, e));
52 } 52 }
53 return new FlowRuleBatchOperation(entries); 53 return new FlowRuleBatchOperation(entries);
......
...@@ -371,10 +371,11 @@ public class FlowRuleManager ...@@ -371,10 +371,11 @@ public class FlowRuleManager
371 final FlowRuleBatchRequest request = event.subject(); 371 final FlowRuleBatchRequest request = event.subject();
372 switch (event.type()) { 372 switch (event.type()) {
373 case BATCH_OPERATION_REQUESTED: 373 case BATCH_OPERATION_REQUESTED:
374 - for (FlowEntry entry : request.toAdd()) { 374 + // Request has been forwarded to MASTER Node, and was
375 + for (FlowRule entry : request.toAdd()) {
375 eventDispatcher.post(new FlowRuleEvent(FlowRuleEvent.Type.RULE_ADD_REQUESTED, entry)); 376 eventDispatcher.post(new FlowRuleEvent(FlowRuleEvent.Type.RULE_ADD_REQUESTED, entry));
376 } 377 }
377 - for (FlowEntry entry : request.toRemove()) { 378 + for (FlowRule entry : request.toRemove()) {
378 eventDispatcher.post(new FlowRuleEvent(FlowRuleEvent.Type.RULE_REMOVE_REQUESTED, entry)); 379 eventDispatcher.post(new FlowRuleEvent(FlowRuleEvent.Type.RULE_REMOVE_REQUESTED, entry));
379 } 380 }
380 // FIXME: what about op.equals(FlowRuleOperation.MODIFY) ? 381 // FIXME: what about op.equals(FlowRuleOperation.MODIFY) ?
...@@ -392,21 +393,15 @@ public class FlowRuleManager ...@@ -392,21 +393,15 @@ public class FlowRuleManager
392 Futures.getUnchecked(result))); 393 Futures.getUnchecked(result)));
393 } 394 }
394 }, futureListeners); 395 }, futureListeners);
395 -
396 break; 396 break;
397 +
397 case BATCH_OPERATION_COMPLETED: 398 case BATCH_OPERATION_COMPLETED:
398 - Set<FlowRule> failedItems = event.result().failedItems(); 399 + // MASTER Node has pushed the batch down to the Device
399 - for (FlowEntry entry : request.toAdd()) { 400 +
400 - if (!failedItems.contains(entry)) { 401 + // Note: RULE_ADDED will be posted
401 - eventDispatcher.post(new FlowRuleEvent(FlowRuleEvent.Type.RULE_ADDED, entry)); 402 + // when Flow was actually confirmed by stats reply.
402 - }
403 - }
404 - for (FlowEntry entry : request.toRemove()) {
405 - if (!failedItems.contains(entry)) {
406 - eventDispatcher.post(new FlowRuleEvent(FlowRuleEvent.Type.RULE_REMOVED, entry));
407 - }
408 - }
409 break; 403 break;
404 +
410 default: 405 default:
411 break; 406 break;
412 } 407 }
......
...@@ -148,7 +148,7 @@ public class FlowRuleManagerTest { ...@@ -148,7 +148,7 @@ public class FlowRuleManagerTest {
148 int i = 0; 148 int i = 0;
149 System.err.println("events :" + listener.events); 149 System.err.println("events :" + listener.events);
150 for (FlowRuleEvent e : listener.events) { 150 for (FlowRuleEvent e : listener.events) {
151 - assertTrue("unexpected event", e.type().equals(events[i])); 151 + assertEquals("unexpected event", events[i], e.type());
152 i++; 152 i++;
153 } 153 }
154 154
...@@ -178,15 +178,13 @@ public class FlowRuleManagerTest { ...@@ -178,15 +178,13 @@ public class FlowRuleManagerTest {
178 RULE_ADDED, RULE_ADDED); 178 RULE_ADDED, RULE_ADDED);
179 179
180 addFlowRule(1); 180 addFlowRule(1);
181 + System.err.println("events :" + listener.events);
181 assertEquals("should still be 2 rules", 2, flowCount()); 182 assertEquals("should still be 2 rules", 2, flowCount());
182 183
183 providerService.pushFlowMetrics(DID, ImmutableList.of(fe1)); 184 providerService.pushFlowMetrics(DID, ImmutableList.of(fe1));
184 validateEvents(RULE_UPDATED); 185 validateEvents(RULE_UPDATED);
185 } 186 }
186 187
187 -
188 - // TODO: If preserving iteration order is a requirement, redo FlowRuleStore.
189 - //backing store is sensitive to the order of additions/removals
190 private boolean validateState(Map<FlowRule, FlowEntryState> expected) { 188 private boolean validateState(Map<FlowRule, FlowEntryState> expected) {
191 Map<FlowRule, FlowEntryState> expectedToCheck = new HashMap<>(expected); 189 Map<FlowRule, FlowEntryState> expectedToCheck = new HashMap<>(expected);
192 Iterable<FlowEntry> rules = service.getFlowEntries(DID); 190 Iterable<FlowEntry> rules = service.getFlowEntries(DID);
...@@ -539,17 +537,17 @@ public class FlowRuleManagerTest { ...@@ -539,17 +537,17 @@ public class FlowRuleManagerTest {
539 537
540 @Override 538 @Override
541 public boolean cancel(boolean mayInterruptIfRunning) { 539 public boolean cancel(boolean mayInterruptIfRunning) {
542 - return true; 540 + return false;
543 } 541 }
544 542
545 @Override 543 @Override
546 public boolean isCancelled() { 544 public boolean isCancelled() {
547 - return true; 545 + return false;
548 } 546 }
549 547
550 @Override 548 @Override
551 public boolean isDone() { 549 public boolean isDone() {
552 - return false; 550 + return true;
553 } 551 }
554 552
555 @Override 553 @Override
...@@ -562,12 +560,14 @@ public class FlowRuleManagerTest { ...@@ -562,12 +560,14 @@ public class FlowRuleManagerTest {
562 public CompletedBatchOperation get(long timeout, TimeUnit unit) 560 public CompletedBatchOperation get(long timeout, TimeUnit unit)
563 throws InterruptedException, 561 throws InterruptedException,
564 ExecutionException, TimeoutException { 562 ExecutionException, TimeoutException {
565 - return null; 563 + return new CompletedBatchOperation(true, Collections.<FlowRule>emptySet());
566 } 564 }
567 565
568 @Override 566 @Override
569 public void addListener(Runnable task, Executor executor) { 567 public void addListener(Runnable task, Executor executor) {
570 - // TODO: add stuff. 568 + if (isDone()) {
569 + executor.execute(task);
570 + }
571 } 571 }
572 } 572 }
573 573
......
...@@ -16,8 +16,12 @@ ...@@ -16,8 +16,12 @@
16 package org.onlab.onos.store.trivial.impl; 16 package org.onlab.onos.store.trivial.impl;
17 17
18 import com.google.common.base.Function; 18 import com.google.common.base.Function;
19 +import com.google.common.cache.Cache;
20 +import com.google.common.cache.CacheBuilder;
19 import com.google.common.collect.FluentIterable; 21 import com.google.common.collect.FluentIterable;
20 import com.google.common.util.concurrent.Futures; 22 import com.google.common.util.concurrent.Futures;
23 +import com.google.common.util.concurrent.SettableFuture;
24 +
21 import org.apache.felix.scr.annotations.Activate; 25 import org.apache.felix.scr.annotations.Activate;
22 import org.apache.felix.scr.annotations.Component; 26 import org.apache.felix.scr.annotations.Component;
23 import org.apache.felix.scr.annotations.Deactivate; 27 import org.apache.felix.scr.annotations.Deactivate;
...@@ -43,13 +47,15 @@ import org.onlab.onos.store.AbstractStore; ...@@ -43,13 +47,15 @@ import org.onlab.onos.store.AbstractStore;
43 import org.onlab.util.NewConcurrentHashMap; 47 import org.onlab.util.NewConcurrentHashMap;
44 import org.slf4j.Logger; 48 import org.slf4j.Logger;
45 49
46 -import java.util.Arrays; 50 +import java.util.ArrayList;
47 import java.util.Collections; 51 import java.util.Collections;
48 import java.util.List; 52 import java.util.List;
49 import java.util.concurrent.ConcurrentHashMap; 53 import java.util.concurrent.ConcurrentHashMap;
50 import java.util.concurrent.ConcurrentMap; 54 import java.util.concurrent.ConcurrentMap;
51 import java.util.concurrent.CopyOnWriteArrayList; 55 import java.util.concurrent.CopyOnWriteArrayList;
52 import java.util.concurrent.Future; 56 import java.util.concurrent.Future;
57 +import java.util.concurrent.TimeUnit;
58 +import java.util.concurrent.atomic.AtomicInteger;
53 59
54 import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked; 60 import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
55 import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_REMOVED; 61 import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
...@@ -72,6 +78,18 @@ public class SimpleFlowRuleStore ...@@ -72,6 +78,18 @@ public class SimpleFlowRuleStore
72 private final ConcurrentMap<DeviceId, ConcurrentMap<FlowId, List<StoredFlowEntry>>> 78 private final ConcurrentMap<DeviceId, ConcurrentMap<FlowId, List<StoredFlowEntry>>>
73 flowEntries = new ConcurrentHashMap<>(); 79 flowEntries = new ConcurrentHashMap<>();
74 80
81 + private final AtomicInteger localBatchIdGen = new AtomicInteger();
82 +
83 + // TODO: make this configurable
84 + private int pendingFutureTimeoutMinutes = 5;
85 +
86 + private Cache<Integer, SettableFuture<CompletedBatchOperation>> pendingFutures =
87 + CacheBuilder.newBuilder()
88 + .expireAfterWrite(pendingFutureTimeoutMinutes, TimeUnit.MINUTES)
89 + // TODO Explicitly fail the future if expired?
90 + //.removalListener(listener)
91 + .build();
92 +
75 @Activate 93 @Activate
76 public void activate() { 94 public void activate() {
77 log.info("Started"); 95 log.info("Started");
...@@ -173,10 +191,6 @@ public class SimpleFlowRuleStore ...@@ -173,10 +191,6 @@ public class SimpleFlowRuleStore
173 } 191 }
174 // new flow rule added 192 // new flow rule added
175 existing.add(f); 193 existing.add(f);
176 - notifyDelegate(FlowRuleBatchEvent.requested(
177 - new FlowRuleBatchRequest(1, /* FIXME generate something */
178 - Arrays.<FlowEntry>asList(f),
179 - Collections.<FlowEntry>emptyList())));
180 } 194 }
181 } 195 }
182 196
...@@ -190,11 +204,6 @@ public class SimpleFlowRuleStore ...@@ -190,11 +204,6 @@ public class SimpleFlowRuleStore
190 if (entry.equals(rule)) { 204 if (entry.equals(rule)) {
191 synchronized (entry) { 205 synchronized (entry) {
192 entry.setState(FlowEntryState.PENDING_REMOVE); 206 entry.setState(FlowEntryState.PENDING_REMOVE);
193 - // TODO: Should we notify only if it's "remote" event?
194 - notifyDelegate(FlowRuleBatchEvent.requested(
195 - new FlowRuleBatchRequest(1, /* FIXME generate something */
196 - Collections.<FlowEntry>emptyList(),
197 - Arrays.<FlowEntry>asList(entry))));
198 } 207 }
199 } 208 }
200 } 209 }
...@@ -251,20 +260,47 @@ public class SimpleFlowRuleStore ...@@ -251,20 +260,47 @@ public class SimpleFlowRuleStore
251 @Override 260 @Override
252 public Future<CompletedBatchOperation> storeBatch( 261 public Future<CompletedBatchOperation> storeBatch(
253 FlowRuleBatchOperation batchOperation) { 262 FlowRuleBatchOperation batchOperation) {
263 + List<FlowRule> toAdd = new ArrayList<>();
264 + List<FlowRule> toRemove = new ArrayList<>();
254 for (FlowRuleBatchEntry entry : batchOperation.getOperations()) { 265 for (FlowRuleBatchEntry entry : batchOperation.getOperations()) {
266 + final FlowRule flowRule = entry.getTarget();
255 if (entry.getOperator().equals(FlowRuleOperation.ADD)) { 267 if (entry.getOperator().equals(FlowRuleOperation.ADD)) {
256 - storeFlowRule(entry.getTarget()); 268 + if (!getFlowEntries(flowRule.deviceId(), flowRule.id()).contains(flowRule)) {
269 + storeFlowRule(flowRule);
270 + toAdd.add(flowRule);
271 + }
257 } else if (entry.getOperator().equals(FlowRuleOperation.REMOVE)) { 272 } else if (entry.getOperator().equals(FlowRuleOperation.REMOVE)) {
258 - deleteFlowRule(entry.getTarget()); 273 + if (getFlowEntries(flowRule.deviceId(), flowRule.id()).contains(flowRule)) {
274 + deleteFlowRule(flowRule);
275 + toRemove.add(flowRule);
276 + }
259 } else { 277 } else {
260 throw new UnsupportedOperationException("Unsupported operation type"); 278 throw new UnsupportedOperationException("Unsupported operation type");
261 } 279 }
262 } 280 }
263 - return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowEntry>emptySet())); 281 +
282 + if (toAdd.isEmpty() && toRemove.isEmpty()) {
283 + return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowRule>emptySet()));
284 + }
285 +
286 + SettableFuture<CompletedBatchOperation> r = SettableFuture.create();
287 + final int batchId = localBatchIdGen.incrementAndGet();
288 +
289 + pendingFutures.put(batchId, r);
290 + notifyDelegate(FlowRuleBatchEvent.requested(new FlowRuleBatchRequest(batchId, toAdd, toRemove)));
291 +
292 + return r;
264 } 293 }
265 294
266 @Override 295 @Override
267 public void batchOperationComplete(FlowRuleBatchEvent event) { 296 public void batchOperationComplete(FlowRuleBatchEvent event) {
297 + final Integer batchId = event.subject().batchId();
298 + SettableFuture<CompletedBatchOperation> future
299 + = pendingFutures.getIfPresent(batchId);
300 + if (future != null) {
301 + future.set(event.result());
302 + pendingFutures.invalidate(batchId);
303 + }
268 notifyDelegate(event); 304 notifyDelegate(event);
269 } 305 }
270 } 306 }
......