Marc De Leenheer

Merge branch 'master' into optical-integration

Showing 30 changed files with 488 additions and 220 deletions
...@@ -14,6 +14,16 @@ ...@@ -14,6 +14,16 @@
14 "attachmentDpid" : "00:00:00:00:00:00:00:a2", 14 "attachmentDpid" : "00:00:00:00:00:00:00:a2",
15 "attachmentPort" : "1", 15 "attachmentPort" : "1",
16 "ipAddress" : "192.168.30.1" 16 "ipAddress" : "192.168.30.1"
17 + },
18 + {
19 + "attachmentDpid" : "00:00:00:00:00:00:00:a6",
20 + "attachmentPort" : "1",
21 + "ipAddress" : "192.168.40.1"
22 + },
23 + {
24 + "attachmentDpid" : "00:00:00:00:00:00:00:a4",
25 + "attachmentPort" : "4",
26 + "ipAddress" : "192.168.60.1"
17 } 27 }
18 ], 28 ],
19 "bgpSpeakers" : [ 29 "bgpSpeakers" : [
......
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
18 */ 18 */
19 package org.onlab.onos.net.flow; 19 package org.onlab.onos.net.flow;
20 20
21 -import java.util.List; 21 +import java.util.Set;
22 22
23 /** 23 /**
24 * Interface capturing the result of a batch operation. 24 * Interface capturing the result of a batch operation.
...@@ -33,9 +33,9 @@ public interface BatchOperationResult<T> { ...@@ -33,9 +33,9 @@ public interface BatchOperationResult<T> {
33 boolean isSuccess(); 33 boolean isSuccess();
34 34
35 /** 35 /**
36 - * Obtains a list of items which failed. 36 + * Obtains a set of items which failed.
37 - * @return a list of failures 37 + * @return a set of failures
38 */ 38 */
39 - List<T> failedItems(); 39 + Set<T> failedItems();
40 40
41 } 41 }
......
...@@ -18,19 +18,19 @@ ...@@ -18,19 +18,19 @@
18 */ 18 */
19 package org.onlab.onos.net.flow; 19 package org.onlab.onos.net.flow;
20 20
21 -import java.util.List; 21 +import java.util.Set;
22 22
23 -import com.google.common.collect.ImmutableList; 23 +import com.google.common.collect.ImmutableSet;
24 24
25 public class CompletedBatchOperation implements BatchOperationResult<FlowEntry> { 25 public class CompletedBatchOperation implements BatchOperationResult<FlowEntry> {
26 26
27 27
28 private final boolean success; 28 private final boolean success;
29 - private final List<FlowEntry> failures; 29 + private final Set<FlowEntry> failures;
30 30
31 - public CompletedBatchOperation(boolean success, List<FlowEntry> failures) { 31 + public CompletedBatchOperation(boolean success, Set<FlowEntry> failures) {
32 this.success = success; 32 this.success = success;
33 - this.failures = ImmutableList.copyOf(failures); 33 + this.failures = ImmutableSet.copyOf(failures);
34 } 34 }
35 35
36 @Override 36 @Override
...@@ -39,7 +39,7 @@ public class CompletedBatchOperation implements BatchOperationResult<FlowEntry> ...@@ -39,7 +39,7 @@ public class CompletedBatchOperation implements BatchOperationResult<FlowEntry>
39 } 39 }
40 40
41 @Override 41 @Override
42 - public List<FlowEntry> failedItems() { 42 + public Set<FlowEntry> failedItems() {
43 return failures; 43 return failures;
44 } 44 }
45 45
......
1 +package org.onlab.onos.net.flow;
2 +
3 +import org.onlab.onos.event.AbstractEvent;
4 +
5 +/**
6 + * Describes flow rule batch event.
7 + */
8 +public final class FlowRuleBatchEvent extends AbstractEvent<FlowRuleBatchEvent.Type, FlowRuleBatchRequest> {
9 +
10 + /**
11 + * Type of flow rule events.
12 + */
13 + public enum Type {
14 +
15 + /**
16 + * Signifies that a batch operation has been initiated.
17 + */
18 + BATCH_OPERATION_REQUESTED,
19 +
20 + /**
21 + * Signifies that a batch operation has completed.
22 + */
23 + BATCH_OPERATION_COMPLETED,
24 + }
25 +
26 + private final CompletedBatchOperation result;
27 +
28 + /**
29 + * Constructs a new FlowRuleBatchEvent.
30 + * @param request batch operation request.
31 + * @return event.
32 + */
33 + public static FlowRuleBatchEvent create(FlowRuleBatchRequest request) {
34 + FlowRuleBatchEvent event = new FlowRuleBatchEvent(Type.BATCH_OPERATION_REQUESTED, request, null);
35 + return event;
36 + }
37 +
38 + /**
39 + * Constructs a new FlowRuleBatchEvent.
40 + * @param request batch operation request.
41 + * @param result completed batch operation result.
42 + * @return event.
43 + */
44 + public static FlowRuleBatchEvent create(FlowRuleBatchRequest request, CompletedBatchOperation result) {
45 + FlowRuleBatchEvent event = new FlowRuleBatchEvent(Type.BATCH_OPERATION_COMPLETED, request, result);
46 + return event;
47 + }
48 +
49 + /**
50 + * Returns the result of this batch operation.
51 + * @return batch operation result.
52 + */
53 + public CompletedBatchOperation result() {
54 + return result;
55 + }
56 +
57 + /**
58 + * Creates an event of a given type and for the specified flow rule batch.
59 + *
60 + * @param type flow rule batch event type
61 + * @param batch event flow rule batch subject
62 + */
63 + private FlowRuleBatchEvent(Type type, FlowRuleBatchRequest request, CompletedBatchOperation result) {
64 + super(type, request);
65 + this.result = result;
66 + }
67 +}
1 +package org.onlab.onos.net.flow;
2 +
3 +import java.util.Collections;
4 +import java.util.List;
5 +
6 +import org.onlab.onos.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
7 +
8 +import com.google.common.collect.Lists;
9 +
10 +public class FlowRuleBatchRequest {
11 +
12 + private final List<FlowEntry> toAdd;
13 + private final List<FlowEntry> toRemove;
14 +
15 + public FlowRuleBatchRequest(List<FlowEntry> toAdd, List<FlowEntry> toRemove) {
16 + this.toAdd = Collections.unmodifiableList(toAdd);
17 + this.toRemove = Collections.unmodifiableList(toRemove);
18 + }
19 +
20 + public List<FlowEntry> toAdd() {
21 + return toAdd;
22 + }
23 +
24 + public List<FlowEntry> toRemove() {
25 + return toRemove;
26 + }
27 +
28 + public FlowRuleBatchOperation asBatchOperation() {
29 + List<FlowRuleBatchEntry> entries = Lists.newArrayList();
30 + for (FlowEntry e : toAdd) {
31 + entries.add(new FlowRuleBatchEntry(FlowRuleOperation.ADD, e));
32 + }
33 + for (FlowEntry e : toRemove) {
34 + entries.add(new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, e));
35 + }
36 + return new FlowRuleBatchOperation(entries);
37 + }
38 +}
...@@ -18,11 +18,11 @@ ...@@ -18,11 +18,11 @@
18 */ 18 */
19 package org.onlab.onos.net.flow; 19 package org.onlab.onos.net.flow;
20 20
21 -import java.util.concurrent.Future;
22 -
23 import org.onlab.onos.ApplicationId; 21 import org.onlab.onos.ApplicationId;
24 import org.onlab.onos.net.provider.Provider; 22 import org.onlab.onos.net.provider.Provider;
25 23
24 +import com.google.common.util.concurrent.ListenableFuture;
25 +
26 /** 26 /**
27 * Abstraction of a flow rule provider. 27 * Abstraction of a flow rule provider.
28 */ 28 */
...@@ -60,6 +60,6 @@ public interface FlowRuleProvider extends Provider { ...@@ -60,6 +60,6 @@ public interface FlowRuleProvider extends Provider {
60 * @param batch a batch of flow rules 60 * @param batch a batch of flow rules
61 * @return a future indicating the status of this execution 61 * @return a future indicating the status of this execution
62 */ 62 */
63 - Future<CompletedBatchOperation> executeBatch(BatchOperation<FlowRuleBatchEntry> batch); 63 + ListenableFuture<CompletedBatchOperation> executeBatch(BatchOperation<FlowRuleBatchEntry> batch);
64 64
65 } 65 }
......
...@@ -18,6 +18,8 @@ ...@@ -18,6 +18,8 @@
18 */ 18 */
19 package org.onlab.onos.net.flow; 19 package org.onlab.onos.net.flow;
20 20
21 +import java.util.concurrent.Future;
22 +
21 import org.onlab.onos.ApplicationId; 23 import org.onlab.onos.ApplicationId;
22 import org.onlab.onos.net.DeviceId; 24 import org.onlab.onos.net.DeviceId;
23 import org.onlab.onos.store.Store; 25 import org.onlab.onos.store.Store;
...@@ -25,7 +27,7 @@ import org.onlab.onos.store.Store; ...@@ -25,7 +27,7 @@ import org.onlab.onos.store.Store;
25 /** 27 /**
26 * Manages inventory of flow rules; not intended for direct use. 28 * Manages inventory of flow rules; not intended for direct use.
27 */ 29 */
28 -public interface FlowRuleStore extends Store<FlowRuleEvent, FlowRuleStoreDelegate> { 30 +public interface FlowRuleStore extends Store<FlowRuleBatchEvent, FlowRuleStoreDelegate> {
29 31
30 /** 32 /**
31 * Returns the number of flow rule in the store. 33 * Returns the number of flow rule in the store.
...@@ -59,12 +61,26 @@ public interface FlowRuleStore extends Store<FlowRuleEvent, FlowRuleStoreDelegat ...@@ -59,12 +61,26 @@ public interface FlowRuleStore extends Store<FlowRuleEvent, FlowRuleStoreDelegat
59 Iterable<FlowRule> getFlowRulesByAppId(ApplicationId appId); 61 Iterable<FlowRule> getFlowRulesByAppId(ApplicationId appId);
60 62
61 /** 63 /**
64 + // TODO: Better description of method behavior.
62 * Stores a new flow rule without generating events. 65 * Stores a new flow rule without generating events.
63 * 66 *
64 * @param rule the flow rule to add 67 * @param rule the flow rule to add
65 - * @return true if the rule should be handled locally
66 */ 68 */
67 - boolean storeFlowRule(FlowRule rule); 69 + void storeFlowRule(FlowRule rule);
70 +
71 + /**
72 + * Stores a batch of flow rules.
73 + * @param batchOperation batch of flow rules.
74 + * @return Future response indicating success/failure of the batch operation
75 + * all the way down to the device.
76 + */
77 + Future<CompletedBatchOperation> storeBatch(FlowRuleBatchOperation batchOperation);
78 +
79 + /**
80 + * Invoked on the completion of a storeBatch operation.
81 + * @param result
82 + */
83 + void batchOperationComplete(FlowRuleBatchEvent event);
68 84
69 /** 85 /**
70 * Marks a flow rule for deletion. Actual deletion will occur 86 * Marks a flow rule for deletion. Actual deletion will occur
...@@ -73,7 +89,7 @@ public interface FlowRuleStore extends Store<FlowRuleEvent, FlowRuleStoreDelegat ...@@ -73,7 +89,7 @@ public interface FlowRuleStore extends Store<FlowRuleEvent, FlowRuleStoreDelegat
73 * @param rule the flow rule to delete 89 * @param rule the flow rule to delete
74 * @return true if the rule should be handled locally 90 * @return true if the rule should be handled locally
75 */ 91 */
76 - boolean deleteFlowRule(FlowRule rule); 92 + void deleteFlowRule(FlowRule rule);
77 93
78 /** 94 /**
79 * Stores a new flow rule, or updates an existing entry. 95 * Stores a new flow rule, or updates an existing entry.
......
...@@ -23,5 +23,5 @@ import org.onlab.onos.store.StoreDelegate; ...@@ -23,5 +23,5 @@ import org.onlab.onos.store.StoreDelegate;
23 /** 23 /**
24 * Flow rule store delegate abstraction. 24 * Flow rule store delegate abstraction.
25 */ 25 */
26 -public interface FlowRuleStoreDelegate extends StoreDelegate<FlowRuleEvent> { 26 +public interface FlowRuleStoreDelegate extends StoreDelegate<FlowRuleBatchEvent> {
27 } 27 }
......
...@@ -5,8 +5,10 @@ import static org.slf4j.LoggerFactory.getLogger; ...@@ -5,8 +5,10 @@ import static org.slf4j.LoggerFactory.getLogger;
5 5
6 import java.util.List; 6 import java.util.List;
7 import java.util.Map; 7 import java.util.Map;
8 +import java.util.Set;
8 import java.util.concurrent.CancellationException; 9 import java.util.concurrent.CancellationException;
9 import java.util.concurrent.ExecutionException; 10 import java.util.concurrent.ExecutionException;
11 +import java.util.concurrent.Executors;
10 import java.util.concurrent.Future; 12 import java.util.concurrent.Future;
11 import java.util.concurrent.TimeUnit; 13 import java.util.concurrent.TimeUnit;
12 import java.util.concurrent.TimeoutException; 14 import java.util.concurrent.TimeoutException;
...@@ -30,7 +32,9 @@ import org.onlab.onos.net.flow.FlowEntry; ...@@ -30,7 +32,9 @@ import org.onlab.onos.net.flow.FlowEntry;
30 import org.onlab.onos.net.flow.FlowRule; 32 import org.onlab.onos.net.flow.FlowRule;
31 import org.onlab.onos.net.flow.FlowRuleBatchEntry; 33 import org.onlab.onos.net.flow.FlowRuleBatchEntry;
32 import org.onlab.onos.net.flow.FlowRuleBatchEntry.FlowRuleOperation; 34 import org.onlab.onos.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
35 +import org.onlab.onos.net.flow.FlowRuleBatchEvent;
33 import org.onlab.onos.net.flow.FlowRuleBatchOperation; 36 import org.onlab.onos.net.flow.FlowRuleBatchOperation;
37 +import org.onlab.onos.net.flow.FlowRuleBatchRequest;
34 import org.onlab.onos.net.flow.FlowRuleEvent; 38 import org.onlab.onos.net.flow.FlowRuleEvent;
35 import org.onlab.onos.net.flow.FlowRuleListener; 39 import org.onlab.onos.net.flow.FlowRuleListener;
36 import org.onlab.onos.net.flow.FlowRuleProvider; 40 import org.onlab.onos.net.flow.FlowRuleProvider;
...@@ -47,6 +51,9 @@ import com.google.common.collect.ArrayListMultimap; ...@@ -47,6 +51,9 @@ import com.google.common.collect.ArrayListMultimap;
47 import com.google.common.collect.Lists; 51 import com.google.common.collect.Lists;
48 import com.google.common.collect.Maps; 52 import com.google.common.collect.Maps;
49 import com.google.common.collect.Multimap; 53 import com.google.common.collect.Multimap;
54 +import com.google.common.collect.Sets;
55 +import com.google.common.util.concurrent.Futures;
56 +import com.google.common.util.concurrent.ListenableFuture;
50 57
51 /** 58 /**
52 * Provides implementation of the flow NB &amp; SB APIs. 59 * Provides implementation of the flow NB &amp; SB APIs.
...@@ -104,14 +111,7 @@ public class FlowRuleManager ...@@ -104,14 +111,7 @@ public class FlowRuleManager
104 public void applyFlowRules(FlowRule... flowRules) { 111 public void applyFlowRules(FlowRule... flowRules) {
105 for (int i = 0; i < flowRules.length; i++) { 112 for (int i = 0; i < flowRules.length; i++) {
106 FlowRule f = flowRules[i]; 113 FlowRule f = flowRules[i];
107 - boolean local = store.storeFlowRule(f); 114 + store.storeFlowRule(f);
108 - if (local) {
109 - // TODO: aggregate all local rules and push down once?
110 - applyFlowRulesToProviders(f);
111 - eventDispatcher.post(
112 - new FlowRuleEvent(FlowRuleEvent.Type.RULE_ADD_REQUESTED, f));
113 -
114 - }
115 } 115 }
116 } 116 }
117 117
...@@ -135,13 +135,7 @@ public class FlowRuleManager ...@@ -135,13 +135,7 @@ public class FlowRuleManager
135 FlowRule f; 135 FlowRule f;
136 for (int i = 0; i < flowRules.length; i++) { 136 for (int i = 0; i < flowRules.length; i++) {
137 f = flowRules[i]; 137 f = flowRules[i];
138 - boolean local = store.deleteFlowRule(f); 138 + store.deleteFlowRule(f);
139 - if (local) {
140 - // TODO: aggregate all local rules and push down once?
141 - removeFlowRulesFromProviders(f);
142 - eventDispatcher.post(
143 - new FlowRuleEvent(FlowRuleEvent.Type.RULE_REMOVE_REQUESTED, f));
144 - }
145 } 139 }
146 } 140 }
147 141
...@@ -185,33 +179,21 @@ public class FlowRuleManager ...@@ -185,33 +179,21 @@ public class FlowRuleManager
185 @Override 179 @Override
186 public Future<CompletedBatchOperation> applyBatch( 180 public Future<CompletedBatchOperation> applyBatch(
187 FlowRuleBatchOperation batch) { 181 FlowRuleBatchOperation batch) {
188 - Multimap<FlowRuleProvider, FlowRuleBatchEntry> batches = 182 + Multimap<DeviceId, FlowRuleBatchEntry> perDeviceBatches =
189 ArrayListMultimap.create(); 183 ArrayListMultimap.create();
190 List<Future<CompletedBatchOperation>> futures = Lists.newArrayList(); 184 List<Future<CompletedBatchOperation>> futures = Lists.newArrayList();
191 for (FlowRuleBatchEntry fbe : batch.getOperations()) { 185 for (FlowRuleBatchEntry fbe : batch.getOperations()) {
192 final FlowRule f = fbe.getTarget(); 186 final FlowRule f = fbe.getTarget();
193 - final Device device = deviceService.getDevice(f.deviceId()); 187 + perDeviceBatches.put(f.deviceId(), fbe);
194 - final FlowRuleProvider frp = getProvider(device.providerId());
195 - batches.put(frp, fbe);
196 - switch (fbe.getOperator()) {
197 - case ADD:
198 - store.storeFlowRule(f);
199 - break;
200 - case REMOVE:
201 - store.deleteFlowRule(f);
202 - break;
203 - case MODIFY:
204 - default:
205 - log.error("Batch operation type {} unsupported.", fbe.getOperator());
206 - }
207 } 188 }
208 - for (FlowRuleProvider provider : batches.keySet()) { 189 +
190 + for (DeviceId deviceId : perDeviceBatches.keySet()) {
209 FlowRuleBatchOperation b = 191 FlowRuleBatchOperation b =
210 - new FlowRuleBatchOperation(batches.get(provider)); 192 + new FlowRuleBatchOperation(perDeviceBatches.get(deviceId));
211 - Future<CompletedBatchOperation> future = provider.executeBatch(b); 193 + Future<CompletedBatchOperation> future = store.storeBatch(b);
212 futures.add(future); 194 futures.add(future);
213 } 195 }
214 - return new FlowRuleBatchFuture(futures, batches); 196 + return new FlowRuleBatchFuture(futures, perDeviceBatches);
215 } 197 }
216 198
217 @Override 199 @Override
...@@ -324,6 +306,7 @@ public class FlowRuleManager ...@@ -324,6 +306,7 @@ public class FlowRuleManager
324 post(event); 306 post(event);
325 } 307 }
326 } else { 308 } else {
309 + log.info("Removing flow rules....");
327 removeFlowRules(flowEntry); 310 removeFlowRules(flowEntry);
328 } 311 }
329 312
...@@ -391,21 +374,47 @@ public class FlowRuleManager ...@@ -391,21 +374,47 @@ public class FlowRuleManager
391 374
392 // Store delegate to re-post events emitted from the store. 375 // Store delegate to re-post events emitted from the store.
393 private class InternalStoreDelegate implements FlowRuleStoreDelegate { 376 private class InternalStoreDelegate implements FlowRuleStoreDelegate {
377 + // TODO: Right now we only dispatch events at individual flowEntry level.
378 + // It may be more efficient for also dispatch events as a batch.
394 @Override 379 @Override
395 - public void notify(FlowRuleEvent event) { 380 + public void notify(FlowRuleBatchEvent event) {
381 + final FlowRuleBatchRequest request = event.subject();
396 switch (event.type()) { 382 switch (event.type()) {
397 - case RULE_ADD_REQUESTED: 383 + case BATCH_OPERATION_REQUESTED:
398 - applyFlowRulesToProviders(event.subject()); 384 + for (FlowEntry entry : request.toAdd()) {
399 - break; 385 + eventDispatcher.post(new FlowRuleEvent(FlowRuleEvent.Type.RULE_ADD_REQUESTED, entry));
400 - case RULE_REMOVE_REQUESTED: 386 + }
401 - removeFlowRulesFromProviders(event.subject()); 387 + for (FlowEntry entry : request.toRemove()) {
402 - break; 388 + eventDispatcher.post(new FlowRuleEvent(FlowRuleEvent.Type.RULE_REMOVE_REQUESTED, entry));
389 + }
390 + // FIXME: what about op.equals(FlowRuleOperation.MODIFY) ?
403 391
404 - case RULE_ADDED: 392 + FlowRuleBatchOperation batchOperation = request.asBatchOperation();
405 - case RULE_REMOVED: 393 +
406 - case RULE_UPDATED: 394 + FlowRuleProvider flowRuleProvider =
407 - // only dispatch events related to switch 395 + getProvider(batchOperation.getOperations().get(0).getTarget().deviceId());
408 - eventDispatcher.post(event); 396 + final ListenableFuture<CompletedBatchOperation> result =
397 + flowRuleProvider.executeBatch(batchOperation);
398 + result.addListener(new Runnable() {
399 + @Override
400 + public void run() {
401 + store.batchOperationComplete(FlowRuleBatchEvent.create(request, Futures.getUnchecked(result)));
402 + }
403 + }, Executors.newCachedThreadPool());
404 +
405 + break;
406 + case BATCH_OPERATION_COMPLETED:
407 + Set<FlowEntry> failedItems = event.result().failedItems();
408 + for (FlowEntry entry : request.toAdd()) {
409 + if (!failedItems.contains(entry)) {
410 + eventDispatcher.post(new FlowRuleEvent(FlowRuleEvent.Type.RULE_ADDED, entry));
411 + }
412 + }
413 + for (FlowEntry entry : request.toRemove()) {
414 + if (!failedItems.contains(entry)) {
415 + eventDispatcher.post(new FlowRuleEvent(FlowRuleEvent.Type.RULE_REMOVED, entry));
416 + }
417 + }
409 break; 418 break;
410 default: 419 default:
411 break; 420 break;
...@@ -413,18 +422,15 @@ public class FlowRuleManager ...@@ -413,18 +422,15 @@ public class FlowRuleManager
413 } 422 }
414 } 423 }
415 424
416 - private class FlowRuleBatchFuture 425 + private class FlowRuleBatchFuture implements Future<CompletedBatchOperation> {
417 - implements Future<CompletedBatchOperation> {
418 426
419 private final List<Future<CompletedBatchOperation>> futures; 427 private final List<Future<CompletedBatchOperation>> futures;
420 - private final Multimap<FlowRuleProvider, FlowRuleBatchEntry> batches; 428 + private final Multimap<DeviceId, FlowRuleBatchEntry> batches;
421 private final AtomicReference<BatchState> state; 429 private final AtomicReference<BatchState> state;
422 private CompletedBatchOperation overall; 430 private CompletedBatchOperation overall;
423 431
424 -
425 -
426 public FlowRuleBatchFuture(List<Future<CompletedBatchOperation>> futures, 432 public FlowRuleBatchFuture(List<Future<CompletedBatchOperation>> futures,
427 - Multimap<FlowRuleProvider, FlowRuleBatchEntry> batches) { 433 + Multimap<DeviceId, FlowRuleBatchEntry> batches) {
428 this.futures = futures; 434 this.futures = futures;
429 this.batches = batches; 435 this.batches = batches;
430 state = new AtomicReference<FlowRuleManager.BatchState>(); 436 state = new AtomicReference<FlowRuleManager.BatchState>();
...@@ -466,7 +472,7 @@ public class FlowRuleManager ...@@ -466,7 +472,7 @@ public class FlowRuleManager
466 } 472 }
467 473
468 boolean success = true; 474 boolean success = true;
469 - List<FlowEntry> failed = Lists.newLinkedList(); 475 + Set<FlowEntry> failed = Sets.newHashSet();
470 CompletedBatchOperation completed; 476 CompletedBatchOperation completed;
471 for (Future<CompletedBatchOperation> future : futures) { 477 for (Future<CompletedBatchOperation> future : futures) {
472 completed = future.get(); 478 completed = future.get();
...@@ -486,7 +492,7 @@ public class FlowRuleManager ...@@ -486,7 +492,7 @@ public class FlowRuleManager
486 return overall; 492 return overall;
487 } 493 }
488 boolean success = true; 494 boolean success = true;
489 - List<FlowEntry> failed = Lists.newLinkedList(); 495 + Set<FlowEntry> failed = Sets.newHashSet();
490 CompletedBatchOperation completed; 496 CompletedBatchOperation completed;
491 long start = System.nanoTime(); 497 long start = System.nanoTime();
492 long end = start + unit.toNanos(timeout); 498 long end = start + unit.toNanos(timeout);
...@@ -500,7 +506,7 @@ public class FlowRuleManager ...@@ -500,7 +506,7 @@ public class FlowRuleManager
500 return finalizeBatchOperation(success, failed); 506 return finalizeBatchOperation(success, failed);
501 } 507 }
502 508
503 - private boolean validateBatchOperation(List<FlowEntry> failed, 509 + private boolean validateBatchOperation(Set<FlowEntry> failed,
504 CompletedBatchOperation completed) { 510 CompletedBatchOperation completed) {
505 511
506 if (isCancelled()) { 512 if (isCancelled()) {
...@@ -522,7 +528,7 @@ public class FlowRuleManager ...@@ -522,7 +528,7 @@ public class FlowRuleManager
522 } 528 }
523 529
524 private CompletedBatchOperation finalizeBatchOperation(boolean success, 530 private CompletedBatchOperation finalizeBatchOperation(boolean success,
525 - List<FlowEntry> failed) { 531 + Set<FlowEntry> failed) {
526 synchronized (this) { 532 synchronized (this) {
527 if (!state.compareAndSet(BatchState.STARTED, BatchState.FINISHED)) { 533 if (!state.compareAndSet(BatchState.STARTED, BatchState.FINISHED)) {
528 if (state.get() == BatchState.FINISHED) { 534 if (state.get() == BatchState.FINISHED) {
...@@ -545,11 +551,6 @@ public class FlowRuleManager ...@@ -545,11 +551,6 @@ public class FlowRuleManager
545 store.storeFlowRule(fbe.getTarget()); 551 store.storeFlowRule(fbe.getTarget());
546 } 552 }
547 } 553 }
548 -
549 } 554 }
550 } 555 }
551 -
552 -
553 -
554 -
555 } 556 }
......
...@@ -197,14 +197,7 @@ public class LinkManager ...@@ -197,14 +197,7 @@ public class LinkManager
197 checkNotNull(linkDescription, LINK_DESC_NULL); 197 checkNotNull(linkDescription, LINK_DESC_NULL);
198 checkValidity(); 198 checkValidity();
199 199
200 - ConnectPoint src = linkDescription.src();
201 - ConnectPoint dst = linkDescription.dst();
202 - // if we aren't master for the device associated with the ConnectPoint
203 - // we probably shouldn't be doing this.
204 200
205 -// if (deviceService.getRole(dst.deviceId()) != MastershipRole.MASTER) {
206 -// return;
207 -// }
208 LinkEvent event = store.createOrUpdateLink(provider().id(), 201 LinkEvent event = store.createOrUpdateLink(provider().id(),
209 linkDescription); 202 linkDescription);
210 if (event != null) { 203 if (event != null) {
...@@ -232,11 +225,7 @@ public class LinkManager ...@@ -232,11 +225,7 @@ public class LinkManager
232 public void linksVanished(ConnectPoint connectPoint) { 225 public void linksVanished(ConnectPoint connectPoint) {
233 checkNotNull(connectPoint, "Connect point cannot be null"); 226 checkNotNull(connectPoint, "Connect point cannot be null");
234 checkValidity(); 227 checkValidity();
235 - // if we aren't master for the device associated with the ConnectPoint 228 +
236 - // we probably shouldn't be doing this.
237 - if (deviceService.getRole(connectPoint.deviceId()) != MastershipRole.MASTER) {
238 - return;
239 - }
240 log.info("Links for connection point {} vanished", connectPoint); 229 log.info("Links for connection point {} vanished", connectPoint);
241 // FIXME: This will remove links registered by other providers 230 // FIXME: This will remove links registered by other providers
242 removeLinks(getLinks(connectPoint)); 231 removeLinks(getLinks(connectPoint));
...@@ -246,11 +235,7 @@ public class LinkManager ...@@ -246,11 +235,7 @@ public class LinkManager
246 public void linksVanished(DeviceId deviceId) { 235 public void linksVanished(DeviceId deviceId) {
247 checkNotNull(deviceId, DEVICE_ID_NULL); 236 checkNotNull(deviceId, DEVICE_ID_NULL);
248 checkValidity(); 237 checkValidity();
249 - // if we aren't master for the device associated with the ConnectPoint 238 +
250 - // we probably shouldn't be doing this.
251 - if (deviceService.getRole(deviceId) != MastershipRole.MASTER) {
252 - return;
253 - }
254 log.info("Links for device {} vanished", deviceId); 239 log.info("Links for device {} vanished", deviceId);
255 removeLinks(getDeviceLinks(deviceId)); 240 removeLinks(getDeviceLinks(deviceId));
256 } 241 }
......
...@@ -20,7 +20,6 @@ import org.onlab.onos.net.statistic.Load; ...@@ -20,7 +20,6 @@ import org.onlab.onos.net.statistic.Load;
20 import org.onlab.onos.net.statistic.StatisticService; 20 import org.onlab.onos.net.statistic.StatisticService;
21 import org.onlab.onos.net.statistic.StatisticStore; 21 import org.onlab.onos.net.statistic.StatisticStore;
22 import org.slf4j.Logger; 22 import org.slf4j.Logger;
23 -
24 import java.util.Set; 23 import java.util.Set;
25 24
26 import static org.slf4j.LoggerFactory.getLogger; 25 import static org.slf4j.LoggerFactory.getLogger;
...@@ -68,19 +67,54 @@ public class StatisticManager implements StatisticService { ...@@ -68,19 +67,54 @@ public class StatisticManager implements StatisticService {
68 67
69 @Override 68 @Override
70 public Link max(Path path) { 69 public Link max(Path path) {
70 + if (path.links().isEmpty()) {
71 return null; 71 return null;
72 } 72 }
73 + Load maxLoad = new DefaultLoad();
74 + Link maxLink = null;
75 + for (Link link : path.links()) {
76 + Load load = loadInternal(link.src());
77 + if (load.rate() > maxLoad.rate()) {
78 + maxLoad = load;
79 + maxLink = link;
80 + }
81 + }
82 + return maxLink;
83 + }
73 84
74 @Override 85 @Override
75 public Link min(Path path) { 86 public Link min(Path path) {
87 + if (path.links().isEmpty()) {
76 return null; 88 return null;
77 } 89 }
90 + Load minLoad = new DefaultLoad();
91 + Link minLink = null;
92 + for (Link link : path.links()) {
93 + Load load = loadInternal(link.src());
94 + if (load.rate() < minLoad.rate()) {
95 + minLoad = load;
96 + minLink = link;
97 + }
98 + }
99 + return minLink;
100 + }
78 101
79 @Override 102 @Override
80 public FlowRule highestHitter(ConnectPoint connectPoint) { 103 public FlowRule highestHitter(ConnectPoint connectPoint) {
104 + Set<FlowEntry> hitters = statisticStore.getCurrentStatistic(connectPoint);
105 + if (hitters.isEmpty()) {
81 return null; 106 return null;
82 } 107 }
83 108
109 + FlowEntry max = hitters.iterator().next();
110 + for (FlowEntry entry : hitters) {
111 + if (entry.bytes() > max.bytes()) {
112 + max = entry;
113 + }
114 + }
115 + return max;
116 + }
117 +
84 private Load loadInternal(ConnectPoint connectPoint) { 118 private Load loadInternal(ConnectPoint connectPoint) {
85 Set<FlowEntry> current; 119 Set<FlowEntry> current;
86 Set<FlowEntry> previous; 120 Set<FlowEntry> previous;
...@@ -123,16 +157,12 @@ public class StatisticManager implements StatisticService { ...@@ -123,16 +157,12 @@ public class StatisticManager implements StatisticService {
123 case RULE_UPDATED: 157 case RULE_UPDATED:
124 if (rule instanceof FlowEntry) { 158 if (rule instanceof FlowEntry) {
125 statisticStore.addOrUpdateStatistic((FlowEntry) rule); 159 statisticStore.addOrUpdateStatistic((FlowEntry) rule);
126 - } else {
127 - log.warn("IT AIN'T A FLOWENTRY");
128 } 160 }
129 break; 161 break;
130 case RULE_ADD_REQUESTED: 162 case RULE_ADD_REQUESTED:
131 - log.info("Preparing for stats");
132 statisticStore.prepareForStatistics(rule); 163 statisticStore.prepareForStatistics(rule);
133 break; 164 break;
134 case RULE_REMOVE_REQUESTED: 165 case RULE_REMOVE_REQUESTED:
135 - log.info("Removing stats");
136 statisticStore.removeFromStatistics(rule); 166 statisticStore.removeFromStatistics(rule);
137 break; 167 break;
138 case RULE_REMOVED: 168 case RULE_REMOVED:
......
1 package org.onlab.onos.net.flow.impl; 1 package org.onlab.onos.net.flow.impl;
2 2
3 +import static org.junit.Assert.assertEquals;
4 +import static org.junit.Assert.assertFalse;
5 +import static org.junit.Assert.assertNotNull;
6 +import static org.junit.Assert.assertTrue;
7 +import static org.junit.Assert.fail;
8 +import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_ADDED;
9 +import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
10 +import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_UPDATED;
3 11
4 12
5 import static org.onlab.onos.net.flow.FlowRuleEvent.Type.*; 13 import static org.onlab.onos.net.flow.FlowRuleEvent.Type.*;
6 14
7 -
8 import java.util.ArrayList; 15 import java.util.ArrayList;
9 import java.util.Collections; 16 import java.util.Collections;
10 import java.util.HashMap; 17 import java.util.HashMap;
...@@ -12,6 +19,7 @@ import java.util.List; ...@@ -12,6 +19,7 @@ import java.util.List;
12 import java.util.Map; 19 import java.util.Map;
13 import java.util.Set; 20 import java.util.Set;
14 import java.util.concurrent.ExecutionException; 21 import java.util.concurrent.ExecutionException;
22 +import java.util.concurrent.Executor;
15 import java.util.concurrent.Future; 23 import java.util.concurrent.Future;
16 import java.util.concurrent.TimeUnit; 24 import java.util.concurrent.TimeUnit;
17 import java.util.concurrent.TimeoutException; 25 import java.util.concurrent.TimeoutException;
...@@ -59,16 +67,7 @@ import com.google.common.collect.ImmutableList; ...@@ -59,16 +67,7 @@ import com.google.common.collect.ImmutableList;
59 import com.google.common.collect.ImmutableMap; 67 import com.google.common.collect.ImmutableMap;
60 import com.google.common.collect.Lists; 68 import com.google.common.collect.Lists;
61 import com.google.common.collect.Sets; 69 import com.google.common.collect.Sets;
62 - 70 +import com.google.common.util.concurrent.ListenableFuture;
63 -import static java.util.Collections.EMPTY_LIST;
64 -import static org.junit.Assert.assertEquals;
65 -import static org.junit.Assert.assertFalse;
66 -import static org.junit.Assert.assertNotNull;
67 -import static org.junit.Assert.assertTrue;
68 -import static org.junit.Assert.fail;
69 -import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_ADDED;
70 -import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
71 -import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_UPDATED;
72 71
73 /** 72 /**
74 * Test codifying the flow rule service & flow rule provider service contracts. 73 * Test codifying the flow rule service & flow rule provider service contracts.
...@@ -182,7 +181,6 @@ public class FlowRuleManagerTest { ...@@ -182,7 +181,6 @@ public class FlowRuleManagerTest {
182 181
183 // TODO: If preserving iteration order is a requirement, redo FlowRuleStore. 182 // TODO: If preserving iteration order is a requirement, redo FlowRuleStore.
184 //backing store is sensitive to the order of additions/removals 183 //backing store is sensitive to the order of additions/removals
185 - @SuppressWarnings("unchecked")
186 private boolean validateState(Map<FlowRule, FlowEntryState> expected) { 184 private boolean validateState(Map<FlowRule, FlowEntryState> expected) {
187 Map<FlowRule, FlowEntryState> expectedToCheck = new HashMap<>(expected); 185 Map<FlowRule, FlowEntryState> expectedToCheck = new HashMap<>(expected);
188 Iterable<FlowEntry> rules = service.getFlowEntries(DID); 186 Iterable<FlowEntry> rules = service.getFlowEntries(DID);
...@@ -526,13 +524,13 @@ public class FlowRuleManagerTest { ...@@ -526,13 +524,13 @@ public class FlowRuleManagerTest {
526 } 524 }
527 525
528 @Override 526 @Override
529 - public Future<CompletedBatchOperation> executeBatch( 527 + public ListenableFuture<CompletedBatchOperation> executeBatch(
530 BatchOperation<FlowRuleBatchEntry> batch) { 528 BatchOperation<FlowRuleBatchEntry> batch) {
531 return new TestInstallationFuture(); 529 return new TestInstallationFuture();
532 } 530 }
533 531
534 private class TestInstallationFuture 532 private class TestInstallationFuture
535 - implements Future<CompletedBatchOperation> { 533 + implements ListenableFuture<CompletedBatchOperation> {
536 534
537 @Override 535 @Override
538 public boolean cancel(boolean mayInterruptIfRunning) { 536 public boolean cancel(boolean mayInterruptIfRunning) {
...@@ -550,10 +548,9 @@ public class FlowRuleManagerTest { ...@@ -550,10 +548,9 @@ public class FlowRuleManagerTest {
550 } 548 }
551 549
552 @Override 550 @Override
553 - @SuppressWarnings("unchecked")
554 public CompletedBatchOperation get() 551 public CompletedBatchOperation get()
555 throws InterruptedException, ExecutionException { 552 throws InterruptedException, ExecutionException {
556 - return new CompletedBatchOperation(true, EMPTY_LIST); 553 + return new CompletedBatchOperation(true, Collections.<FlowEntry>emptySet());
557 } 554 }
558 555
559 @Override 556 @Override
...@@ -562,6 +559,11 @@ public class FlowRuleManagerTest { ...@@ -562,6 +559,11 @@ public class FlowRuleManagerTest {
562 ExecutionException, TimeoutException { 559 ExecutionException, TimeoutException {
563 return null; 560 return null;
564 } 561 }
562 +
563 + @Override
564 + public void addListener(Runnable task, Executor executor) {
565 + // TODO: add stuff.
566 + }
565 } 567 }
566 568
567 } 569 }
......
...@@ -5,10 +5,14 @@ import static org.slf4j.LoggerFactory.getLogger; ...@@ -5,10 +5,14 @@ import static org.slf4j.LoggerFactory.getLogger;
5 import static org.onlab.onos.store.flow.impl.FlowStoreMessageSubjects.*; 5 import static org.onlab.onos.store.flow.impl.FlowStoreMessageSubjects.*;
6 6
7 import java.io.IOException; 7 import java.io.IOException;
8 +import java.util.ArrayList;
9 +import java.util.Arrays;
8 import java.util.Collection; 10 import java.util.Collection;
9 import java.util.Collections; 11 import java.util.Collections;
12 +import java.util.concurrent.Future;
10 import java.util.concurrent.TimeUnit; 13 import java.util.concurrent.TimeUnit;
11 import java.util.concurrent.TimeoutException; 14 import java.util.concurrent.TimeoutException;
15 +import java.util.List;
12 16
13 import org.apache.felix.scr.annotations.Activate; 17 import org.apache.felix.scr.annotations.Activate;
14 import org.apache.felix.scr.annotations.Component; 18 import org.apache.felix.scr.annotations.Component;
...@@ -19,11 +23,17 @@ import org.apache.felix.scr.annotations.Service; ...@@ -19,11 +23,17 @@ import org.apache.felix.scr.annotations.Service;
19 import org.onlab.onos.ApplicationId; 23 import org.onlab.onos.ApplicationId;
20 import org.onlab.onos.cluster.ClusterService; 24 import org.onlab.onos.cluster.ClusterService;
21 import org.onlab.onos.net.DeviceId; 25 import org.onlab.onos.net.DeviceId;
26 +import org.onlab.onos.net.flow.CompletedBatchOperation;
22 import org.onlab.onos.net.flow.DefaultFlowEntry; 27 import org.onlab.onos.net.flow.DefaultFlowEntry;
23 import org.onlab.onos.net.flow.FlowEntry; 28 import org.onlab.onos.net.flow.FlowEntry;
24 import org.onlab.onos.net.flow.FlowEntry.FlowEntryState; 29 import org.onlab.onos.net.flow.FlowEntry.FlowEntryState;
25 import org.onlab.onos.net.flow.FlowRule; 30 import org.onlab.onos.net.flow.FlowRule;
31 +import org.onlab.onos.net.flow.FlowRuleBatchEntry;
32 +import org.onlab.onos.net.flow.FlowRuleBatchEvent;
33 +import org.onlab.onos.net.flow.FlowRuleBatchOperation;
34 +import org.onlab.onos.net.flow.FlowRuleBatchRequest;
26 import org.onlab.onos.net.flow.FlowRuleEvent; 35 import org.onlab.onos.net.flow.FlowRuleEvent;
36 +import org.onlab.onos.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
27 import org.onlab.onos.net.flow.FlowRuleEvent.Type; 37 import org.onlab.onos.net.flow.FlowRuleEvent.Type;
28 import org.onlab.onos.net.flow.FlowRuleStore; 38 import org.onlab.onos.net.flow.FlowRuleStore;
29 import org.onlab.onos.net.flow.FlowRuleStoreDelegate; 39 import org.onlab.onos.net.flow.FlowRuleStoreDelegate;
...@@ -43,6 +53,7 @@ import org.slf4j.Logger; ...@@ -43,6 +53,7 @@ import org.slf4j.Logger;
43 import com.google.common.collect.ArrayListMultimap; 53 import com.google.common.collect.ArrayListMultimap;
44 import com.google.common.collect.ImmutableSet; 54 import com.google.common.collect.ImmutableSet;
45 import com.google.common.collect.Multimap; 55 import com.google.common.collect.Multimap;
56 +import com.google.common.util.concurrent.Futures;
46 57
47 /** 58 /**
48 * Manages inventory of flow rules using a distributed state management protocol. 59 * Manages inventory of flow rules using a distributed state management protocol.
...@@ -50,7 +61,7 @@ import com.google.common.collect.Multimap; ...@@ -50,7 +61,7 @@ import com.google.common.collect.Multimap;
50 @Component(immediate = true) 61 @Component(immediate = true)
51 @Service 62 @Service
52 public class DistributedFlowRuleStore 63 public class DistributedFlowRuleStore
53 - extends AbstractStore<FlowRuleEvent, FlowRuleStoreDelegate> 64 + extends AbstractStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
54 implements FlowRuleStore { 65 implements FlowRuleStore {
55 66
56 private final Logger log = getLogger(getClass()); 67 private final Logger log = getLogger(getClass());
...@@ -92,7 +103,7 @@ public class DistributedFlowRuleStore ...@@ -92,7 +103,7 @@ public class DistributedFlowRuleStore
92 public void handle(ClusterMessage message) { 103 public void handle(ClusterMessage message) {
93 FlowRule rule = SERIALIZER.decode(message.payload()); 104 FlowRule rule = SERIALIZER.decode(message.payload());
94 log.info("received add request for {}", rule); 105 log.info("received add request for {}", rule);
95 - storeFlowEntryInternal(rule); 106 + storeFlowRule(rule);
96 // FIXME what to respond. 107 // FIXME what to respond.
97 try { 108 try {
98 message.respond(SERIALIZER.encode("ACK")); 109 message.respond(SERIALIZER.encode("ACK"));
...@@ -108,7 +119,7 @@ public class DistributedFlowRuleStore ...@@ -108,7 +119,7 @@ public class DistributedFlowRuleStore
108 public void handle(ClusterMessage message) { 119 public void handle(ClusterMessage message) {
109 FlowRule rule = SERIALIZER.decode(message.payload()); 120 FlowRule rule = SERIALIZER.decode(message.payload());
110 log.info("received delete request for {}", rule); 121 log.info("received delete request for {}", rule);
111 - deleteFlowRuleInternal(rule); 122 + deleteFlowRule(rule);
112 // FIXME what to respond. 123 // FIXME what to respond.
113 try { 124 try {
114 message.respond(SERIALIZER.encode("ACK")); 125 message.respond(SERIALIZER.encode("ACK"));
...@@ -118,6 +129,22 @@ public class DistributedFlowRuleStore ...@@ -118,6 +129,22 @@ public class DistributedFlowRuleStore
118 129
119 } 130 }
120 }); 131 });
132 +
133 + clusterCommunicator.addSubscriber(GET_FLOW_ENTRY, new ClusterMessageHandler() {
134 +
135 + @Override
136 + public void handle(ClusterMessage message) {
137 + FlowRule rule = SERIALIZER.decode(message.payload());
138 + log.info("received get flow entry request for {}", rule);
139 + FlowEntry flowEntry = getFlowEntryInternal(rule);
140 + try {
141 + message.respond(SERIALIZER.encode(flowEntry));
142 + } catch (IOException e) {
143 + log.error("Failed to respond back", e);
144 + }
145 + }
146 + });
147 +
121 log.info("Started"); 148 log.info("Started");
122 } 149 }
123 150
...@@ -127,6 +154,9 @@ public class DistributedFlowRuleStore ...@@ -127,6 +154,9 @@ public class DistributedFlowRuleStore
127 } 154 }
128 155
129 156
157 + // TODO: This is not a efficient operation on a distributed sharded
158 + // flow store. We need to revisit the need for this operation or at least
159 + // make it device specific.
130 @Override 160 @Override
131 public int getFlowRuleCount() { 161 public int getFlowRuleCount() {
132 return flowEntries.size(); 162 return flowEntries.size();
...@@ -134,9 +164,28 @@ public class DistributedFlowRuleStore ...@@ -134,9 +164,28 @@ public class DistributedFlowRuleStore
134 164
135 @Override 165 @Override
136 public synchronized FlowEntry getFlowEntry(FlowRule rule) { 166 public synchronized FlowEntry getFlowEntry(FlowRule rule) {
167 + ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
168 + if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
137 return getFlowEntryInternal(rule); 169 return getFlowEntryInternal(rule);
138 } 170 }
139 171
172 + log.info("Forwarding getFlowEntry to {}, which is the primary (master) for device {}",
173 + replicaInfo.master().orNull(), rule.deviceId());
174 +
175 + ClusterMessage message = new ClusterMessage(
176 + clusterService.getLocalNode().id(),
177 + FlowStoreMessageSubjects.GET_FLOW_ENTRY,
178 + SERIALIZER.encode(rule));
179 +
180 + try {
181 + ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
182 + return SERIALIZER.decode(response.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
183 + } catch (IOException | TimeoutException e) {
184 + // FIXME: throw a FlowStoreException
185 + throw new RuntimeException(e);
186 + }
187 + }
188 +
140 private synchronized StoredFlowEntry getFlowEntryInternal(FlowRule rule) { 189 private synchronized StoredFlowEntry getFlowEntryInternal(FlowRule rule) {
141 for (StoredFlowEntry f : flowEntries.get(rule.deviceId())) { 190 for (StoredFlowEntry f : flowEntries.get(rule.deviceId())) {
142 if (f.equals(rule)) { 191 if (f.equals(rule)) {
...@@ -165,19 +214,30 @@ public class DistributedFlowRuleStore ...@@ -165,19 +214,30 @@ public class DistributedFlowRuleStore
165 } 214 }
166 215
167 @Override 216 @Override
168 - public boolean storeFlowRule(FlowRule rule) { 217 + public void storeFlowRule(FlowRule rule) {
169 - ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId()); 218 + storeBatch(new FlowRuleBatchOperation(Arrays.asList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule))));
219 + }
220 +
221 + public Future<CompletedBatchOperation> storeBatch(FlowRuleBatchOperation operation) {
222 + if (operation.getOperations().isEmpty()) {
223 + return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowEntry>emptySet()));
224 + }
225 +
226 + DeviceId deviceId = operation.getOperations().get(0).getTarget().deviceId();
227 +
228 + ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
229 +
170 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) { 230 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
171 - return storeFlowEntryInternal(rule); 231 + return storeBatchInternal(operation);
172 } 232 }
173 233
174 - log.info("Forwarding storeFlowRule to {}, which is the primary (master) for device {}", 234 + log.info("Forwarding storeBatch to {}, which is the primary (master) for device {}",
175 - replicaInfo.master().orNull(), rule.deviceId()); 235 + replicaInfo.master().orNull(), deviceId);
176 236
177 ClusterMessage message = new ClusterMessage( 237 ClusterMessage message = new ClusterMessage(
178 clusterService.getLocalNode().id(), 238 clusterService.getLocalNode().id(),
179 FlowStoreMessageSubjects.STORE_FLOW_RULE, 239 FlowStoreMessageSubjects.STORE_FLOW_RULE,
180 - SERIALIZER.encode(rule)); 240 + SERIALIZER.encode(operation));
181 241
182 try { 242 try {
183 ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get()); 243 ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
...@@ -186,58 +246,44 @@ public class DistributedFlowRuleStore ...@@ -186,58 +246,44 @@ public class DistributedFlowRuleStore
186 // FIXME: throw a FlowStoreException 246 // FIXME: throw a FlowStoreException
187 throw new RuntimeException(e); 247 throw new RuntimeException(e);
188 } 248 }
189 - return false; 249 +
250 + return null;
190 } 251 }
191 252
192 - private synchronized boolean storeFlowEntryInternal(FlowRule flowRule) { 253 + private Future<CompletedBatchOperation> storeBatchInternal(FlowRuleBatchOperation operation) {
254 + List<FlowEntry> toRemove = new ArrayList<>();
255 + List<FlowEntry> toAdd = new ArrayList<>();
256 + // TODO: backup changes to hazelcast map
257 + for (FlowRuleBatchEntry batchEntry : operation.getOperations()) {
258 + FlowRule flowRule = batchEntry.getTarget();
259 + FlowRuleOperation op = batchEntry.getOperator();
260 + if (op.equals(FlowRuleOperation.REMOVE)) {
261 + StoredFlowEntry entry = getFlowEntryInternal(flowRule);
262 + if (entry != null) {
263 + entry.setState(FlowEntryState.PENDING_REMOVE);
264 + }
265 + toRemove.add(entry);
266 + } else if (op.equals(FlowRuleOperation.ADD)) {
193 StoredFlowEntry flowEntry = new DefaultFlowEntry(flowRule); 267 StoredFlowEntry flowEntry = new DefaultFlowEntry(flowRule);
194 DeviceId deviceId = flowRule.deviceId(); 268 DeviceId deviceId = flowRule.deviceId();
195 - // write to local copy.
196 if (!flowEntries.containsEntry(deviceId, flowEntry)) { 269 if (!flowEntries.containsEntry(deviceId, flowEntry)) {
197 flowEntries.put(deviceId, flowEntry); 270 flowEntries.put(deviceId, flowEntry);
198 flowEntriesById.put(flowRule.appId(), flowEntry); 271 flowEntriesById.put(flowRule.appId(), flowEntry);
199 - notifyDelegate(new FlowRuleEvent(Type.RULE_ADD_REQUESTED, flowRule)); 272 + toAdd.add(flowEntry);
200 - return true;
201 } 273 }
202 - // write to backup.
203 - // TODO: write to a hazelcast map.
204 - return false;
205 } 274 }
206 -
207 - @Override
208 - public synchronized boolean deleteFlowRule(FlowRule rule) {
209 - ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
210 - if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
211 - return deleteFlowRuleInternal(rule);
212 - }
213 -
214 - ClusterMessage message = new ClusterMessage(
215 - clusterService.getLocalNode().id(),
216 - FlowStoreMessageSubjects.DELETE_FLOW_RULE,
217 - SERIALIZER.encode(rule));
218 -
219 - try {
220 - ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
221 - response.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
222 - } catch (IOException | TimeoutException e) {
223 - // FIXME: throw a FlowStoreException
224 - throw new RuntimeException(e);
225 } 275 }
226 - return false; 276 + if (toAdd.isEmpty() && toRemove.isEmpty()) {
277 + return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowEntry>emptySet()));
227 } 278 }
228 - 279 + notifyDelegate(FlowRuleBatchEvent.create(new FlowRuleBatchRequest(toAdd, toRemove)));
229 - private synchronized boolean deleteFlowRuleInternal(FlowRule flowRule) { 280 + // TODO: imlpement this.
230 - StoredFlowEntry entry = getFlowEntryInternal(flowRule); 281 + return Futures.immediateFailedFuture(new RuntimeException("Implement this."));
231 - if (entry == null) {
232 - return false;
233 } 282 }
234 - entry.setState(FlowEntryState.PENDING_REMOVE);
235 -
236 - // TODO: also update backup.
237 -
238 - notifyDelegate(new FlowRuleEvent(Type.RULE_REMOVE_REQUESTED, flowRule));
239 283
240 - return true; 284 + @Override
285 + public void deleteFlowRule(FlowRule rule) {
286 + storeBatch(new FlowRuleBatchOperation(Arrays.asList(new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, rule))));
241 } 287 }
242 288
243 @Override 289 @Override
...@@ -315,4 +361,9 @@ public class DistributedFlowRuleStore ...@@ -315,4 +361,9 @@ public class DistributedFlowRuleStore
315 } 361 }
316 // TODO: also update backup. 362 // TODO: also update backup.
317 } 363 }
364 +
365 + @Override
366 + public void batchOperationComplete(FlowRuleBatchEvent event) {
367 + notifyDelegate(event);
368 + }
318 } 369 }
......
...@@ -12,4 +12,5 @@ public final class FlowStoreMessageSubjects { ...@@ -12,4 +12,5 @@ public final class FlowStoreMessageSubjects {
12 public static final MessageSubject ADD_OR_UPDATE_FLOW_RULE = 12 public static final MessageSubject ADD_OR_UPDATE_FLOW_RULE =
13 new MessageSubject("peer-forward-add-or-update-flow-rule"); 13 new MessageSubject("peer-forward-add-or-update-flow-rule");
14 public static final MessageSubject REMOVE_FLOW_RULE = new MessageSubject("peer-forward-remove-flow-rule"); 14 public static final MessageSubject REMOVE_FLOW_RULE = new MessageSubject("peer-forward-remove-flow-rule");
15 + public static final MessageSubject GET_FLOW_ENTRY = new MessageSubject("peer-forward-get-flow-entry");
15 } 16 }
......
...@@ -399,7 +399,7 @@ public class GossipHostStore ...@@ -399,7 +399,7 @@ public class GossipHostStore
399 } 399 }
400 400
401 // Auxiliary extension to allow location to mutate. 401 // Auxiliary extension to allow location to mutate.
402 - private class StoredHost extends DefaultHost { 402 + private static final class StoredHost extends DefaultHost {
403 private Timestamped<HostLocation> location; 403 private Timestamped<HostLocation> location;
404 404
405 /** 405 /**
......
1 package org.onlab.onos.store.mastership.impl; 1 package org.onlab.onos.store.mastership.impl;
2 2
3 import java.util.Collections; 3 import java.util.Collections;
4 -import java.util.HashMap; 4 +import java.util.EnumMap;
5 import java.util.LinkedList; 5 import java.util.LinkedList;
6 import java.util.List; 6 import java.util.List;
7 import java.util.Map; 7 import java.util.Map;
...@@ -17,9 +17,9 @@ import com.google.common.base.MoreObjects.ToStringHelper; ...@@ -17,9 +17,9 @@ import com.google.common.base.MoreObjects.ToStringHelper;
17 * A structure that holds node mastership roles associated with a 17 * A structure that holds node mastership roles associated with a
18 * {@link DeviceId}. This structure needs to be locked through IMap. 18 * {@link DeviceId}. This structure needs to be locked through IMap.
19 */ 19 */
20 -public class RoleValue { 20 +final class RoleValue {
21 21
22 - protected Map<MastershipRole, List<NodeId>> value = new HashMap<>(); 22 + protected final Map<MastershipRole, List<NodeId>> value = new EnumMap<>(MastershipRole.class);
23 23
24 public RoleValue() { 24 public RoleValue() {
25 value.put(MastershipRole.MASTER, new LinkedList<NodeId>()); 25 value.put(MastershipRole.MASTER, new LinkedList<NodeId>());
...@@ -27,7 +27,8 @@ public class RoleValue { ...@@ -27,7 +27,8 @@ public class RoleValue {
27 value.put(MastershipRole.NONE, new LinkedList<NodeId>()); 27 value.put(MastershipRole.NONE, new LinkedList<NodeId>());
28 } 28 }
29 29
30 - public Map<MastershipRole, List<NodeId>> value() { 30 + // exposing internals for serialization purpose only
31 + Map<MastershipRole, List<NodeId>> value() {
31 return Collections.unmodifiableMap(value); 32 return Collections.unmodifiableMap(value);
32 } 33 }
33 34
......
...@@ -35,10 +35,10 @@ public class RoleValueSerializer extends Serializer<RoleValue> { ...@@ -35,10 +35,10 @@ public class RoleValueSerializer extends Serializer<RoleValue> {
35 35
36 @Override 36 @Override
37 public void write(Kryo kryo, Output output, RoleValue type) { 37 public void write(Kryo kryo, Output output, RoleValue type) {
38 - output.writeInt(type.value().size()); 38 + final Map<MastershipRole, List<NodeId>> map = type.value();
39 + output.writeInt(map.size());
39 40
40 - for (Map.Entry<MastershipRole, List<NodeId>> el : 41 + for (Map.Entry<MastershipRole, List<NodeId>> el : map.entrySet()) {
41 - type.value().entrySet()) {
42 output.writeInt(el.getKey().ordinal()); 42 output.writeInt(el.getKey().ordinal());
43 43
44 List<NodeId> nodes = el.getValue(); 44 List<NodeId> nodes = el.getValue();
......
...@@ -492,7 +492,10 @@ public class SMap<K, V> implements IMap<K, V> { ...@@ -492,7 +492,10 @@ public class SMap<K, V> implements IMap<K, V> {
492 } 492 }
493 493
494 private V deserializeVal(byte[] val) { 494 private V deserializeVal(byte[] val) {
495 - return serializer.decode(val); 495 + if (val == null) {
496 + return null;
497 + }
498 + return serializer.decode(val.clone());
496 } 499 }
497 500
498 private Set<byte[]> serializeKeySet(Set<K> keys) { 501 private Set<byte[]> serializeKeySet(Set<K> keys) {
......
...@@ -33,6 +33,7 @@ import org.onlab.onos.net.flow.DefaultTrafficSelector; ...@@ -33,6 +33,7 @@ import org.onlab.onos.net.flow.DefaultTrafficSelector;
33 import org.onlab.onos.net.flow.DefaultTrafficTreatment; 33 import org.onlab.onos.net.flow.DefaultTrafficTreatment;
34 import org.onlab.onos.net.flow.FlowEntry; 34 import org.onlab.onos.net.flow.FlowEntry;
35 import org.onlab.onos.net.flow.FlowId; 35 import org.onlab.onos.net.flow.FlowId;
36 +import org.onlab.onos.net.flow.StoredFlowEntry;
36 import org.onlab.onos.net.flow.criteria.Criteria; 37 import org.onlab.onos.net.flow.criteria.Criteria;
37 import org.onlab.onos.net.flow.criteria.Criterion; 38 import org.onlab.onos.net.flow.criteria.Criterion;
38 import org.onlab.onos.net.flow.instructions.Instructions; 39 import org.onlab.onos.net.flow.instructions.Instructions;
...@@ -97,6 +98,8 @@ public final class KryoNamespaces { ...@@ -97,6 +98,8 @@ public final class KryoNamespaces {
97 HostId.class, 98 HostId.class,
98 HostDescription.class, 99 HostDescription.class,
99 DefaultHostDescription.class, 100 DefaultHostDescription.class,
101 + DefaultFlowEntry.class,
102 + StoredFlowEntry.class,
100 DefaultFlowRule.class, 103 DefaultFlowRule.class,
101 DefaultFlowEntry.class, 104 DefaultFlowEntry.class,
102 FlowEntry.FlowEntryState.class, 105 FlowEntry.FlowEntryState.class,
......
...@@ -3,6 +3,8 @@ package org.onlab.onos.store.trivial.impl; ...@@ -3,6 +3,8 @@ package org.onlab.onos.store.trivial.impl;
3 import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_REMOVED; 3 import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
4 import static org.slf4j.LoggerFactory.getLogger; 4 import static org.slf4j.LoggerFactory.getLogger;
5 import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked; 5 import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
6 +
7 +import java.util.Arrays;
6 import java.util.Collections; 8 import java.util.Collections;
7 import java.util.HashSet; 9 import java.util.HashSet;
8 import java.util.List; 10 import java.util.List;
...@@ -10,6 +12,7 @@ import java.util.Set; ...@@ -10,6 +12,7 @@ import java.util.Set;
10 import java.util.concurrent.ConcurrentHashMap; 12 import java.util.concurrent.ConcurrentHashMap;
11 import java.util.concurrent.ConcurrentMap; 13 import java.util.concurrent.ConcurrentMap;
12 import java.util.concurrent.CopyOnWriteArrayList; 14 import java.util.concurrent.CopyOnWriteArrayList;
15 +import java.util.concurrent.Future;
13 16
14 import org.apache.felix.scr.annotations.Activate; 17 import org.apache.felix.scr.annotations.Activate;
15 import org.apache.felix.scr.annotations.Component; 18 import org.apache.felix.scr.annotations.Component;
...@@ -17,11 +20,17 @@ import org.apache.felix.scr.annotations.Deactivate; ...@@ -17,11 +20,17 @@ import org.apache.felix.scr.annotations.Deactivate;
17 import org.apache.felix.scr.annotations.Service; 20 import org.apache.felix.scr.annotations.Service;
18 import org.onlab.onos.ApplicationId; 21 import org.onlab.onos.ApplicationId;
19 import org.onlab.onos.net.DeviceId; 22 import org.onlab.onos.net.DeviceId;
23 +import org.onlab.onos.net.flow.CompletedBatchOperation;
20 import org.onlab.onos.net.flow.DefaultFlowEntry; 24 import org.onlab.onos.net.flow.DefaultFlowEntry;
21 import org.onlab.onos.net.flow.FlowEntry; 25 import org.onlab.onos.net.flow.FlowEntry;
22 import org.onlab.onos.net.flow.FlowEntry.FlowEntryState; 26 import org.onlab.onos.net.flow.FlowEntry.FlowEntryState;
23 import org.onlab.onos.net.flow.FlowId; 27 import org.onlab.onos.net.flow.FlowId;
24 import org.onlab.onos.net.flow.FlowRule; 28 import org.onlab.onos.net.flow.FlowRule;
29 +import org.onlab.onos.net.flow.FlowRuleBatchEntry;
30 +import org.onlab.onos.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
31 +import org.onlab.onos.net.flow.FlowRuleBatchEvent;
32 +import org.onlab.onos.net.flow.FlowRuleBatchOperation;
33 +import org.onlab.onos.net.flow.FlowRuleBatchRequest;
25 import org.onlab.onos.net.flow.FlowRuleEvent; 34 import org.onlab.onos.net.flow.FlowRuleEvent;
26 import org.onlab.onos.net.flow.FlowRuleEvent.Type; 35 import org.onlab.onos.net.flow.FlowRuleEvent.Type;
27 import org.onlab.onos.net.flow.FlowRuleStore; 36 import org.onlab.onos.net.flow.FlowRuleStore;
...@@ -33,6 +42,7 @@ import org.slf4j.Logger; ...@@ -33,6 +42,7 @@ import org.slf4j.Logger;
33 42
34 import com.google.common.base.Function; 43 import com.google.common.base.Function;
35 import com.google.common.collect.FluentIterable; 44 import com.google.common.collect.FluentIterable;
45 +import com.google.common.util.concurrent.Futures;
36 46
37 /** 47 /**
38 * Manages inventory of flow rules using trivial in-memory implementation. 48 * Manages inventory of flow rules using trivial in-memory implementation.
...@@ -40,7 +50,7 @@ import com.google.common.collect.FluentIterable; ...@@ -40,7 +50,7 @@ import com.google.common.collect.FluentIterable;
40 @Component(immediate = true) 50 @Component(immediate = true)
41 @Service 51 @Service
42 public class SimpleFlowRuleStore 52 public class SimpleFlowRuleStore
43 - extends AbstractStore<FlowRuleEvent, FlowRuleStoreDelegate> 53 + extends AbstractStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
44 implements FlowRuleStore { 54 implements FlowRuleStore {
45 55
46 private final Logger log = getLogger(getClass()); 56 private final Logger log = getLogger(getClass());
...@@ -148,12 +158,11 @@ public class SimpleFlowRuleStore ...@@ -148,12 +158,11 @@ public class SimpleFlowRuleStore
148 } 158 }
149 159
150 @Override 160 @Override
151 - public boolean storeFlowRule(FlowRule rule) { 161 + public void storeFlowRule(FlowRule rule) {
152 - final boolean added = storeFlowRuleInternal(rule); 162 + storeFlowRuleInternal(rule);
153 - return added;
154 } 163 }
155 164
156 - private boolean storeFlowRuleInternal(FlowRule rule) { 165 + private void storeFlowRuleInternal(FlowRule rule) {
157 StoredFlowEntry f = new DefaultFlowEntry(rule); 166 StoredFlowEntry f = new DefaultFlowEntry(rule);
158 final DeviceId did = f.deviceId(); 167 final DeviceId did = f.deviceId();
159 final FlowId fid = f.id(); 168 final FlowId fid = f.id();
...@@ -162,19 +171,20 @@ public class SimpleFlowRuleStore ...@@ -162,19 +171,20 @@ public class SimpleFlowRuleStore
162 for (StoredFlowEntry fe : existing) { 171 for (StoredFlowEntry fe : existing) {
163 if (fe.equals(rule)) { 172 if (fe.equals(rule)) {
164 // was already there? ignore 173 // was already there? ignore
165 - return false; 174 + return;
166 } 175 }
167 } 176 }
168 // new flow rule added 177 // new flow rule added
169 existing.add(f); 178 existing.add(f);
170 - // TODO: Should we notify only if it's "remote" event? 179 + notifyDelegate(FlowRuleBatchEvent.create(
171 - //notifyDelegate(new FlowRuleEvent(Type.RULE_ADD_REQUESTED, rule)); 180 + new FlowRuleBatchRequest(
172 - return true; 181 + Arrays.<FlowEntry>asList(f),
182 + Collections.<FlowEntry>emptyList())));
173 } 183 }
174 } 184 }
175 185
176 @Override 186 @Override
177 - public boolean deleteFlowRule(FlowRule rule) { 187 + public void deleteFlowRule(FlowRule rule) {
178 188
179 List<StoredFlowEntry> entries = getFlowEntries(rule.deviceId(), rule.id()); 189 List<StoredFlowEntry> entries = getFlowEntries(rule.deviceId(), rule.id());
180 190
...@@ -184,14 +194,17 @@ public class SimpleFlowRuleStore ...@@ -184,14 +194,17 @@ public class SimpleFlowRuleStore
184 synchronized (entry) { 194 synchronized (entry) {
185 entry.setState(FlowEntryState.PENDING_REMOVE); 195 entry.setState(FlowEntryState.PENDING_REMOVE);
186 // TODO: Should we notify only if it's "remote" event? 196 // TODO: Should we notify only if it's "remote" event?
187 - //notifyDelegate(new FlowRuleEvent(Type.RULE_REMOVE_REQUESTED, rule)); 197 + notifyDelegate(FlowRuleBatchEvent.create(
188 - return true; 198 + new FlowRuleBatchRequest(
199 + Collections.<FlowEntry>emptyList(),
200 + Arrays.<FlowEntry>asList(entry))));
189 } 201 }
190 } 202 }
191 } 203 }
192 } 204 }
205 +
206 +
193 //log.warn("Cannot find rule {}", rule); 207 //log.warn("Cannot find rule {}", rule);
194 - return false;
195 } 208 }
196 209
197 @Override 210 @Override
...@@ -237,4 +250,24 @@ public class SimpleFlowRuleStore ...@@ -237,4 +250,24 @@ public class SimpleFlowRuleStore
237 } 250 }
238 return null; 251 return null;
239 } 252 }
253 +
254 + @Override
255 + public Future<CompletedBatchOperation> storeBatch(
256 + FlowRuleBatchOperation batchOperation) {
257 + for (FlowRuleBatchEntry entry : batchOperation.getOperations()) {
258 + if (entry.getOperator().equals(FlowRuleOperation.ADD)) {
259 + storeFlowRule(entry.getTarget());
260 + } else if (entry.getOperator().equals(FlowRuleOperation.REMOVE)) {
261 + deleteFlowRule(entry.getTarget());
262 + } else {
263 + throw new UnsupportedOperationException("Unsupported operation type");
264 + }
265 + }
266 + return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowEntry>emptySet()));
267 + }
268 +
269 + @Override
270 + public void batchOperationComplete(FlowRuleBatchEvent event) {
271 + notifyDelegate(event);
272 + }
240 } 273 }
......
...@@ -269,7 +269,7 @@ public class SimpleHostStore ...@@ -269,7 +269,7 @@ public class SimpleHostStore
269 } 269 }
270 270
271 // Auxiliary extension to allow location to mutate. 271 // Auxiliary extension to allow location to mutate.
272 - private class StoredHost extends DefaultHost { 272 + private static final class StoredHost extends DefaultHost {
273 private HostLocation location; 273 private HostLocation location;
274 274
275 /** 275 /**
......
...@@ -41,7 +41,6 @@ import org.projectfloodlight.openflow.protocol.OFHello; ...@@ -41,7 +41,6 @@ import org.projectfloodlight.openflow.protocol.OFHello;
41 import org.projectfloodlight.openflow.protocol.OFHelloElem; 41 import org.projectfloodlight.openflow.protocol.OFHelloElem;
42 import org.projectfloodlight.openflow.protocol.OFMessage; 42 import org.projectfloodlight.openflow.protocol.OFMessage;
43 import org.projectfloodlight.openflow.protocol.OFPacketIn; 43 import org.projectfloodlight.openflow.protocol.OFPacketIn;
44 -import org.projectfloodlight.openflow.protocol.OFPacketOut;
45 import org.projectfloodlight.openflow.protocol.OFPortDescStatsReply; 44 import org.projectfloodlight.openflow.protocol.OFPortDescStatsReply;
46 import org.projectfloodlight.openflow.protocol.OFPortDescStatsRequest; 45 import org.projectfloodlight.openflow.protocol.OFPortDescStatsRequest;
47 import org.projectfloodlight.openflow.protocol.OFPortStatus; 46 import org.projectfloodlight.openflow.protocol.OFPortStatus;
...@@ -661,10 +660,9 @@ class OFChannelHandler extends IdleStateAwareChannelHandler { ...@@ -661,10 +660,9 @@ class OFChannelHandler extends IdleStateAwareChannelHandler {
661 * However, we could be more forgiving 660 * However, we could be more forgiving
662 * @param h the channel handler that received the message 661 * @param h the channel handler that received the message
663 * @param m the message 662 * @param m the message
664 - * @throws SwitchStateException 663 + * @throws SwitchStateException we always throw the exception
665 - * @throws SwitchStateExeption we always through the execption
666 */ 664 */
667 - // needs to be protected because enum members are acutally subclasses 665 + // needs to be protected because enum members are actually subclasses
668 protected void illegalMessageReceived(OFChannelHandler h, OFMessage m) 666 protected void illegalMessageReceived(OFChannelHandler h, OFMessage m)
669 throws SwitchStateException { 667 throws SwitchStateException {
670 String msg = getSwitchStateMessage(h, m, 668 String msg = getSwitchStateMessage(h, m,
...@@ -1025,7 +1023,9 @@ class OFChannelHandler extends IdleStateAwareChannelHandler { ...@@ -1025,7 +1023,9 @@ class OFChannelHandler extends IdleStateAwareChannelHandler {
1025 // all state for the original switch (with the same dpid), 1023 // all state for the original switch (with the same dpid),
1026 // which we obviously don't want. 1024 // which we obviously don't want.
1027 log.info("{}:removal called", getSwitchInfoString()); 1025 log.info("{}:removal called", getSwitchInfoString());
1026 + if (sw != null) {
1028 sw.removeConnectedSwitch(); 1027 sw.removeConnectedSwitch();
1028 + }
1029 } else { 1029 } else {
1030 // A duplicate was disconnected on this ChannelHandler, 1030 // A duplicate was disconnected on this ChannelHandler,
1031 // this is the same switch reconnecting, but the original state was 1031 // this is the same switch reconnecting, but the original state was
......
...@@ -1188,7 +1188,8 @@ public class OFSwitchImplCPqD13 extends AbstractOpenFlowSwitch { ...@@ -1188,7 +1188,8 @@ public class OFSwitchImplCPqD13 extends AbstractOpenFlowSwitch {
1188 .setHardTimeout(0) 1188 .setHardTimeout(0)
1189 .setXid(getNextTransactionId()) 1189 .setXid(getNextTransactionId())
1190 .build(); 1190 .build();
1191 - sendMsg(tableMissEntry); 1191 +
1192 + write(tableMissEntry);
1192 } 1193 }
1193 1194
1194 private void sendBarrier(boolean finalBarrier) { 1195 private void sendBarrier(boolean finalBarrier) {
...@@ -1200,7 +1201,8 @@ public class OFSwitchImplCPqD13 extends AbstractOpenFlowSwitch { ...@@ -1200,7 +1201,8 @@ public class OFSwitchImplCPqD13 extends AbstractOpenFlowSwitch {
1200 .buildBarrierRequest() 1201 .buildBarrierRequest()
1201 .setXid(xid) 1202 .setXid(xid)
1202 .build(); 1203 .build();
1203 - sendMsg(br); 1204 +
1205 + write(br);
1204 } 1206 }
1205 1207
1206 @Override 1208 @Override
...@@ -1210,7 +1212,7 @@ public class OFSwitchImplCPqD13 extends AbstractOpenFlowSwitch { ...@@ -1210,7 +1212,7 @@ public class OFSwitchImplCPqD13 extends AbstractOpenFlowSwitch {
1210 1212
1211 @Override 1213 @Override
1212 public void write(OFMessage msg) { 1214 public void write(OFMessage msg) {
1213 - this.channel.write(msg); 1215 + this.channel.write(Collections.singletonList(msg));
1214 1216
1215 } 1217 }
1216 1218
......
...@@ -217,7 +217,7 @@ public class LinkDiscovery implements TimerTask { ...@@ -217,7 +217,7 @@ public class LinkDiscovery implements TimerTask {
217 final PortNumber srcPort = PortNumber.portNumber(onoslldp.getPort()); 217 final PortNumber srcPort = PortNumber.portNumber(onoslldp.getPort());
218 final DeviceId srcDeviceId = DeviceId.deviceId(onoslldp.getDeviceString()); 218 final DeviceId srcDeviceId = DeviceId.deviceId(onoslldp.getDeviceString());
219 final DeviceId dstDeviceId = context.inPacket().receivedFrom().deviceId(); 219 final DeviceId dstDeviceId = context.inPacket().receivedFrom().deviceId();
220 - this.ackProbe(srcPort.toLong()); 220 + this.ackProbe(dstPort.toLong());
221 ConnectPoint src = new ConnectPoint(srcDeviceId, srcPort); 221 ConnectPoint src = new ConnectPoint(srcDeviceId, srcPort);
222 ConnectPoint dst = new ConnectPoint(dstDeviceId, dstPort); 222 ConnectPoint dst = new ConnectPoint(dstDeviceId, dstPort);
223 223
...@@ -245,7 +245,7 @@ public class LinkDiscovery implements TimerTask { ...@@ -245,7 +245,7 @@ public class LinkDiscovery implements TimerTask {
245 */ 245 */
246 @Override 246 @Override
247 public void run(final Timeout t) { 247 public void run(final Timeout t) {
248 - this.log.debug("sending probes"); 248 + this.log.trace("sending probes");
249 synchronized (this) { 249 synchronized (this) {
250 final Iterator<Long> fastIterator = this.fastPorts.iterator(); 250 final Iterator<Long> fastIterator = this.fastPorts.iterator();
251 Long portNumber; 251 Long portNumber;
...@@ -256,7 +256,7 @@ public class LinkDiscovery implements TimerTask { ...@@ -256,7 +256,7 @@ public class LinkDiscovery implements TimerTask {
256 .getAndIncrement(); 256 .getAndIncrement();
257 257
258 if (probeCount < LinkDiscovery.MAX_PROBE_COUNT) { 258 if (probeCount < LinkDiscovery.MAX_PROBE_COUNT) {
259 - this.log.debug("sending fast probe to port"); 259 + this.log.trace("sending fast probe to port");
260 sendProbes(portNumber); 260 sendProbes(portNumber);
261 } else { 261 } else {
262 // Update fast and slow ports 262 // Update fast and slow ports
...@@ -278,7 +278,7 @@ public class LinkDiscovery implements TimerTask { ...@@ -278,7 +278,7 @@ public class LinkDiscovery implements TimerTask {
278 Iterator<Long> slowIterator = this.slowPorts.iterator(); 278 Iterator<Long> slowIterator = this.slowPorts.iterator();
279 while (slowIterator.hasNext()) { 279 while (slowIterator.hasNext()) {
280 portNumber = slowIterator.next(); 280 portNumber = slowIterator.next();
281 - this.log.debug("sending slow probe to port {}", portNumber); 281 + this.log.trace("sending slow probe to port {}", portNumber);
282 282
283 sendProbes(portNumber); 283 sendProbes(portNumber);
284 284
......
...@@ -10,7 +10,7 @@ import java.util.Set; ...@@ -10,7 +10,7 @@ import java.util.Set;
10 import java.util.concurrent.ConcurrentHashMap; 10 import java.util.concurrent.ConcurrentHashMap;
11 import java.util.concurrent.CountDownLatch; 11 import java.util.concurrent.CountDownLatch;
12 import java.util.concurrent.ExecutionException; 12 import java.util.concurrent.ExecutionException;
13 -import java.util.concurrent.Future; 13 +import java.util.concurrent.Executor;
14 import java.util.concurrent.TimeUnit; 14 import java.util.concurrent.TimeUnit;
15 import java.util.concurrent.TimeoutException; 15 import java.util.concurrent.TimeoutException;
16 import java.util.concurrent.atomic.AtomicBoolean; 16 import java.util.concurrent.atomic.AtomicBoolean;
...@@ -69,9 +69,11 @@ import org.projectfloodlight.openflow.types.U32; ...@@ -69,9 +69,11 @@ import org.projectfloodlight.openflow.types.U32;
69 import org.slf4j.Logger; 69 import org.slf4j.Logger;
70 70
71 import com.google.common.collect.ArrayListMultimap; 71 import com.google.common.collect.ArrayListMultimap;
72 -import com.google.common.collect.Lists;
73 import com.google.common.collect.Maps; 72 import com.google.common.collect.Maps;
74 import com.google.common.collect.Multimap; 73 import com.google.common.collect.Multimap;
74 +import com.google.common.collect.Sets;
75 +import com.google.common.util.concurrent.ExecutionList;
76 +import com.google.common.util.concurrent.ListenableFuture;
75 77
76 /** 78 /**
77 * Provider which uses an OpenFlow controller to detect network 79 * Provider which uses an OpenFlow controller to detect network
...@@ -97,6 +99,8 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr ...@@ -97,6 +99,8 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
97 99
98 private final InternalFlowProvider listener = new InternalFlowProvider(); 100 private final InternalFlowProvider listener = new InternalFlowProvider();
99 101
102 + // FIXME: This should be an expiring map to ensure futures that don't have
103 + // a future eventually get garbage collected.
100 private final Map<Long, InstallationFuture> pendingFutures = 104 private final Map<Long, InstallationFuture> pendingFutures =
101 new ConcurrentHashMap<Long, InstallationFuture>(); 105 new ConcurrentHashMap<Long, InstallationFuture>();
102 106
...@@ -169,7 +173,7 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr ...@@ -169,7 +173,7 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
169 } 173 }
170 174
171 @Override 175 @Override
172 - public Future<CompletedBatchOperation> executeBatch(BatchOperation<FlowRuleBatchEntry> batch) { 176 + public ListenableFuture<CompletedBatchOperation> executeBatch(BatchOperation<FlowRuleBatchEntry> batch) {
173 final Set<Dpid> sws = 177 final Set<Dpid> sws =
174 Collections.newSetFromMap(new ConcurrentHashMap<Dpid, Boolean>()); 178 Collections.newSetFromMap(new ConcurrentHashMap<Dpid, Boolean>());
175 final Map<Long, FlowRuleBatchEntry> fmXids = new HashMap<Long, FlowRuleBatchEntry>(); 179 final Map<Long, FlowRuleBatchEntry> fmXids = new HashMap<Long, FlowRuleBatchEntry>();
...@@ -330,18 +334,20 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr ...@@ -330,18 +334,20 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
330 334
331 } 335 }
332 336
333 - private class InstallationFuture implements Future<CompletedBatchOperation> { 337 + private class InstallationFuture implements ListenableFuture<CompletedBatchOperation> {
334 338
335 private final Set<Dpid> sws; 339 private final Set<Dpid> sws;
336 private final AtomicBoolean ok = new AtomicBoolean(true); 340 private final AtomicBoolean ok = new AtomicBoolean(true);
337 private final Map<Long, FlowRuleBatchEntry> fms; 341 private final Map<Long, FlowRuleBatchEntry> fms;
338 342
339 - private final List<FlowEntry> offendingFlowMods = Lists.newLinkedList(); 343 + private final Set<FlowEntry> offendingFlowMods = Sets.newHashSet();
340 344
341 private final CountDownLatch countDownLatch; 345 private final CountDownLatch countDownLatch;
342 private Long pendingXid; 346 private Long pendingXid;
343 private BatchState state; 347 private BatchState state;
344 348
349 + private final ExecutionList executionList = new ExecutionList();
350 +
345 public InstallationFuture(Set<Dpid> sws, Map<Long, FlowRuleBatchEntry> fmXids) { 351 public InstallationFuture(Set<Dpid> sws, Map<Long, FlowRuleBatchEntry> fmXids) {
346 this.state = BatchState.STARTED; 352 this.state = BatchState.STARTED;
347 this.sws = sws; 353 this.sws = sws;
...@@ -350,6 +356,7 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr ...@@ -350,6 +356,7 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
350 } 356 }
351 357
352 public void fail(OFErrorMsg msg, Dpid dpid) { 358 public void fail(OFErrorMsg msg, Dpid dpid) {
359 +
353 ok.set(false); 360 ok.set(false);
354 removeRequirement(dpid); 361 removeRequirement(dpid);
355 FlowEntry fe = null; 362 FlowEntry fe = null;
...@@ -422,6 +429,9 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr ...@@ -422,6 +429,9 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
422 429
423 @Override 430 @Override
424 public boolean cancel(boolean mayInterruptIfRunning) { 431 public boolean cancel(boolean mayInterruptIfRunning) {
432 + if (isDone()) {
433 + return false;
434 + }
425 ok.set(false); 435 ok.set(false);
426 this.state = BatchState.CANCELLED; 436 this.state = BatchState.CANCELLED;
427 cleanUp(); 437 cleanUp();
...@@ -434,7 +444,8 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr ...@@ -434,7 +444,8 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
434 } 444 }
435 445
436 } 446 }
437 - return isCancelled(); 447 + invokeCallbacks();
448 + return true;
438 } 449 }
439 450
440 @Override 451 @Override
...@@ -444,14 +455,15 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr ...@@ -444,14 +455,15 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
444 455
445 @Override 456 @Override
446 public boolean isDone() { 457 public boolean isDone() {
447 - return this.state == BatchState.FINISHED; 458 + return this.state == BatchState.FINISHED || isCancelled();
448 } 459 }
449 460
450 @Override 461 @Override
451 public CompletedBatchOperation get() throws InterruptedException, ExecutionException { 462 public CompletedBatchOperation get() throws InterruptedException, ExecutionException {
452 countDownLatch.await(); 463 countDownLatch.await();
453 this.state = BatchState.FINISHED; 464 this.state = BatchState.FINISHED;
454 - return new CompletedBatchOperation(ok.get(), offendingFlowMods); 465 + CompletedBatchOperation result = new CompletedBatchOperation(ok.get(), offendingFlowMods);
466 + return result;
455 } 467 }
456 468
457 @Override 469 @Override
...@@ -460,7 +472,8 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr ...@@ -460,7 +472,8 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
460 TimeoutException { 472 TimeoutException {
461 if (countDownLatch.await(timeout, unit)) { 473 if (countDownLatch.await(timeout, unit)) {
462 this.state = BatchState.FINISHED; 474 this.state = BatchState.FINISHED;
463 - return new CompletedBatchOperation(ok.get(), offendingFlowMods); 475 + CompletedBatchOperation result = new CompletedBatchOperation(ok.get(), offendingFlowMods);
476 + return result;
464 } 477 }
465 throw new TimeoutException(); 478 throw new TimeoutException();
466 } 479 }
...@@ -478,10 +491,21 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr ...@@ -478,10 +491,21 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
478 491
479 private void removeRequirement(Dpid dpid) { 492 private void removeRequirement(Dpid dpid) {
480 countDownLatch.countDown(); 493 countDownLatch.countDown();
494 + if (countDownLatch.getCount() == 0) {
495 + invokeCallbacks();
496 + }
481 sws.remove(dpid); 497 sws.remove(dpid);
482 cleanUp(); 498 cleanUp();
483 } 499 }
484 500
501 + @Override
502 + public void addListener(Runnable runnable, Executor executor) {
503 + executionList.add(runnable, executor);
504 + }
505 +
506 + private void invokeCallbacks() {
507 + executionList.execute();
508 + }
485 } 509 }
486 510
487 } 511 }
......
...@@ -32,7 +32,7 @@ public final class ChassisId { ...@@ -32,7 +32,7 @@ public final class ChassisId {
32 * @param value the value to use. 32 * @param value the value to use.
33 */ 33 */
34 public ChassisId(String value) { 34 public ChassisId(String value) {
35 - this.value = Long.valueOf(value, 16); 35 + this.value = Long.parseLong(value, 16);
36 } 36 }
37 37
38 /** 38 /**
......
...@@ -379,7 +379,7 @@ public class DHCP extends BasePacket { ...@@ -379,7 +379,7 @@ public class DHCP extends BasePacket {
379 // 300 379 // 300
380 int optionsLength = 0; 380 int optionsLength = 0;
381 for (final DHCPOption option : this.options) { 381 for (final DHCPOption option : this.options) {
382 - if (option.getCode() == 0 || option.getCode() == 255) { 382 + if (option.getCode() == 0 || option.getCode() == ((byte) 255)) {
383 optionsLength += 1; 383 optionsLength += 1;
384 } else { 384 } else {
385 optionsLength += 2 + (0xff & option.getLength()); 385 optionsLength += 2 + (0xff & option.getLength());
......
...@@ -438,7 +438,7 @@ public class IPv4 extends BasePacket { ...@@ -438,7 +438,7 @@ public class IPv4 extends BasePacket {
438 438
439 int result = 0; 439 int result = 0;
440 for (int i = 0; i < 4; ++i) { 440 for (int i = 0; i < 4; ++i) {
441 - result |= Integer.valueOf(octets[i]) << (3 - i) * 8; 441 + result |= Integer.parseInt(octets[i]) << (3 - i) * 8;
442 } 442 }
443 return result; 443 return result;
444 } 444 }
...@@ -471,7 +471,7 @@ public class IPv4 extends BasePacket { ...@@ -471,7 +471,7 @@ public class IPv4 extends BasePacket {
471 int result = 0; 471 int result = 0;
472 for (int i = 0; i < 4; ++i) { 472 for (int i = 0; i < 4; ++i) {
473 result = ipAddress >> (3 - i) * 8 & 0xff; 473 result = ipAddress >> (3 - i) * 8 & 0xff;
474 - sb.append(Integer.valueOf(result).toString()); 474 + sb.append(result);
475 if (i != 3) { 475 if (i != 3) {
476 sb.append("."); 476 sb.append(".");
477 } 477 }
......
...@@ -14,7 +14,7 @@ public final class HexString { ...@@ -14,7 +14,7 @@ public final class HexString {
14 */ 14 */
15 public static String toHexString(final byte[] bytes) { 15 public static String toHexString(final byte[] bytes) {
16 int i; 16 int i;
17 - StringBuilder ret = new StringBuilder(); 17 + StringBuilder ret = new StringBuilder(bytes.length * 3 - 1);
18 String tmp; 18 String tmp;
19 for (i = 0; i < bytes.length; i++) { 19 for (i = 0; i < bytes.length; i++) {
20 if (i > 0) { 20 if (i > 0) {
...@@ -31,22 +31,22 @@ public final class HexString { ...@@ -31,22 +31,22 @@ public final class HexString {
31 31
32 public static String toHexString(final long val, final int padTo) { 32 public static String toHexString(final long val, final int padTo) {
33 char[] arr = Long.toHexString(val).toCharArray(); 33 char[] arr = Long.toHexString(val).toCharArray();
34 - String ret = ""; 34 + StringBuilder ret = new StringBuilder(padTo * 3 - 1);
35 // prepend the right number of leading zeros 35 // prepend the right number of leading zeros
36 int i = 0; 36 int i = 0;
37 for (; i < (padTo * 2 - arr.length); i++) { 37 for (; i < (padTo * 2 - arr.length); i++) {
38 - ret += "0"; 38 + ret.append('0');
39 if ((i % 2) != 0) { 39 if ((i % 2) != 0) {
40 - ret += ":"; 40 + ret.append(':');
41 } 41 }
42 } 42 }
43 for (int j = 0; j < arr.length; j++) { 43 for (int j = 0; j < arr.length; j++) {
44 - ret += arr[j]; 44 + ret.append(arr[j]);
45 if ((((i + j) % 2) != 0) && (j < (arr.length - 1))) { 45 if ((((i + j) % 2) != 0) && (j < (arr.length - 1))) {
46 - ret += ":"; 46 + ret.append(':');
47 } 47 }
48 } 48 }
49 - return ret; 49 + return ret.toString();
50 } 50 }
51 51
52 public static String toHexString(final long val) { 52 public static String toHexString(final long val) {
......
...@@ -163,6 +163,7 @@ public class NettyMessagingService implements MessagingService { ...@@ -163,6 +163,7 @@ public class NettyMessagingService implements MessagingService {
163 handlers.putIfAbsent(type, handler); 163 handlers.putIfAbsent(type, handler);
164 } 164 }
165 165
166 + @Override
166 public void unregisterHandler(String type) { 167 public void unregisterHandler(String type) {
167 handlers.remove(type); 168 handlers.remove(type);
168 } 169 }
...@@ -242,7 +243,7 @@ public class NettyMessagingService implements MessagingService { ...@@ -242,7 +243,7 @@ public class NettyMessagingService implements MessagingService {
242 } 243 }
243 } 244 }
244 245
245 - private class WriteTask implements Runnable { 246 + private static class WriteTask implements Runnable {
246 247
247 private final InternalMessage message; 248 private final InternalMessage message;
248 private final Channel channel; 249 private final Channel channel;
......