Brian O'Connor
Committed by Gerrit Code Review

Updating Intent Manager to deal with failures.

Added ids to Flow batch futures.
Adding some basic unit tests for IntentManger
Adding failedIds to the completedOperation in FlowRuleManager

Change-Id: I7645cead193299f70d319d254cd1e82d96909e7b
Showing 20 changed files with 882 additions and 320 deletions
......@@ -15,6 +15,9 @@
*/
package org.onlab.onos.net.flow;
import java.util.Collections;
import java.util.Set;
import com.google.common.collect.ImmutableSet;
......@@ -26,6 +29,21 @@ public class CompletedBatchOperation implements BatchOperationResult<FlowRule> {
private final boolean success;
private final Set<FlowRule> failures;
private final Set<Long> failedIds;
/**
* Creates a new batch completion result.
*
* @param success indicates whether the completion is successful.
* @param failures set of any failures encountered
* @param failedIds (optional) set of failed operation ids
*/
public CompletedBatchOperation(boolean success, Set<? extends FlowRule> failures,
Set<Long> failedIds) {
this.success = success;
this.failures = ImmutableSet.copyOf(failures);
this.failedIds = ImmutableSet.copyOf(failedIds);
}
/**
* Creates a new batch completion result.
......@@ -36,8 +54,11 @@ public class CompletedBatchOperation implements BatchOperationResult<FlowRule> {
public CompletedBatchOperation(boolean success, Set<? extends FlowRule> failures) {
this.success = success;
this.failures = ImmutableSet.copyOf(failures);
this.failedIds = Collections.emptySet();
}
@Override
public boolean isSuccess() {
return success;
......@@ -48,4 +69,8 @@ public class CompletedBatchOperation implements BatchOperationResult<FlowRule> {
return failures;
}
public Set<Long> failedIds() {
return failedIds;
}
}
......
......@@ -21,8 +21,20 @@ import org.onlab.onos.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
public class FlowRuleBatchEntry
extends BatchOperationEntry<FlowRuleOperation, FlowRule> {
private final Long id; // FIXME: consider using Optional<Long>
public FlowRuleBatchEntry(FlowRuleOperation operator, FlowRule target) {
super(operator, target);
this.id = null;
}
public FlowRuleBatchEntry(FlowRuleOperation operator, FlowRule target, Long id) {
super(operator, target);
this.id = id;
}
public Long id() {
return id;
}
public enum FlowRuleOperation {
......
......@@ -18,38 +18,52 @@ package org.onlab.onos.net.flow;
import java.util.Collections;
import java.util.List;
import org.onlab.onos.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
import com.google.common.base.Function;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.Lists;
public class FlowRuleBatchRequest {
private final int batchId;
private final List<FlowRule> toAdd;
private final List<FlowRule> toRemove;
private final List<FlowRuleBatchEntry> toAdd;
private final List<FlowRuleBatchEntry> toRemove;
public FlowRuleBatchRequest(int batchId, List<? extends FlowRule> toAdd, List<? extends FlowRule> toRemove) {
public FlowRuleBatchRequest(int batchId, List<FlowRuleBatchEntry> toAdd,
List<FlowRuleBatchEntry> toRemove) {
this.batchId = batchId;
this.toAdd = Collections.unmodifiableList(toAdd);
this.toRemove = Collections.unmodifiableList(toRemove);
}
public List<FlowRule> toAdd() {
return toAdd;
return FluentIterable.from(toAdd).transform(
new Function<FlowRuleBatchEntry, FlowRule>() {
@Override
public FlowRule apply(FlowRuleBatchEntry input) {
return input.getTarget();
}
}).toList();
}
public List<FlowRule> toRemove() {
return toRemove;
return FluentIterable.from(toRemove).transform(
new Function<FlowRuleBatchEntry, FlowRule>() {
@Override
public FlowRule apply(FlowRuleBatchEntry input) {
return input.getTarget();
}
}).toList();
}
public FlowRuleBatchOperation asBatchOperation() {
List<FlowRuleBatchEntry> entries = Lists.newArrayList();
for (FlowRule e : toAdd) {
entries.add(new FlowRuleBatchEntry(FlowRuleOperation.ADD, e));
}
for (FlowRule e : toRemove) {
entries.add(new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, e));
}
entries.addAll(toAdd);
entries.addAll(toRemove);
return new FlowRuleBatchOperation(entries);
}
......
......@@ -37,10 +37,16 @@ public interface IntentBatchService {
void removeIntentOperations(IntentOperations operations);
/**
* Returns the set of intent batches currently being tracked.
* Returns the set of intent batches that are pending.
* @return set of batches
*/
Set<IntentOperations> getIntentOperations();
Set<IntentOperations> getPendingOperations();
/**
* Returns the set of intent batches currently being processed.
* @return set of batches
*/
Set<IntentOperations> getCurrentOperations();
/**
* Sets the batch service delegate.
......
......@@ -67,6 +67,18 @@ public enum IntentState {
RECOMPILING,
/**
* TODO: Indicated that an intent will soon be recompiled.
*/
//UPDATE,
/**
* TODO.
* Indicates that an application has requested that an intent be withdrawn.
* It will start withdrawing short, but not necessarily on this instance.
*/
//WITHDRAW_REQ,
/**
* Indicates that the intent is being withdrawn. This is a transitional
* state, triggered by invocation of the
* {@link IntentService#withdraw(Intent)} but one with only one outcome,
......
......@@ -21,6 +21,8 @@ import java.util.List;
import org.junit.Test;
import org.onlab.onos.net.intent.IntentTestsMocks;
import static org.onlab.onos.net.flow.FlowRuleBatchEntry.FlowRuleOperation.*;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
......@@ -38,10 +40,10 @@ public class FlowRuleBatchRequestTest {
public void testConstruction() {
final FlowRule rule1 = new IntentTestsMocks.MockFlowRule(1);
final FlowRule rule2 = new IntentTestsMocks.MockFlowRule(2);
final List<FlowRule> toAdd = new LinkedList<>();
toAdd.add(rule1);
final List<FlowRule> toRemove = new LinkedList<>();
toRemove.add(rule2);
final List<FlowRuleBatchEntry> toAdd = new LinkedList<>();
toAdd.add(new FlowRuleBatchEntry(ADD, rule1));
final List<FlowRuleBatchEntry> toRemove = new LinkedList<>();
toRemove.add(new FlowRuleBatchEntry(REMOVE, rule2));
final FlowRuleBatchRequest request =
......
......@@ -26,6 +26,7 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import org.onlab.onos.net.DeviceId;
......@@ -331,7 +332,22 @@ public class IntentTestsMocks {
return false;
}
@Override
public int hashCode() {
return Objects.hash(priority);
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
final MockFlowRule other = (MockFlowRule) obj;
return Objects.equals(this.priority, other.priority);
}
}
......
......@@ -488,13 +488,14 @@ public class FlowRuleManager
boolean success = true;
Set<FlowRule> failed = Sets.newHashSet();
Set<Long> failedIds = Sets.newHashSet();
CompletedBatchOperation completed;
for (Future<CompletedBatchOperation> future : futures) {
completed = future.get();
success = validateBatchOperation(failed, completed);
success = validateBatchOperation(failed, failedIds, completed);
}
return finalizeBatchOperation(success, failed);
return finalizeBatchOperation(success, failed, failedIds);
}
......@@ -508,22 +509,25 @@ public class FlowRuleManager
}
boolean success = true;
Set<FlowRule> failed = Sets.newHashSet();
Set<Long> failedIds = Sets.newHashSet();
CompletedBatchOperation completed;
for (Future<CompletedBatchOperation> future : futures) {
completed = future.get(timeout, unit);
success = validateBatchOperation(failed, completed);
success = validateBatchOperation(failed, failedIds, completed);
}
return finalizeBatchOperation(success, failed);
return finalizeBatchOperation(success, failed, failedIds);
}
private boolean validateBatchOperation(Set<FlowRule> failed,
CompletedBatchOperation completed) {
Set<Long> failedIds,
CompletedBatchOperation completed) {
if (isCancelled()) {
throw new CancellationException();
}
if (!completed.isSuccess()) {
failed.addAll(completed.failedItems());
failedIds.addAll(completed.failedIds());
cleanUpBatch();
cancelAllSubBatches();
return false;
......@@ -538,7 +542,8 @@ public class FlowRuleManager
}
private CompletedBatchOperation finalizeBatchOperation(boolean success,
Set<FlowRule> failed) {
Set<FlowRule> failed,
Set<Long> failedIds) {
synchronized (this) {
if (!state.compareAndSet(BatchState.STARTED, BatchState.FINISHED)) {
if (state.get() == BatchState.FINISHED) {
......@@ -546,7 +551,7 @@ public class FlowRuleManager
}
throw new CancellationException();
}
overall = new CompletedBatchOperation(success, failed);
overall = new CompletedBatchOperation(success, failed, failedIds);
return overall;
}
}
......
......@@ -49,9 +49,9 @@ import org.onlab.onos.net.intent.IntentStoreDelegate;
import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
......@@ -330,12 +330,12 @@ public class IntentManager
batches.addAll(getInstaller(installable).install(installable));
} catch (IntentException e) {
log.warn("Unable to install intent {} due to:", update.newIntent().id(), e);
trackerService.removeTrackedResources(update.newIntent().id(),
installable.resources());
//FIXME we failed... intent should be recompiled
// TODO: remove resources
// recompile!!!
}
}
update.setBatches(batches);
update.addBatches(batches);
}
/**
......@@ -348,80 +348,32 @@ public class IntentManager
if (!update.oldIntent().equals(update.newIntent())) {
update.setState(update.oldIntent(), WITHDRAWING);
} // else newIntent is FAILED
uninstallIntent(update);
// If all went well, disassociate the top-level intent with its
// installable derivatives and mark it as withdrawn.
// FIXME need to clean up
//store.removeInstalledIntents(intent.id());
update.addBatches(uninstallIntent(update.oldIntent(), update.oldInstallables()));
}
/**
* Uninstalls all installable intents associated with the given intent.
*
* @param update intent update
* @param intent intent
* @param installables installable intents
* @return list of batches to uninstall intent
*/
//FIXME: need to handle next state properly
private void uninstallIntent(IntentUpdate update) {
if (update.oldInstallables == null) {
return;
private List<FlowRuleBatchOperation> uninstallIntent(Intent intent, List<Intent> installables) {
if (installables == null) {
return Collections.emptyList();
}
List<FlowRuleBatchOperation> batches = Lists.newArrayList();
for (Intent installable : update.oldInstallables()) {
trackerService.removeTrackedResources(update.oldIntent().id(),
for (Intent installable : installables) {
trackerService.removeTrackedResources(intent.id(),
installable.resources());
try {
batches.addAll(getInstaller(installable).uninstall(installable));
} catch (IntentException e) {
log.warn("Unable to uninstall intent {} due to:", update.oldIntent().id(), e);
log.warn("Unable to uninstall intent {} due to:", intent.id(), e);
// TODO: this should never happen. but what if it does?
}
}
update.setBatches(batches);
// FIXME: next state for old is WITHDRAWN or FAILED
}
/**
* Recompiles the specified intent.
*
* @param update intent update
*/
// FIXME: update this to work
private void executeRecompilingPhase(IntentUpdate update) {
Intent intent = update.newIntent();
// Indicate that the intent is entering the recompiling phase.
store.setState(intent, RECOMPILING);
List<FlowRuleBatchOperation> batches = Lists.newArrayList();
try {
// Compile the intent into installable derivatives.
List<Intent> installable = compileIntent(intent, update);
// If all went well, compare the existing list of installable
// intents with the newly compiled list. If they are the same,
// bail, out since the previous approach was determined not to
// be viable.
// FIXME do we need this?
List<Intent> originalInstallable = store.getInstallableIntents(intent.id());
//FIXME let's be smarter about how we perform the update
//batches.addAll(uninstallIntent(intent, null));
if (Objects.equals(originalInstallable, installable)) {
eventDispatcher.post(store.setState(intent, FAILED));
} else {
// Otherwise, re-associate the newly compiled installable intents
// with the top-level intent and kick off installing phase.
store.setInstallableIntents(intent.id(), installable);
// FIXME commented out for now
//batches.addAll(executeInstallingPhase(update));
}
} catch (Exception e) {
log.warn("Unable to recompile intent {} due to:", intent.id(), e);
// If compilation failed, mark the intent as failed.
eventDispatcher.post(store.setState(intent, FAILED));
}
return batches;
}
/**
......@@ -442,9 +394,10 @@ public class IntentManager
for (int i = 0; i < update.oldInstallables().size(); i++) {
Intent oldInstallable = update.oldInstallables().get(i);
Intent newInstallable = update.newInstallables().get(i);
if (oldInstallable.equals(newInstallable)) {
continue;
}
//FIXME revisit this
// if (oldInstallable.equals(newInstallable)) {
// continue;
// }
checkArgument(oldInstallable.getClass().equals(newInstallable.getClass()),
"Installable Intent type mismatch.");
trackerService.removeTrackedResources(update.oldIntent().id(), oldInstallable.resources());
......@@ -454,9 +407,12 @@ public class IntentManager
} catch (IntentException e) {
log.warn("Unable to update intent {} due to:", update.oldIntent().id(), e);
//FIXME... we failed. need to uninstall (if same) or revert (if different)
trackerService.removeTrackedResources(update.newIntent().id(), newInstallable.resources());
update.setState(update.newIntent(), FAILED);
batches = uninstallIntent(update.oldIntent(), update.oldInstallables());
}
}
update.setBatches(batches);
update.addBatches(batches);
}
/**
......@@ -541,13 +497,12 @@ public class IntentManager
}
/**
* TODO.
* @param op intent operation
* @return intent update
* TODO. rename this...
* @param update intent update
*/
private IntentUpdate processIntentOperation(IntentOperation op) {
IntentUpdate update = new IntentUpdate(op);
private void processIntentUpdate(IntentUpdate update) {
// check to see if the intent needs to be compiled or recompiled
if (update.newIntent() != null) {
executeCompilingPhase(update);
}
......@@ -559,32 +514,29 @@ public class IntentManager
} else if (update.oldInstallables() != null) {
executeWithdrawingPhase(update);
} else {
if (update.oldIntent() != null) {
// TODO this shouldn't happen
return update; //FIXME
}
if (update.newIntent() != null) {
// TODO assert that next state is failed
return update; //FIXME
if (update.oldIntent() != null &&
!update.oldIntent().equals(update.newIntent())) {
// removing failed intent
update.setState(update.oldIntent(), WITHDRAWING);
}
// if (update.newIntent() != null) {
// // TODO assert that next state is failed
// }
}
return update;
}
// TODO comments...
private class IntentUpdate {
private final IntentOperation op;
private final Intent oldIntent;
private final Intent newIntent;
private final Map<Intent, IntentState> stateMap = Maps.newHashMap();
private final List<Intent> oldInstallables;
private List<Intent> newInstallables;
private List<FlowRuleBatchOperation> batches;
private final List<FlowRuleBatchOperation> batches = Lists.newLinkedList();
private int currentBatch = 0; // TODO: maybe replace with an iterator
IntentUpdate(IntentOperation op) {
this.op = op;
switch (op.type()) {
case SUBMIT:
newIntent = op.intent();
......@@ -600,7 +552,7 @@ public class IntentManager
break;
case UPDATE:
oldIntent = store.getIntent(op.intentId());
newIntent = oldIntent; //InnerAssignment: Inner assignments should be avoided.
newIntent = oldIntent;
break;
default:
oldIntent = null;
......@@ -617,7 +569,6 @@ public class IntentManager
// fetch the old intent's installables from the store
if (oldIntent != null) {
oldInstallables = store.getInstallableIntents(oldIntent.id());
// TODO: remove intent from store after uninstall
} else {
oldInstallables = null;
}
......@@ -644,12 +595,72 @@ public class IntentManager
store.setInstallableIntents(newIntent.id(), installables);
}
boolean isComplete() {
return currentBatch >= batches.size();
}
FlowRuleBatchOperation currentBatch() {
return !isComplete() ? batches.get(currentBatch) : null;
}
void incrementBatch(boolean success) {
if (success) { // actually increment
if (++currentBatch == batches.size()) {
finalizeStates();
}
} else { // the current batch has failed, so recompile
// remove the current batch and all remaining
for (int i = currentBatch; i < batches.size(); i++) {
batches.remove(i);
}
if (oldIntent != null) {
executeWithdrawingPhase(this); // remove the old intent
}
if (newIntent != null) {
setState(newIntent, FAILED);
batches.addAll(uninstallIntent(newIntent, newInstallables()));
}
// FIXME: should we try to recompile?
}
}
// FIXME make sure this is called!!!
private void finalizeStates() {
for (Intent intent : stateMap.keySet()) {
switch (getState(intent)) {
case INSTALLING:
setState(intent, INSTALLED);
break;
case WITHDRAWING:
setState(intent, WITHDRAWN);
store.removeInstalledIntents(intent.id());
//store.removeIntent(intent.id()); // FIXME we die a horrible death here
break;
case FAILED:
store.removeInstalledIntents(intent.id());
break;
// FALLTHROUGH to default from here
case SUBMITTED:
case COMPILING:
case RECOMPILING:
case WITHDRAWN:
case INSTALLED:
default:
//FIXME clean this up (we shouldn't ever get here)
log.warn("Bad state: {} for {}", getState(intent), intent);
break;
}
}
}
List<FlowRuleBatchOperation> batches() {
return batches;
}
void setBatches(List<FlowRuleBatchOperation> batches) {
this.batches = batches;
void addBatches(List<FlowRuleBatchOperation> batches) {
this.batches.addAll(batches);
}
IntentState getState(Intent intent) {
......@@ -659,7 +670,7 @@ public class IntentManager
void setState(Intent intent, IntentState newState) {
// TODO: clean this up, or set to debug
IntentState oldState = stateMap.get(intent);
log.info("intent id: {}, old state: {}, new state: {}",
log.debug("intent id: {}, old state: {}, new state: {}",
intent.id(), oldState, newState);
stateMap.put(intent, newState);
......@@ -674,143 +685,72 @@ public class IntentManager
}
}
private static List<FlowRuleBatchOperation> mergeBatches(Map<IntentOperation,
IntentUpdate> intentUpdates) {
//TODO test this.
List<FlowRuleBatchOperation> batches = Lists.newArrayList();
for (IntentUpdate update : intentUpdates.values()) {
if (update.batches() == null) {
continue;
}
int i = 0;
for (FlowRuleBatchOperation batch : update.batches()) {
if (i == batches.size()) {
batches.add(batch);
} else {
FlowRuleBatchOperation existing = batches.get(i);
existing.addAll(batch);
}
i++;
}
}
return batches;
}
// Auxiliary runnable to perform asynchronous tasks.
private class IntentTask implements Runnable {
private final IntentOperations operations;
public IntentTask(IntentOperations operations) {
this.operations = operations;
}
@Override
public void run() {
Map<IntentOperation, IntentUpdate> intentUpdates = Maps.newHashMap();
for (IntentOperation op : operations.operations()) {
intentUpdates.put(op, processIntentOperation(op));
}
List<FlowRuleBatchOperation> batches = mergeBatches(intentUpdates);
monitorExecutor.execute(new IntentInstallMonitor(operations, intentUpdates, batches));
}
}
private class IntentInstallMonitor implements Runnable {
private static final long TIMEOUT = 5000; // ms
private static final int MAX_ATTEMPTS = 3;
private final IntentOperations ops;
private final Map<IntentOperation, IntentUpdate> intentUpdateMap;
private final List<FlowRuleBatchOperation> work;
private final List<IntentUpdate> intentUpdates = Lists.newArrayList();
private Future<CompletedBatchOperation> future;
private final long startTime = System.currentTimeMillis();
private final long endTime = startTime + TIMEOUT;
private long startTime = System.currentTimeMillis();
private long endTime = startTime + TIMEOUT;
private int installAttempt;
public IntentInstallMonitor(IntentOperations ops,
Map<IntentOperation, IntentUpdate> intentUpdateMap,
List<FlowRuleBatchOperation> work) {
public IntentInstallMonitor(IntentOperations ops) {
this.ops = ops;
this.intentUpdateMap = intentUpdateMap;
this.work = work;
}
private void buildIntentUpdates() {
for (IntentOperation op : ops.operations()) {
IntentUpdate update = new IntentUpdate(op);
intentUpdates.add(update);
processIntentUpdate(update);
}
future = applyNextBatch();
}
/**
* Applies the next batch, and returns the future.
* Builds and applies the next batch, and returns the future.
*
* @return Future for next batch
*/
private Future<CompletedBatchOperation> applyNextBatch() {
if (work.isEmpty()) {
return null;
//TODO test this. (also, maybe save this batch)
FlowRuleBatchOperation batch = new FlowRuleBatchOperation(Collections.emptyList());
for (IntentUpdate update : intentUpdates) {
if (!update.isComplete()) {
batch.addAll(update.currentBatch());
}
}
FlowRuleBatchOperation batch = work.remove(0);
return flowRuleService.applyBatch(batch);
return (batch.size() > 0) ? flowRuleService.applyBatch(batch) : null;
}
/**
* Update the intent store with the next status for this intent.
*/
private void updateIntents() {
// FIXME we assume everything passes for now.
for (IntentUpdate update : intentUpdateMap.values()) {
for (Intent intent : update.stateMap().keySet()) {
switch (update.getState(intent)) {
case INSTALLING:
update.setState(intent, INSTALLED);
break;
case WITHDRAWING:
update.setState(intent, WITHDRAWN);
// Fall-through
case FAILED:
store.removeInstalledIntents(intent.id());
break;
case SUBMITTED:
case COMPILING:
case RECOMPILING:
case WITHDRAWN:
case INSTALLED:
default:
//FIXME clean this up (we shouldn't ever get here)
log.warn("Bad state: {} for {}", update.getState(intent), intent);
break;
}
private void updateBatches(CompletedBatchOperation completed) {
if (completed.isSuccess()) {
for (IntentUpdate update : intentUpdates) {
update.incrementBatch(true);
}
}
/*
for (IntentOperation op : ops.operations()) {
switch (op.type()) {
case SUBMIT:
store.setState(op.intent(), INSTALLED);
break;
case WITHDRAW:
Intent intent = store.getIntent(op.intentId());
store.setState(intent, WITHDRAWN);
break;
case REPLACE:
store.setState(op.intent(), INSTALLED);
intent = store.getIntent(op.intentId());
store.setState(intent, WITHDRAWN);
break;
case UPDATE:
intent = store.getIntent(op.intentId());
store.setState(intent, INSTALLED);
break;
default:
break;
} else {
// entire batch has been reverted...
log.warn("Failed items: {}", completed.failedItems());
for (Long id : completed.failedIds()) {
IntentId targetId = IntentId.valueOf(id);
for (IntentUpdate update : intentUpdates) {
List<Intent> installables = Lists.newArrayList(update.newInstallables());
installables.addAll(update.oldInstallables());
for (Intent intent : installables) {
if (intent.id().equals(targetId)) {
update.incrementBatch(false);
break;
}
}
}
// don't increment the non-failed items, as they have been reverted.
}
}
*/
/*
if (nextState == RECOMPILING) {
eventDispatcher.post(store.setState(intent, FAILED));
// FIXME try to recompile
// executor.execute(new IntentTask(nextState, intent));
} else if (nextState == INSTALLED || nextState == WITHDRAWN) {
eventDispatcher.post(store.setState(intent, nextState));
} else {
log.warn("Invalid next intent state {} for intent {}", nextState, intent);
}*/
}
/**
......@@ -822,34 +762,60 @@ public class IntentManager
}
try {
CompletedBatchOperation completed = future.get(100, TimeUnit.NANOSECONDS);
if (completed.isSuccess()) {
future = applyNextBatch();
} else {
// TODO check if future succeeded and if not report fail items
log.warn("Failed items: {}", completed.failedItems());
// FIXME revert.... by submitting a new batch
//uninstallIntent(intent, RECOMPILING);
}
updateBatches(completed);
future = applyNextBatch();
} catch (TimeoutException | InterruptedException | ExecutionException te) {
//TODO look into error message
log.debug("Intallations of intent {} is still pending", ops);
log.debug("Installation of intents are still pending: {}", ops);
}
}
private void retry() {
if (future.cancel(true)) { // cancel success; batch is reverted
// reset the timer
endTime = System.currentTimeMillis() + TIMEOUT;
if (installAttempt++ >= MAX_ATTEMPTS) {
log.warn("Install request timed out: {}", ops);
for (IntentUpdate update : intentUpdates) {
update.incrementBatch(false);
}
} // else just resubmit the work
future = applyNextBatch();
monitorExecutor.submit(this);
} else {
// FIXME
// cancel failed... batch is broken; shouldn't happen!
// we could manually reverse everything
// ... or just core dump and send email to Ali
batchService.removeIntentOperations(ops);
}
}
boolean isComplete() {
// TODO: actually check with the intent update?
return future == null;
}
@Override
public void run() {
processFutures();
if (future == null) {
// woohoo! we are done!
updateIntents();
batchService.removeIntentOperations(ops);
} else if (endTime < System.currentTimeMillis()) {
log.warn("Install request timed out");
// future.cancel(true);
// TODO retry and/or report the failure
} else {
// resubmit ourselves if we are not done yet
monitorExecutor.submit(this);
try {
if (intentUpdates.isEmpty()) {
// this should only be called on the first iteration
// note: this a "expensive", so it is not done in the constructor
buildIntentUpdates();
}
processFutures();
if (isComplete()) {
// there are no outstanding batches; we are done
batchService.removeIntentOperations(ops);
} else if (endTime < System.currentTimeMillis()) {
retry();
} else {
// we are not done yet, yield the thread by resubmitting ourselves
monitorExecutor.submit(this);
}
} catch (Exception e) {
log.error("Error submitting batches:", e);
}
}
}
......@@ -859,7 +825,7 @@ public class IntentManager
public void execute(IntentOperations operations) {
log.info("Execute operations: {}", operations);
//FIXME: perhaps we want to track this task so that we can cancel it.
executor.execute(new IntentTask(operations));
monitorExecutor.execute(new IntentInstallMonitor(operations));
}
@Override
......
......@@ -101,7 +101,8 @@ public class PathIntentInstaller implements IntentInstaller<PathIntent> {
FlowRule rule = new DefaultFlowRule(link.src().deviceId(),
builder.build(), treatment, 123, //FIXME 123
appId, (short) (intent.id().fingerprint() & 0xffff), 0, true);
rules.add(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule));
rules.add(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule,
intent.id().fingerprint()));
prev = link.dst();
}
return Lists.newArrayList(new FlowRuleBatchOperation(rules));
......@@ -127,7 +128,8 @@ public class PathIntentInstaller implements IntentInstaller<PathIntent> {
FlowRule rule = new DefaultFlowRule(link.src().deviceId(),
builder.build(), treatment,
123, appId, (short) (intent.id().fingerprint() & 0xffff), 0, true);
rules.add(new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, rule));
rules.add(new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, rule,
intent.id().fingerprint()));
prev = link.dst();
}
return Lists.newArrayList(new FlowRuleBatchOperation(rules));
......
package org.onlab.onos.net.intent.impl;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import org.hamcrest.Description;
import org.hamcrest.Matchers;
import org.hamcrest.TypeSafeMatcher;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.onlab.onos.TestApplicationId;
import org.onlab.onos.core.ApplicationId;
import org.onlab.onos.event.impl.TestEventDispatcher;
import org.onlab.onos.net.NetworkResource;
import org.onlab.onos.net.flow.FlowRule;
import org.onlab.onos.net.flow.FlowRuleBatchEntry;
import org.onlab.onos.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
import org.onlab.onos.net.flow.FlowRuleBatchOperation;
import org.onlab.onos.net.intent.Intent;
import org.onlab.onos.net.intent.IntentCompiler;
import org.onlab.onos.net.intent.IntentEvent;
import org.onlab.onos.net.intent.IntentEvent.Type;
import org.onlab.onos.net.intent.IntentExtensionService;
import org.onlab.onos.net.intent.IntentId;
import org.onlab.onos.net.intent.IntentInstaller;
import org.onlab.onos.net.intent.IntentListener;
import org.onlab.onos.net.intent.IntentService;
import org.onlab.onos.net.intent.IntentState;
import org.onlab.onos.net.intent.IntentTestsMocks;
import org.onlab.onos.net.resource.LinkResourceAllocations;
import org.onlab.onos.store.trivial.impl.SimpleIntentBatchQueue;
import org.onlab.onos.store.trivial.impl.SimpleIntentStore;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItem;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.onlab.onos.net.intent.IntentState.*;
import static org.onlab.util.Tools.delay;
/**
* Test intent manager and transitions.
*
* TODO implement the following tests:
* - {submit, withdraw, update, replace} intent
* - {submit, update, recomiling} intent with failed compilation
* - failed reservation
* - push timeout recovery
* - failed items recovery
*
* in general, verify intents store, flow store, and work queue
*/
public class IntentManagerTest {
private static final ApplicationId APPID = new TestApplicationId("manager-test");
private IntentManager manager;
private MockFlowRuleService flowRuleService;
protected IntentService service;
protected IntentExtensionService extensionService;
protected TestListener listener = new TestListener();
protected TestIntentCompiler compiler = new TestIntentCompiler();
protected TestIntentInstaller installer = new TestIntentInstaller();
@Before
public void setUp() {
manager = new IntentManager();
flowRuleService = new MockFlowRuleService();
manager.store = new SimpleIntentStore();
manager.batchService = new SimpleIntentBatchQueue();
manager.eventDispatcher = new TestEventDispatcher();
manager.trackerService = new TestIntentTracker();
manager.flowRuleService = flowRuleService;
service = manager;
extensionService = manager;
manager.activate();
service.addListener(listener);
extensionService.registerCompiler(MockIntent.class, compiler);
extensionService.registerInstaller(MockInstallableIntent.class, installer);
assertTrue("store should be empty",
Sets.newHashSet(service.getIntents()).isEmpty());
assertEquals(0L, flowRuleService.getFlowRuleCount());
}
@After
public void tearDown() {
// verify that all intents are parked and the batch operation is unblocked
Set<IntentState> parked = Sets.newHashSet(INSTALLED, WITHDRAWN, FAILED);
for (Intent i : service.getIntents()) {
IntentState state = service.getIntentState(i.id());
assertTrue("Intent " + i.id() + " is in invalid state " + state,
parked.contains(state));
}
//the batch has not yet been removed when we receive the last event
// FIXME: this doesn't guarantee to avoid the race
for (int tries = 0; tries < 10; tries++) {
if (manager.batchService.getPendingOperations().isEmpty() &&
manager.batchService.getCurrentOperations().isEmpty()) {
break;
}
delay(10);
}
assertTrue("There are still pending batch operations.",
manager.batchService.getPendingOperations().isEmpty());
assertTrue("There are still outstanding batch operations.",
manager.batchService.getCurrentOperations().isEmpty());
extensionService.unregisterCompiler(MockIntent.class);
extensionService.unregisterInstaller(MockInstallableIntent.class);
service.removeListener(listener);
manager.deactivate();
// TODO null the other refs?
}
@Test
public void submitIntent() {
flowRuleService.setFuture(true);
listener.setLatch(1, Type.SUBMITTED);
listener.setLatch(1, Type.INSTALLED);
Intent intent = new MockIntent(MockIntent.nextId());
service.submit(intent);
listener.await(Type.SUBMITTED);
listener.await(Type.INSTALLED);
assertEquals(1L, service.getIntentCount());
assertEquals(1L, flowRuleService.getFlowRuleCount());
}
@Test
public void withdrawIntent() {
flowRuleService.setFuture(true);
listener.setLatch(1, Type.INSTALLED);
Intent intent = new MockIntent(MockIntent.nextId());
service.submit(intent);
listener.await(Type.INSTALLED);
assertEquals(1L, service.getIntentCount());
assertEquals(1L, flowRuleService.getFlowRuleCount());
listener.setLatch(1, Type.WITHDRAWN);
service.withdraw(intent);
listener.await(Type.WITHDRAWN);
assertEquals(1L, service.getIntentCount());
assertEquals(0L, flowRuleService.getFlowRuleCount());
}
@Test
public void stressSubmitWithdraw() {
flowRuleService.setFuture(true);
int count = 500;
listener.setLatch(count, Type.INSTALLED);
listener.setLatch(count, Type.WITHDRAWN);
Intent intent = new MockIntent(MockIntent.nextId());
for (int i = 0; i < count; i++) {
service.submit(intent);
service.withdraw(intent);
}
listener.await(Type.INSTALLED);
listener.await(Type.WITHDRAWN);
assertEquals(1L, service.getIntentCount());
assertEquals(0L, flowRuleService.getFlowRuleCount());
}
@Test
public void replaceIntent() {
flowRuleService.setFuture(true);
MockIntent intent = new MockIntent(MockIntent.nextId());
listener.setLatch(1, Type.INSTALLED);
service.submit(intent);
listener.await(Type.INSTALLED);
assertEquals(1L, service.getIntentCount());
assertEquals(1L, manager.flowRuleService.getFlowRuleCount());
MockIntent intent2 = new MockIntent(MockIntent.nextId());
listener.setLatch(1, Type.WITHDRAWN);
listener.setLatch(1, Type.SUBMITTED);
listener.setLatch(1, Type.INSTALLED);
service.replace(intent.id(), intent2);
listener.await(Type.WITHDRAWN);
listener.await(Type.INSTALLED);
assertEquals(2L, service.getIntentCount());
assertEquals(1L, manager.flowRuleService.getFlowRuleCount());
assertEquals(intent2.number().intValue(),
flowRuleService.flows.iterator().next().priority());
}
private static class TestListener implements IntentListener {
final Multimap<IntentEvent.Type, IntentEvent> events = HashMultimap.create();
Map<IntentEvent.Type, CountDownLatch> latchMap = Maps.newHashMap();
@Override
public void event(IntentEvent event) {
events.put(event.type(), event);
if (latchMap.containsKey(event.type())) {
latchMap.get(event.type()).countDown();
}
}
public int getCounts(IntentEvent.Type type) {
return events.get(type).size();
}
public void setLatch(int count, IntentEvent.Type type) {
latchMap.put(type, new CountDownLatch(count));
}
public void await(IntentEvent.Type type) {
try {
latchMap.get(type).await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private static class TestIntentTracker implements ObjectiveTrackerService {
private TopologyChangeDelegate delegate;
@Override
public void setDelegate(TopologyChangeDelegate delegate) {
this.delegate = delegate;
}
@Override
public void unsetDelegate(TopologyChangeDelegate delegate) {
if (delegate.equals(this.delegate)) {
this.delegate = null;
}
}
@Override
public void addTrackedResources(IntentId intentId, Collection<NetworkResource> resources) {
//TODO
}
@Override
public void removeTrackedResources(IntentId intentId, Collection<NetworkResource> resources) {
//TODO
}
}
private static class MockIntent extends Intent {
private static AtomicLong counter = new AtomicLong(0);
private final Long number;
// Nothing new here
public MockIntent(Long number) {
super(id(MockIntent.class, number), APPID, null);
this.number = number;
}
public Long number() {
return number;
}
public static Long nextId() {
return counter.getAndIncrement();
}
}
private static class MockInstallableIntent extends MockIntent {
public MockInstallableIntent(Long number) {
super(number);
}
@Override
public boolean isInstallable() {
return true;
}
}
private static class TestIntentCompiler implements IntentCompiler<MockIntent> {
@Override
public List<Intent> compile(MockIntent intent, List<Intent> installable,
Set<LinkResourceAllocations> resources) {
return Lists.newArrayList(new MockInstallableIntent(intent.number()));
}
}
private static class TestIntentInstaller implements IntentInstaller<MockInstallableIntent> {
@Override
public List<FlowRuleBatchOperation> install(MockInstallableIntent intent) {
FlowRule fr = new IntentTestsMocks.MockFlowRule(intent.number().intValue());
List<FlowRuleBatchEntry> rules = Lists.newLinkedList();
rules.add(new FlowRuleBatchEntry(FlowRuleOperation.ADD, fr));
return Lists.newArrayList(new FlowRuleBatchOperation(rules));
}
@Override
public List<FlowRuleBatchOperation> uninstall(MockInstallableIntent intent) {
FlowRule fr = new IntentTestsMocks.MockFlowRule(intent.number().intValue());
List<FlowRuleBatchEntry> rules = Lists.newLinkedList();
rules.add(new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, fr));
return Lists.newArrayList(new FlowRuleBatchOperation(rules));
}
@Override
public List<FlowRuleBatchOperation> replace(MockInstallableIntent oldIntent, MockInstallableIntent newIntent) {
FlowRule fr = new IntentTestsMocks.MockFlowRule(oldIntent.number().intValue());
FlowRule fr2 = new IntentTestsMocks.MockFlowRule(newIntent.number().intValue());
List<FlowRuleBatchEntry> rules = Lists.newLinkedList();
rules.add(new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, fr));
rules.add(new FlowRuleBatchEntry(FlowRuleOperation.ADD, fr2));
return Lists.newArrayList(new FlowRuleBatchOperation(rules));
}
}
/**
* Hamcrest matcher to check that a conllection of Intents contains an
* Intent with the specified Intent Id.
*/
public static class EntryForIntentMatcher extends TypeSafeMatcher<Collection<Intent>> {
private final String id;
public EntryForIntentMatcher(String idValue) {
id = idValue;
}
@Override
public boolean matchesSafely(Collection<Intent> intents) {
return hasItem(Matchers.<Intent>hasProperty("id", equalTo(id))).matches(intents);
}
@Override
public void describeTo(Description description) {
description.appendText("an intent with id \" ").
appendText(id).
appendText("\"");
}
}
}
package org.onlab.onos.net.intent.impl;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Futures;
import org.onlab.onos.core.ApplicationId;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.flow.CompletedBatchOperation;
import org.onlab.onos.net.flow.FlowEntry;
import org.onlab.onos.net.flow.FlowRule;
import org.onlab.onos.net.flow.FlowRuleBatchEntry;
import org.onlab.onos.net.flow.FlowRuleBatchOperation;
import org.onlab.onos.net.flow.FlowRuleListener;
import org.onlab.onos.net.flow.FlowRuleService;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.Future;
public class MockFlowRuleService implements FlowRuleService {
private Future<CompletedBatchOperation> future;
final Set<FlowRule> flows = Sets.newHashSet();
public void setFuture(boolean success) {
future = Futures.immediateFuture(new CompletedBatchOperation(true, Collections.emptySet()));
}
@Override
public Future<CompletedBatchOperation> applyBatch(FlowRuleBatchOperation batch) {
for (FlowRuleBatchEntry fbe : batch.getOperations()) {
FlowRule fr = fbe.getTarget();
switch (fbe.getOperator()) {
case ADD:
flows.add(fr);
break;
case REMOVE:
flows.remove(fr);
break;
case MODIFY:
break;
default:
break;
}
}
return future;
}
@Override
public int getFlowRuleCount() {
return flows.size();
}
@Override
public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
return null;
}
@Override
public void applyFlowRules(FlowRule... flowRules) {
}
@Override
public void removeFlowRules(FlowRule... flowRules) {
}
@Override
public void removeFlowRulesById(ApplicationId appId) {
}
@Override
public Iterable<FlowRule> getFlowRulesById(ApplicationId id) {
return null;
}
@Override
public Iterable<FlowRule> getFlowRulesByGroupId(ApplicationId appId, short groupId) {
return null;
}
@Override
public void addListener(FlowRuleListener listener) {
}
@Override
public void removeListener(FlowRuleListener listener) {
}
}
......@@ -340,7 +340,8 @@ public class DistributedFlowRuleStore
public Future<CompletedBatchOperation> storeBatch(FlowRuleBatchOperation operation) {
if (operation.getOperations().isEmpty()) {
return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowRule>emptySet()));
return Futures.immediateFuture(new CompletedBatchOperation(true,
Collections.<FlowRule>emptySet()));
}
DeviceId deviceId = operation.getOperations().get(0).getTarget().deviceId();
......@@ -379,8 +380,8 @@ public class DistributedFlowRuleStore
private ListenableFuture<CompletedBatchOperation>
storeBatchInternal(FlowRuleBatchOperation operation) {
final List<StoredFlowEntry> toRemove = new ArrayList<>();
final List<StoredFlowEntry> toAdd = new ArrayList<>();
final List<FlowRuleBatchEntry> toRemove = new ArrayList<>();
final List<FlowRuleBatchEntry> toAdd = new ArrayList<>();
DeviceId did = null;
......@@ -396,14 +397,14 @@ public class DistributedFlowRuleStore
StoredFlowEntry entry = getFlowEntryInternal(flowRule);
if (entry != null) {
entry.setState(FlowEntryState.PENDING_REMOVE);
toRemove.add(entry);
toRemove.add(batchEntry);
}
} else if (op.equals(FlowRuleOperation.ADD)) {
StoredFlowEntry flowEntry = new DefaultFlowEntry(flowRule);
DeviceId deviceId = flowRule.deviceId();
if (!flowEntries.containsEntry(deviceId, flowEntry)) {
flowEntries.put(deviceId, flowEntry);
toAdd.add(flowEntry);
toAdd.add(batchEntry);
}
}
}
......@@ -427,8 +428,8 @@ public class DistributedFlowRuleStore
}
private void updateBackup(final DeviceId deviceId,
final List<StoredFlowEntry> toAdd,
final List<? extends FlowRule> list) {
final List<FlowRuleBatchEntry> toAdd,
final List<FlowRuleBatchEntry> list) {
Future<?> submit = backupExecutors.submit(new UpdateBackup(deviceId, toAdd, list));
......@@ -442,8 +443,9 @@ public class DistributedFlowRuleStore
}
}
private void updateBackup(DeviceId deviceId, List<StoredFlowEntry> toAdd) {
updateBackup(deviceId, toAdd, Collections.<FlowEntry>emptyList());
private void updateBackup(DeviceId deviceId, List<FlowRuleBatchEntry> toAdd) {
updateBackup(deviceId, toAdd, Collections.<FlowRuleBatchEntry>emptyList());
}
@Override
......@@ -477,8 +479,9 @@ public class DistributedFlowRuleStore
stored.setPackets(rule.packets());
if (stored.state() == FlowEntryState.PENDING_ADD) {
stored.setState(FlowEntryState.ADDED);
// update backup.
updateBackup(did, Arrays.asList(stored));
FlowRuleBatchEntry entry =
new FlowRuleBatchEntry(FlowRuleOperation.ADD, stored);
updateBackup(did, Arrays.asList(entry));
return new FlowRuleEvent(Type.RULE_ADDED, rule);
}
return new FlowRuleEvent(Type.RULE_UPDATED, rule);
......@@ -515,7 +518,9 @@ public class DistributedFlowRuleStore
try {
// This is where one could mark a rule as removed and still keep it in the store.
final boolean removed = flowEntries.remove(deviceId, rule);
updateBackup(deviceId, Collections.<StoredFlowEntry>emptyList(), Arrays.asList(rule));
FlowRuleBatchEntry entry =
new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, rule);
updateBackup(deviceId, Collections.<FlowRuleBatchEntry>emptyList(), Arrays.asList(entry));
if (removed) {
return new FlowRuleEvent(RULE_REMOVED, rule);
} else {
......@@ -687,15 +692,17 @@ public class DistributedFlowRuleStore
}
// Task to update FlowEntries in backup HZ store
// TODO: Should be refactored to contain only one list and not
// toAdd and toRemove
private final class UpdateBackup implements Runnable {
private final DeviceId deviceId;
private final List<StoredFlowEntry> toAdd;
private final List<? extends FlowRule> toRemove;
private final List<FlowRuleBatchEntry> toAdd;
private final List<FlowRuleBatchEntry> toRemove;
public UpdateBackup(DeviceId deviceId,
List<StoredFlowEntry> toAdd,
List<? extends FlowRule> list) {
List<FlowRuleBatchEntry> toAdd,
List<FlowRuleBatchEntry> list) {
this.deviceId = checkNotNull(deviceId);
this.toAdd = checkNotNull(toAdd);
this.toRemove = checkNotNull(list);
......@@ -707,7 +714,8 @@ public class DistributedFlowRuleStore
log.trace("update backup {} +{} -{}", deviceId, toAdd, toRemove);
final SMap<FlowId, ImmutableList<StoredFlowEntry>> backupFlowTable = smaps.get(deviceId);
// Following should be rewritten using async APIs
for (StoredFlowEntry entry : toAdd) {
for (FlowRuleBatchEntry bEntry : toAdd) {
final FlowRule entry = bEntry.getTarget();
final FlowId id = entry.id();
ImmutableList<StoredFlowEntry> original = backupFlowTable.get(id);
List<StoredFlowEntry> list = new ArrayList<>();
......@@ -715,8 +723,8 @@ public class DistributedFlowRuleStore
list.addAll(original);
}
list.remove(entry);
list.add(entry);
list.remove(bEntry.getTarget());
list.add((StoredFlowEntry) entry);
ImmutableList<StoredFlowEntry> newValue = ImmutableList.copyOf(list);
boolean success;
......@@ -730,7 +738,8 @@ public class DistributedFlowRuleStore
log.error("Updating backup failed.");
}
}
for (FlowRule entry : toRemove) {
for (FlowRuleBatchEntry bEntry : toRemove) {
final FlowRule entry = bEntry.getTarget();
final FlowId id = entry.id();
ImmutableList<StoredFlowEntry> original = backupFlowTable.get(id);
List<StoredFlowEntry> list = new ArrayList<>();
......@@ -738,7 +747,7 @@ public class DistributedFlowRuleStore
list.addAll(original);
}
list.remove(entry);
list.remove(bEntry.getTarget());
ImmutableList<StoredFlowEntry> newValue = ImmutableList.copyOf(list);
boolean success;
......
......@@ -25,7 +25,6 @@ import org.onlab.onos.net.intent.IntentBatchService;
import org.onlab.onos.net.intent.IntentOperations;
import org.slf4j.Logger;
import java.util.Collection;
import java.util.LinkedList;
import java.util.Queue;
import java.util.Set;
......@@ -82,10 +81,17 @@ public class DistributedIntentBatchQueue implements IntentBatchService {
}
@Override
public Set<IntentOperations> getIntentOperations() {
Set<IntentOperations> set = Sets.newHashSet(currentBatches);
set.addAll((Collection) pendingBatches);
return set;
public Set<IntentOperations> getPendingOperations() {
synchronized (this) {
return Sets.newHashSet(pendingBatches);
}
}
@Override
public Set<IntentOperations> getCurrentOperations() {
synchronized (this) {
return Sets.newHashSet(currentBatches);
}
}
@Override
......
......@@ -263,19 +263,19 @@ public class SimpleFlowRuleStore
@Override
public Future<CompletedBatchOperation> storeBatch(
FlowRuleBatchOperation batchOperation) {
List<FlowRule> toAdd = new ArrayList<>();
List<FlowRule> toRemove = new ArrayList<>();
List<FlowRuleBatchEntry> toAdd = new ArrayList<>();
List<FlowRuleBatchEntry> toRemove = new ArrayList<>();
for (FlowRuleBatchEntry entry : batchOperation.getOperations()) {
final FlowRule flowRule = entry.getTarget();
if (entry.getOperator().equals(FlowRuleOperation.ADD)) {
if (!getFlowEntries(flowRule.deviceId(), flowRule.id()).contains(flowRule)) {
storeFlowRule(flowRule);
toAdd.add(flowRule);
toAdd.add(entry);
}
} else if (entry.getOperator().equals(FlowRuleOperation.REMOVE)) {
if (getFlowEntries(flowRule.deviceId(), flowRule.id()).contains(flowRule)) {
deleteFlowRule(flowRule);
toRemove.add(flowRule);
toRemove.add(entry);
}
} else {
throw new UnsupportedOperationException("Unsupported operation type");
......
......@@ -15,7 +15,7 @@
*/
package org.onlab.onos.store.trivial.impl;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
......@@ -25,7 +25,8 @@ import org.onlab.onos.net.intent.IntentBatchService;
import org.onlab.onos.net.intent.IntentOperations;
import org.slf4j.Logger;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Queue;
import java.util.Set;
import static com.google.common.base.Preconditions.checkNotNull;
......@@ -37,7 +38,8 @@ import static org.slf4j.LoggerFactory.getLogger;
public class SimpleIntentBatchQueue implements IntentBatchService {
private final Logger log = getLogger(getClass());
private final Set<IntentOperations> pendingBatches = new HashSet<>();
private final Queue<IntentOperations> pendingBatches = new LinkedList<>();
private final Set<IntentOperations> currentBatches = Sets.newHashSet();
private IntentBatchDelegate delegate;
@Activate
......@@ -53,18 +55,42 @@ public class SimpleIntentBatchQueue implements IntentBatchService {
@Override
public void addIntentOperations(IntentOperations operations) {
checkState(delegate != null, "No delegate set");
pendingBatches.add(operations);
delegate.execute(operations);
synchronized (this) {
pendingBatches.add(operations);
if (currentBatches.isEmpty()) {
IntentOperations work = pendingBatches.poll();
currentBatches.add(work);
delegate.execute(work);
}
}
}
@Override
public void removeIntentOperations(IntentOperations operations) {
pendingBatches.remove(operations);
// we allow at most one outstanding batch at a time
synchronized (this) {
checkState(currentBatches.remove(operations), "Operations not found in current ops.");
checkState(currentBatches.isEmpty(), "More than one outstanding batch.");
IntentOperations work = pendingBatches.poll();
if (work != null) {
currentBatches.add(work);
delegate.execute(work);
}
}
}
@Override
public Set<IntentOperations> getIntentOperations() {
return ImmutableSet.copyOf(pendingBatches);
public Set<IntentOperations> getPendingOperations() {
synchronized (this) {
return Sets.newHashSet(pendingBatches);
}
}
@Override
public Set<IntentOperations> getCurrentOperations() {
synchronized (this) {
return Sets.newHashSet(currentBatches);
}
}
@Override
......
......@@ -53,6 +53,8 @@ import org.projectfloodlight.openflow.types.VlanPcp;
import org.projectfloodlight.openflow.types.VlanVid;
import org.slf4j.Logger;
import java.util.Optional;
/**
* Builder for OpenFlow flow mods based on FlowRules.
*/
......@@ -63,6 +65,7 @@ public abstract class FlowModBuilder {
private final OFFactory factory;
private final FlowRule flowRule;
private final TrafficSelector selector;
protected final Long xid;
/**
* Creates a new flow mod builder.
......@@ -71,12 +74,13 @@ public abstract class FlowModBuilder {
* @param factory the OpenFlow factory to use to build the flow mod
* @return the new flow mod builder
*/
public static FlowModBuilder builder(FlowRule flowRule, OFFactory factory) {
public static FlowModBuilder builder(FlowRule flowRule,
OFFactory factory, Optional<Long> xid) {
switch (factory.getVersion()) {
case OF_10:
return new FlowModBuilderVer10(flowRule, factory);
return new FlowModBuilderVer10(flowRule, factory, xid);
case OF_13:
return new FlowModBuilderVer13(flowRule, factory);
return new FlowModBuilderVer13(flowRule, factory, xid);
default:
throw new UnsupportedOperationException(
"No flow mod builder for protocol version " + factory.getVersion());
......@@ -89,10 +93,12 @@ public abstract class FlowModBuilder {
* @param flowRule the flow rule to transform into a flow mod
* @param factory the OpenFlow factory to use to build the flow mod
*/
protected FlowModBuilder(FlowRule flowRule, OFFactory factory) {
protected FlowModBuilder(FlowRule flowRule, OFFactory factory, Optional<Long> xid) {
this.factory = factory;
this.flowRule = flowRule;
this.selector = flowRule.selector();
this.xid = xid.orElse((long) 0);
}
/**
......
......@@ -18,6 +18,7 @@ package org.onlab.onos.provider.of.flow.impl;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import org.onlab.onos.net.flow.FlowRule;
import org.onlab.onos.net.flow.TrafficTreatment;
......@@ -62,8 +63,9 @@ public class FlowModBuilderVer10 extends FlowModBuilder {
* @param flowRule the flow rule to transform into a flow mod
* @param factory the OpenFlow factory to use to build the flow mod
*/
protected FlowModBuilderVer10(FlowRule flowRule, OFFactory factory) {
super(flowRule, factory);
protected FlowModBuilderVer10(FlowRule flowRule,
OFFactory factory, Optional<Long> xid) {
super(flowRule, factory, xid);
this.treatment = flowRule.treatment();
}
......@@ -77,7 +79,7 @@ public class FlowModBuilderVer10 extends FlowModBuilder {
//TODO: what to do without bufferid? do we assume that there will be a pktout as well?
OFFlowAdd fm = factory().buildFlowAdd()
.setXid(cookie)
.setXid(xid)
.setCookie(U64.of(cookie))
.setBufferId(OFBufferId.NO_BUFFER)
.setActions(actions)
......@@ -98,7 +100,7 @@ public class FlowModBuilderVer10 extends FlowModBuilder {
//TODO: what to do without bufferid? do we assume that there will be a pktout as well?
OFFlowMod fm = factory().buildFlowModify()
.setXid(cookie)
.setXid(xid)
.setCookie(U64.of(cookie))
.setBufferId(OFBufferId.NO_BUFFER)
.setActions(actions)
......@@ -118,7 +120,7 @@ public class FlowModBuilderVer10 extends FlowModBuilder {
long cookie = flowRule().id().value();
OFFlowDelete fm = factory().buildFlowDelete()
.setXid(cookie)
.setXid(xid)
.setCookie(U64.of(cookie))
.setBufferId(OFBufferId.NO_BUFFER)
.setActions(actions)
......
......@@ -18,6 +18,7 @@ package org.onlab.onos.provider.of.flow.impl;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import org.onlab.onos.net.flow.FlowRule;
import org.onlab.onos.net.flow.TrafficTreatment;
......@@ -70,8 +71,8 @@ public class FlowModBuilderVer13 extends FlowModBuilder {
* @param flowRule the flow rule to transform into a flow mod
* @param factory the OpenFlow factory to use to build the flow mod
*/
protected FlowModBuilderVer13(FlowRule flowRule, OFFactory factory) {
super(flowRule, factory);
protected FlowModBuilderVer13(FlowRule flowRule, OFFactory factory, Optional<Long> xid) {
super(flowRule, factory, xid);
this.treatment = flowRule.treatment();
}
......@@ -93,7 +94,7 @@ public class FlowModBuilderVer13 extends FlowModBuilder {
//TODO: what to do without bufferid? do we assume that there will be a pktout as well?
OFFlowAdd fm = factory().buildFlowAdd()
.setXid(cookie)
.setXid(xid)
.setCookie(U64.of(cookie))
.setBufferId(OFBufferId.NO_BUFFER)
.setActions(actions)
......@@ -117,7 +118,7 @@ public class FlowModBuilderVer13 extends FlowModBuilder {
//TODO: what to do without bufferid? do we assume that there will be a pktout as well?
OFFlowMod fm = factory().buildFlowModify()
.setXid(cookie)
.setXid(xid)
.setCookie(U64.of(cookie))
.setBufferId(OFBufferId.NO_BUFFER)
.setActions(actions)
......@@ -140,7 +141,7 @@ public class FlowModBuilderVer13 extends FlowModBuilder {
long cookie = flowRule().id().value();
OFFlowDelete fm = factory().buildFlowDelete()
.setXid(cookie)
.setXid(xid)
.setCookie(U64.of(cookie))
.setBufferId(OFBufferId.NO_BUFFER)
//.setActions(actions) //FIXME do we want to send actions in flowdel?
......
......@@ -19,7 +19,6 @@ import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ExecutionList;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
......@@ -76,6 +75,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
......@@ -122,7 +122,7 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
private final Map<Dpid, FlowStatsCollector> collectors = Maps.newHashMap();
private final AtomicLong xidCounter = new AtomicLong(0);
private final AtomicLong xidCounter = new AtomicLong(1);
/**
* Creates an OpenFlow host provider.
......@@ -164,7 +164,8 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
private void applyRule(FlowRule flowRule) {
OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(flowRule.deviceId().uri()));
sw.sendMsg(FlowModBuilder.builder(flowRule, sw.factory()).buildFlowAdd());
sw.sendMsg(FlowModBuilder.builder(flowRule, sw.factory(),
Optional.empty()).buildFlowAdd());
}
......@@ -178,7 +179,8 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
private void removeRule(FlowRule flowRule) {
OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(flowRule.deviceId().uri()));
sw.sendMsg(FlowModBuilder.builder(flowRule, sw.factory()).buildFlowDel());
sw.sendMsg(FlowModBuilder.builder(flowRule, sw.factory(),
Optional.empty()).buildFlowDel());
}
@Override
......@@ -211,7 +213,10 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
return failed;
}
sws.add(new Dpid(sw.getId()));
FlowModBuilder builder = FlowModBuilder.builder(flowRule, sw.factory());
Long flowModXid = xidCounter.getAndIncrement();
FlowModBuilder builder =
FlowModBuilder.builder(flowRule, sw.factory(),
Optional.of(flowModXid));
OFFlowMod mod = null;
switch (fbe.getOperator()) {
case ADD:
......@@ -228,7 +233,7 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
}
if (mod != null) {
mods.put(mod, sw);
fmXids.put(xidCounter.getAndIncrement(), fbe);
fmXids.put(flowModXid, fbe);
} else {
log.error("Conversion of flowrule {} failed.", flowRule);
}
......@@ -237,6 +242,7 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
for (Long xid : fmXids.keySet()) {
pendingFMs.put(xid, installation);
}
pendingFutures.put(installation.xid(), installation);
for (Map.Entry<OFFlowMod, OpenFlowSwitch> entry : mods.entrySet()) {
OpenFlowSwitch sw = entry.getValue();
......@@ -368,13 +374,13 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
private final AtomicBoolean ok = new AtomicBoolean(true);
private final Map<Long, FlowRuleBatchEntry> fms;
private final Set<FlowEntry> offendingFlowMods = Sets.newHashSet();
private Long failedId;
private final CountDownLatch countDownLatch;
private BatchState state;
private final ExecutionList executionList = new ExecutionList();
public InstallationFuture(Set<Dpid> sws, Map<Long, FlowRuleBatchEntry> fmXids) {
this.xid = xidCounter.getAndIncrement();
this.state = BatchState.STARTED;
......@@ -393,6 +399,7 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
removeRequirement(dpid);
FlowEntry fe = null;
FlowRuleBatchEntry fbe = fms.get(msg.getXid());
failedId = fbe.id();
FlowRule offending = fbe.getTarget();
//TODO handle specific error msgs
switch (msg.getErrType()) {
......@@ -492,8 +499,11 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
public CompletedBatchOperation get() throws InterruptedException, ExecutionException {
countDownLatch.await();
this.state = BatchState.FINISHED;
CompletedBatchOperation result = new CompletedBatchOperation(ok.get(), offendingFlowMods);
//FIXME do cleanup here
Set<Long> failedIds = (failedId != null) ? Sets.newHashSet(failedId) : Collections.emptySet();
CompletedBatchOperation result =
new CompletedBatchOperation(ok.get(), offendingFlowMods, failedIds);
//FIXME do cleanup here (moved by BOC)
cleanUp();
return result;
}
......@@ -503,8 +513,11 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
TimeoutException {
if (countDownLatch.await(timeout, unit)) {
this.state = BatchState.FINISHED;
CompletedBatchOperation result = new CompletedBatchOperation(ok.get(), offendingFlowMods);
// FIXME do cleanup here
Set<Long> failedIds = (failedId != null) ? Sets.newHashSet(failedId) : Collections.emptySet();
CompletedBatchOperation result =
new CompletedBatchOperation(ok.get(), offendingFlowMods, failedIds);
// FIXME do cleanup here (moved by BOC)
cleanUp();
return result;
}
throw new TimeoutException();
......@@ -522,8 +535,8 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
private void removeRequirement(Dpid dpid) {
countDownLatch.countDown();
sws.remove(dpid);
//FIXME don't do cleanup here
cleanUp();
//FIXME don't do cleanup here (moved by BOC)
//cleanUp();
}
}
......