Madan Jampani

Towards a distributed flow rule store

package org.onlab.onos.net.flow;
import java.util.List;
import java.util.Set;
/**
* Interface capturing the result of a batch operation.
......@@ -15,9 +15,9 @@ public interface BatchOperationResult<T> {
boolean isSuccess();
/**
* Obtains a list of items which failed.
* @return a list of failures
* Obtains a set of items which failed.
* @return a set of failures
*/
List<T> failedItems();
Set<T> failedItems();
}
......
package org.onlab.onos.net.flow;
import java.util.List;
import java.util.Set;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
public class CompletedBatchOperation implements BatchOperationResult<FlowEntry> {
private final boolean success;
private final List<FlowEntry> failures;
private final Set<FlowEntry> failures;
public CompletedBatchOperation(boolean success, List<FlowEntry> failures) {
public CompletedBatchOperation(boolean success, Set<FlowEntry> failures) {
this.success = success;
this.failures = ImmutableList.copyOf(failures);
this.failures = ImmutableSet.copyOf(failures);
}
@Override
......@@ -21,7 +21,7 @@ public class CompletedBatchOperation implements BatchOperationResult<FlowEntry>
}
@Override
public List<FlowEntry> failedItems() {
public Set<FlowEntry> failedItems() {
return failures;
}
......
package org.onlab.onos.net.flow;
import org.onlab.onos.event.AbstractEvent;
/**
* Describes flow rule batch event.
*/
public final class FlowRuleBatchEvent extends AbstractEvent<FlowRuleBatchEvent.Type, FlowRuleBatchRequest> {
/**
* Type of flow rule events.
*/
public enum Type {
/**
* Signifies that a batch operation has been initiated.
*/
BATCH_OPERATION_REQUESTED,
/**
* Signifies that a batch operation has completed.
*/
BATCH_OPERATION_COMPLETED,
}
private final CompletedBatchOperation result;
/**
* Constructs a new FlowRuleBatchEvent.
* @param request batch operation request.
* @return event.
*/
public static FlowRuleBatchEvent create(FlowRuleBatchRequest request) {
FlowRuleBatchEvent event = new FlowRuleBatchEvent(Type.BATCH_OPERATION_REQUESTED, request, null);
return event;
}
/**
* Constructs a new FlowRuleBatchEvent.
* @param request batch operation request.
* @param result completed batch operation result.
* @return event.
*/
public static FlowRuleBatchEvent create(FlowRuleBatchRequest request, CompletedBatchOperation result) {
FlowRuleBatchEvent event = new FlowRuleBatchEvent(Type.BATCH_OPERATION_COMPLETED, request, result);
return event;
}
/**
* Returns the result of this batch operation.
* @return batch operation result.
*/
public CompletedBatchOperation result() {
return result;
}
/**
* Creates an event of a given type and for the specified flow rule batch.
*
* @param type flow rule batch event type
* @param batch event flow rule batch subject
*/
private FlowRuleBatchEvent(Type type, FlowRuleBatchRequest request, CompletedBatchOperation result) {
super(type, request);
this.result = result;
}
}
package org.onlab.onos.net.flow;
import java.util.Collections;
import java.util.List;
import org.onlab.onos.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
import com.google.common.collect.Lists;
public class FlowRuleBatchRequest {
private final List<FlowEntry> toAdd;
private final List<FlowEntry> toRemove;
public FlowRuleBatchRequest(List<FlowEntry> toAdd, List<FlowEntry> toRemove) {
this.toAdd = Collections.unmodifiableList(toAdd);
this.toRemove = Collections.unmodifiableList(toRemove);
}
public List<FlowEntry> toAdd() {
return toAdd;
}
public List<FlowEntry> toRemove() {
return toRemove;
}
public FlowRuleBatchOperation asBatchOperation() {
List<FlowRuleBatchEntry> entries = Lists.newArrayList();
for (FlowEntry e : toAdd) {
entries.add(new FlowRuleBatchEntry(FlowRuleOperation.ADD, e));
}
for (FlowEntry e : toRemove) {
entries.add(new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, e));
}
return new FlowRuleBatchOperation(entries);
}
}
package org.onlab.onos.net.flow;
import java.util.concurrent.Future;
import org.onlab.onos.ApplicationId;
import org.onlab.onos.net.intent.BatchOperation;
import org.onlab.onos.net.provider.Provider;
import com.google.common.util.concurrent.ListenableFuture;
/**
* Abstraction of a flow rule provider.
*/
......@@ -43,6 +43,6 @@ public interface FlowRuleProvider extends Provider {
* @param batch a batch of flow rules
* @return a future indicating the status of this execution
*/
Future<CompletedBatchOperation> executeBatch(BatchOperation<FlowRuleBatchEntry> batch);
ListenableFuture<CompletedBatchOperation> executeBatch(BatchOperation<FlowRuleBatchEntry> batch);
}
......
package org.onlab.onos.net.flow;
import java.util.concurrent.Future;
import org.onlab.onos.ApplicationId;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.store.Store;
......@@ -7,7 +9,7 @@ import org.onlab.onos.store.Store;
/**
* Manages inventory of flow rules; not intended for direct use.
*/
public interface FlowRuleStore extends Store<FlowRuleEvent, FlowRuleStoreDelegate> {
public interface FlowRuleStore extends Store<FlowRuleBatchEvent, FlowRuleStoreDelegate> {
/**
* Returns the number of flow rule in the store.
......@@ -41,12 +43,26 @@ public interface FlowRuleStore extends Store<FlowRuleEvent, FlowRuleStoreDelegat
Iterable<FlowRule> getFlowRulesByAppId(ApplicationId appId);
/**
// TODO: Better description of method behavior.
* Stores a new flow rule without generating events.
*
* @param rule the flow rule to add
* @return true if the rule should be handled locally
*/
boolean storeFlowRule(FlowRule rule);
void storeFlowRule(FlowRule rule);
/**
* Stores a batch of flow rules.
* @param batchOperation batch of flow rules.
* @return Future response indicating success/failure of the batch operation
* all the way down to the device.
*/
Future<CompletedBatchOperation> storeBatch(FlowRuleBatchOperation batchOperation);
/**
* Invoked on the completion of a storeBatch operation.
* @param result
*/
void batchOperationComplete(FlowRuleBatchEvent event);
/**
* Marks a flow rule for deletion. Actual deletion will occur
......@@ -55,7 +71,7 @@ public interface FlowRuleStore extends Store<FlowRuleEvent, FlowRuleStoreDelegat
* @param rule the flow rule to delete
* @return true if the rule should be handled locally
*/
boolean deleteFlowRule(FlowRule rule);
void deleteFlowRule(FlowRule rule);
/**
* Stores a new flow rule, or updates an existing entry.
......
......@@ -5,5 +5,5 @@ import org.onlab.onos.store.StoreDelegate;
/**
* Flow rule store delegate abstraction.
*/
public interface FlowRuleStoreDelegate extends StoreDelegate<FlowRuleEvent> {
public interface FlowRuleStoreDelegate extends StoreDelegate<FlowRuleBatchEvent> {
}
......
......@@ -5,8 +5,10 @@ import static org.slf4j.LoggerFactory.getLogger;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
......@@ -30,7 +32,9 @@ import org.onlab.onos.net.flow.FlowEntry;
import org.onlab.onos.net.flow.FlowRule;
import org.onlab.onos.net.flow.FlowRuleBatchEntry;
import org.onlab.onos.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
import org.onlab.onos.net.flow.FlowRuleBatchEvent;
import org.onlab.onos.net.flow.FlowRuleBatchOperation;
import org.onlab.onos.net.flow.FlowRuleBatchRequest;
import org.onlab.onos.net.flow.FlowRuleEvent;
import org.onlab.onos.net.flow.FlowRuleListener;
import org.onlab.onos.net.flow.FlowRuleProvider;
......@@ -47,6 +51,9 @@ import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
/**
* Provides implementation of the flow NB &amp; SB APIs.
......@@ -104,11 +111,7 @@ public class FlowRuleManager
public void applyFlowRules(FlowRule... flowRules) {
for (int i = 0; i < flowRules.length; i++) {
FlowRule f = flowRules[i];
boolean local = store.storeFlowRule(f);
if (local) {
// TODO: aggregate all local rules and push down once?
applyFlowRulesToProviders(f);
}
store.storeFlowRule(f);
}
}
......@@ -132,11 +135,7 @@ public class FlowRuleManager
FlowRule f;
for (int i = 0; i < flowRules.length; i++) {
f = flowRules[i];
boolean local = store.deleteFlowRule(f);
if (local) {
// TODO: aggregate all local rules and push down once?
removeFlowRulesFromProviders(f);
}
store.deleteFlowRule(f);
}
}
......@@ -180,33 +179,21 @@ public class FlowRuleManager
@Override
public Future<CompletedBatchOperation> applyBatch(
FlowRuleBatchOperation batch) {
Multimap<FlowRuleProvider, FlowRuleBatchEntry> batches =
Multimap<DeviceId, FlowRuleBatchEntry> perDeviceBatches =
ArrayListMultimap.create();
List<Future<CompletedBatchOperation>> futures = Lists.newArrayList();
for (FlowRuleBatchEntry fbe : batch.getOperations()) {
final FlowRule f = fbe.getTarget();
final Device device = deviceService.getDevice(f.deviceId());
final FlowRuleProvider frp = getProvider(device.providerId());
batches.put(frp, fbe);
switch (fbe.getOperator()) {
case ADD:
store.storeFlowRule(f);
break;
case REMOVE:
store.deleteFlowRule(f);
break;
case MODIFY:
default:
log.error("Batch operation type {} unsupported.", fbe.getOperator());
perDeviceBatches.put(f.deviceId(), fbe);
}
}
for (FlowRuleProvider provider : batches.keySet()) {
for (DeviceId deviceId : perDeviceBatches.keySet()) {
FlowRuleBatchOperation b =
new FlowRuleBatchOperation(batches.get(provider));
Future<CompletedBatchOperation> future = provider.executeBatch(b);
new FlowRuleBatchOperation(perDeviceBatches.get(deviceId));
Future<CompletedBatchOperation> future = store.storeBatch(b);
futures.add(future);
}
return new FlowRuleBatchFuture(futures, batches);
return new FlowRuleBatchFuture(futures, perDeviceBatches);
}
@Override
......@@ -318,6 +305,7 @@ public class FlowRuleManager
post(event);
}
} else {
log.info("Removing flow rules....");
removeFlowRules(flowEntry);
}
......@@ -385,21 +373,47 @@ public class FlowRuleManager
// Store delegate to re-post events emitted from the store.
private class InternalStoreDelegate implements FlowRuleStoreDelegate {
// TODO: Right now we only dispatch events at individual flowEntry level.
// It may be more efficient for also dispatch events as a batch.
@Override
public void notify(FlowRuleEvent event) {
public void notify(FlowRuleBatchEvent event) {
final FlowRuleBatchRequest request = event.subject();
switch (event.type()) {
case RULE_ADD_REQUESTED:
applyFlowRulesToProviders(event.subject());
break;
case RULE_REMOVE_REQUESTED:
removeFlowRulesFromProviders(event.subject());
break;
case BATCH_OPERATION_REQUESTED:
// for (FlowEntry entry : request.toAdd()) {
// //eventDispatcher.post(new FlowRuleEvent(FlowRuleEvent.Type.RULE_ADD_REQUESTED, entry));
// }
// for (FlowEntry entry : request.toRemove()) {
// //eventDispatcher.post(new FlowRuleEvent(FlowRuleEvent.Type.RULE_REMOVE_REQUESTED, entry));
// }
// // FIXME: what about op.equals(FlowRuleOperation.MODIFY) ?
//
FlowRuleBatchOperation batchOperation = request.asBatchOperation();
FlowRuleProvider flowRuleProvider =
getProvider(batchOperation.getOperations().get(0).getTarget().deviceId());
final ListenableFuture<CompletedBatchOperation> result =
flowRuleProvider.executeBatch(batchOperation);
result.addListener(new Runnable() {
@Override
public void run() {
store.batchOperationComplete(FlowRuleBatchEvent.create(request, Futures.getUnchecked(result)));
}
}, Executors.newCachedThreadPool());
case RULE_ADDED:
case RULE_REMOVED:
case RULE_UPDATED:
// only dispatch events related to switch
eventDispatcher.post(event);
break;
case BATCH_OPERATION_COMPLETED:
Set<FlowEntry> failedItems = event.result().failedItems();
for (FlowEntry entry : request.toAdd()) {
if (!failedItems.contains(entry)) {
eventDispatcher.post(new FlowRuleEvent(FlowRuleEvent.Type.RULE_ADDED, entry));
}
}
for (FlowEntry entry : request.toRemove()) {
if (!failedItems.contains(entry)) {
eventDispatcher.post(new FlowRuleEvent(FlowRuleEvent.Type.RULE_REMOVED, entry));
}
}
break;
default:
break;
......@@ -407,18 +421,15 @@ public class FlowRuleManager
}
}
private class FlowRuleBatchFuture
implements Future<CompletedBatchOperation> {
private class FlowRuleBatchFuture implements Future<CompletedBatchOperation> {
private final List<Future<CompletedBatchOperation>> futures;
private final Multimap<FlowRuleProvider, FlowRuleBatchEntry> batches;
private final Multimap<DeviceId, FlowRuleBatchEntry> batches;
private final AtomicReference<BatchState> state;
private CompletedBatchOperation overall;
public FlowRuleBatchFuture(List<Future<CompletedBatchOperation>> futures,
Multimap<FlowRuleProvider, FlowRuleBatchEntry> batches) {
Multimap<DeviceId, FlowRuleBatchEntry> batches) {
this.futures = futures;
this.batches = batches;
state = new AtomicReference<FlowRuleManager.BatchState>();
......@@ -460,7 +471,7 @@ public class FlowRuleManager
}
boolean success = true;
List<FlowEntry> failed = Lists.newLinkedList();
Set<FlowEntry> failed = Sets.newHashSet();
CompletedBatchOperation completed;
for (Future<CompletedBatchOperation> future : futures) {
completed = future.get();
......@@ -480,7 +491,7 @@ public class FlowRuleManager
return overall;
}
boolean success = true;
List<FlowEntry> failed = Lists.newLinkedList();
Set<FlowEntry> failed = Sets.newHashSet();
CompletedBatchOperation completed;
long start = System.nanoTime();
long end = start + unit.toNanos(timeout);
......@@ -494,7 +505,7 @@ public class FlowRuleManager
return finalizeBatchOperation(success, failed);
}
private boolean validateBatchOperation(List<FlowEntry> failed,
private boolean validateBatchOperation(Set<FlowEntry> failed,
CompletedBatchOperation completed) {
if (isCancelled()) {
......@@ -516,7 +527,7 @@ public class FlowRuleManager
}
private CompletedBatchOperation finalizeBatchOperation(boolean success,
List<FlowEntry> failed) {
Set<FlowEntry> failed) {
synchronized (this) {
if (!state.compareAndSet(BatchState.STARTED, BatchState.FINISHED)) {
if (state.get() == BatchState.FINISHED) {
......@@ -539,11 +550,6 @@ public class FlowRuleManager
store.storeFlowRule(fbe.getTarget());
}
}
}
}
}
......
package org.onlab.onos.net.flow.impl;
import static java.util.Collections.EMPTY_LIST;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
......@@ -17,6 +16,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
......@@ -64,6 +64,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListenableFuture;
/**
* Test codifying the flow rule service & flow rule provider service contracts.
......@@ -515,13 +516,13 @@ public class FlowRuleManagerTest {
}
@Override
public Future<CompletedBatchOperation> executeBatch(
public ListenableFuture<CompletedBatchOperation> executeBatch(
BatchOperation<FlowRuleBatchEntry> batch) {
return new TestInstallationFuture();
}
private class TestInstallationFuture
implements Future<CompletedBatchOperation> {
implements ListenableFuture<CompletedBatchOperation> {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
......@@ -541,7 +542,7 @@ public class FlowRuleManagerTest {
@Override
public CompletedBatchOperation get()
throws InterruptedException, ExecutionException {
return new CompletedBatchOperation(true, EMPTY_LIST);
return new CompletedBatchOperation(true, Collections.<FlowEntry>emptySet());
}
@Override
......@@ -550,6 +551,11 @@ public class FlowRuleManagerTest {
ExecutionException, TimeoutException {
return null;
}
@Override
public void addListener(Runnable task, Executor executor) {
// TODO: add stuff.
}
}
}
......
......@@ -12,4 +12,5 @@ public final class FlowStoreMessageSubjects {
public static final MessageSubject ADD_OR_UPDATE_FLOW_RULE =
new MessageSubject("peer-forward-add-or-update-flow-rule");
public static final MessageSubject REMOVE_FLOW_RULE = new MessageSubject("peer-forward-remove-flow-rule");
public static final MessageSubject GET_FLOW_ENTRY = new MessageSubject("peer-forward-get-flow-entry");
}
......
......@@ -26,10 +26,12 @@ import org.onlab.onos.net.Port;
import org.onlab.onos.net.PortNumber;
import org.onlab.onos.net.device.DefaultDeviceDescription;
import org.onlab.onos.net.device.DefaultPortDescription;
import org.onlab.onos.net.flow.DefaultFlowEntry;
import org.onlab.onos.net.flow.DefaultFlowRule;
import org.onlab.onos.net.flow.DefaultTrafficSelector;
import org.onlab.onos.net.flow.DefaultTrafficTreatment;
import org.onlab.onos.net.flow.FlowId;
import org.onlab.onos.net.flow.StoredFlowEntry;
import org.onlab.onos.net.flow.criteria.Criteria;
import org.onlab.onos.net.flow.criteria.Criterion;
import org.onlab.onos.net.flow.instructions.Instructions;
......@@ -93,6 +95,8 @@ public final class KryoNamespaces {
HostId.class,
HostDescription.class,
DefaultHostDescription.class,
DefaultFlowEntry.class,
StoredFlowEntry.class,
DefaultFlowRule.class,
FlowId.class,
DefaultTrafficSelector.class,
......
......@@ -3,6 +3,8 @@ package org.onlab.onos.store.trivial.impl;
import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
import static org.slf4j.LoggerFactory.getLogger;
import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
......@@ -10,6 +12,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Future;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
......@@ -17,11 +20,17 @@ import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.ApplicationId;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.flow.CompletedBatchOperation;
import org.onlab.onos.net.flow.DefaultFlowEntry;
import org.onlab.onos.net.flow.FlowEntry;
import org.onlab.onos.net.flow.FlowEntry.FlowEntryState;
import org.onlab.onos.net.flow.FlowId;
import org.onlab.onos.net.flow.FlowRule;
import org.onlab.onos.net.flow.FlowRuleBatchEntry;
import org.onlab.onos.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
import org.onlab.onos.net.flow.FlowRuleBatchEvent;
import org.onlab.onos.net.flow.FlowRuleBatchOperation;
import org.onlab.onos.net.flow.FlowRuleBatchRequest;
import org.onlab.onos.net.flow.FlowRuleEvent;
import org.onlab.onos.net.flow.FlowRuleEvent.Type;
import org.onlab.onos.net.flow.FlowRuleStore;
......@@ -33,6 +42,7 @@ import org.slf4j.Logger;
import com.google.common.base.Function;
import com.google.common.collect.FluentIterable;
import com.google.common.util.concurrent.Futures;
/**
* Manages inventory of flow rules using trivial in-memory implementation.
......@@ -40,7 +50,7 @@ import com.google.common.collect.FluentIterable;
@Component(immediate = true)
@Service
public class SimpleFlowRuleStore
extends AbstractStore<FlowRuleEvent, FlowRuleStoreDelegate>
extends AbstractStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
implements FlowRuleStore {
private final Logger log = getLogger(getClass());
......@@ -148,12 +158,11 @@ public class SimpleFlowRuleStore
}
@Override
public boolean storeFlowRule(FlowRule rule) {
final boolean added = storeFlowRuleInternal(rule);
return added;
public void storeFlowRule(FlowRule rule) {
storeFlowRuleInternal(rule);
}
private boolean storeFlowRuleInternal(FlowRule rule) {
private void storeFlowRuleInternal(FlowRule rule) {
StoredFlowEntry f = new DefaultFlowEntry(rule);
final DeviceId did = f.deviceId();
final FlowId fid = f.id();
......@@ -162,19 +171,20 @@ public class SimpleFlowRuleStore
for (StoredFlowEntry fe : existing) {
if (fe.equals(rule)) {
// was already there? ignore
return false;
return;
}
}
// new flow rule added
existing.add(f);
// TODO: Should we notify only if it's "remote" event?
//notifyDelegate(new FlowRuleEvent(Type.RULE_ADD_REQUESTED, rule));
return true;
notifyDelegate(FlowRuleBatchEvent.create(
new FlowRuleBatchRequest(
Arrays.<FlowEntry>asList(f),
Collections.<FlowEntry>emptyList())));
}
}
@Override
public boolean deleteFlowRule(FlowRule rule) {
public void deleteFlowRule(FlowRule rule) {
List<StoredFlowEntry> entries = getFlowEntries(rule.deviceId(), rule.id());
synchronized (entries) {
......@@ -184,13 +194,11 @@ public class SimpleFlowRuleStore
entry.setState(FlowEntryState.PENDING_REMOVE);
// TODO: Should we notify only if it's "remote" event?
//notifyDelegate(new FlowRuleEvent(Type.RULE_REMOVE_REQUESTED, rule));
return true;
}
}
}
}
//log.warn("Cannot find rule {}", rule);
return false;
}
@Override
......@@ -236,4 +244,24 @@ public class SimpleFlowRuleStore
}
return null;
}
@Override
public Future<CompletedBatchOperation> storeBatch(
FlowRuleBatchOperation batchOperation) {
for (FlowRuleBatchEntry entry : batchOperation.getOperations()) {
if (entry.getOperator().equals(FlowRuleOperation.ADD)) {
storeFlowRule(entry.getTarget());
} else if (entry.getOperator().equals(FlowRuleOperation.REMOVE)) {
deleteFlowRule(entry.getTarget());
} else {
throw new UnsupportedOperationException("Unsupported operation type");
}
}
return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowEntry>emptySet()));
}
@Override
public void batchOperationComplete(FlowRuleBatchEvent event) {
notifyDelegate(event);
}
}
......
......@@ -10,7 +10,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
......@@ -69,9 +69,11 @@ import org.projectfloodlight.openflow.types.U32;
import org.slf4j.Logger;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ExecutionList;
import com.google.common.util.concurrent.ListenableFuture;
/**
* Provider which uses an OpenFlow controller to detect network
......@@ -97,6 +99,8 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
private final InternalFlowProvider listener = new InternalFlowProvider();
// FIXME: This should be an expiring map to ensure futures that don't have
// a future eventually get garbage collected.
private final Map<Long, InstallationFuture> pendingFutures =
new ConcurrentHashMap<Long, InstallationFuture>();
......@@ -159,7 +163,7 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
}
@Override
public Future<CompletedBatchOperation> executeBatch(BatchOperation<FlowRuleBatchEntry> batch) {
public ListenableFuture<CompletedBatchOperation> executeBatch(BatchOperation<FlowRuleBatchEntry> batch) {
final Set<Dpid> sws =
Collections.newSetFromMap(new ConcurrentHashMap<Dpid, Boolean>());
final Map<Long, FlowRuleBatchEntry> fmXids = new HashMap<Long, FlowRuleBatchEntry>();
......@@ -315,18 +319,20 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
}
}
private class InstallationFuture implements Future<CompletedBatchOperation> {
private class InstallationFuture implements ListenableFuture<CompletedBatchOperation> {
private final Set<Dpid> sws;
private final AtomicBoolean ok = new AtomicBoolean(true);
private final Map<Long, FlowRuleBatchEntry> fms;
private final List<FlowEntry> offendingFlowMods = Lists.newLinkedList();
private final Set<FlowEntry> offendingFlowMods = Sets.newHashSet();
private final CountDownLatch countDownLatch;
private Long pendingXid;
private BatchState state;
private final ExecutionList executionList = new ExecutionList();
public InstallationFuture(Set<Dpid> sws, Map<Long, FlowRuleBatchEntry> fmXids) {
this.state = BatchState.STARTED;
this.sws = sws;
......@@ -335,6 +341,7 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
}
public void fail(OFErrorMsg msg, Dpid dpid) {
ok.set(false);
removeRequirement(dpid);
FlowEntry fe = null;
......@@ -407,6 +414,9 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
if (isDone()) {
return false;
}
ok.set(false);
this.state = BatchState.CANCELLED;
cleanUp();
......@@ -419,7 +429,8 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
}
}
return isCancelled();
invokeCallbacks();
return true;
}
@Override
......@@ -429,14 +440,15 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
@Override
public boolean isDone() {
return this.state == BatchState.FINISHED;
return this.state == BatchState.FINISHED || isCancelled();
}
@Override
public CompletedBatchOperation get() throws InterruptedException, ExecutionException {
countDownLatch.await();
this.state = BatchState.FINISHED;
return new CompletedBatchOperation(ok.get(), offendingFlowMods);
CompletedBatchOperation result = new CompletedBatchOperation(ok.get(), offendingFlowMods);
return result;
}
@Override
......@@ -445,7 +457,8 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
TimeoutException {
if (countDownLatch.await(timeout, unit)) {
this.state = BatchState.FINISHED;
return new CompletedBatchOperation(ok.get(), offendingFlowMods);
CompletedBatchOperation result = new CompletedBatchOperation(ok.get(), offendingFlowMods);
return result;
}
throw new TimeoutException();
}
......@@ -463,10 +476,21 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
private void removeRequirement(Dpid dpid) {
countDownLatch.countDown();
if (countDownLatch.getCount() == 0) {
invokeCallbacks();
}
sws.remove(dpid);
cleanUp();
}
@Override
public void addListener(Runnable runnable, Executor executor) {
executionList.add(runnable, executor);
}
private void invokeCallbacks() {
executionList.execute();
}
}
}
......