Sho SHIMIZU
Committed by Ray Milkey

Refactor IntentManager: apply state pattern for intent state transition

Resolve ONOS-471.
- Define IntentUpdate sub-classes for intent state transition
- Define CompletedIntentUpdate and its sub-classes for parking intent state
- IntentUpdate.execute() handles one state transition and generates next state
- IntentInstall monitor is splitted into IntentBatchPreprocess and its sub-classes

Change-Id: Ie2d3a0b2ce9af7b98fd19a3a8cc00ab152ab6eaa
......@@ -52,12 +52,13 @@ import org.onosproject.net.intent.BatchWrite;
import org.onosproject.net.intent.IntentStoreDelegate;
import org.slf4j.Logger;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
......@@ -65,9 +66,10 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static org.onosproject.net.intent.IntentState.*;
import static org.onlab.util.Tools.namedThreads;
......@@ -155,14 +157,14 @@ public class IntentManager
public void submit(Intent intent) {
checkNotNull(intent, INTENT_NULL);
execute(IntentOperations.builder(intent.appId())
.addSubmitOperation(intent).build());
.addSubmitOperation(intent).build());
}
@Override
public void withdraw(Intent intent) {
checkNotNull(intent, INTENT_NULL);
execute(IntentOperations.builder(intent.appId())
.addWithdrawOperation(intent.id()).build());
.addWithdrawOperation(intent.id()).build());
}
@Override
......@@ -283,100 +285,26 @@ public class IntentManager
}
/**
* Compiles the specified intent.
*
* @param update intent update
*/
private void executeCompilingPhase(IntentUpdate update) {
Intent intent = update.newIntent();
// Indicate that the intent is entering the compiling phase.
update.setInflightState(intent, COMPILING);
try {
// Compile the intent into installable derivatives.
List<Intent> installables = compileIntent(intent, update);
// If all went well, associate the resulting list of installable
// intents with the top-level intent and proceed to install.
update.setInstallables(installables);
} catch (PathNotFoundException e) {
log.debug("Path not found for intent {}", intent);
update.setInflightState(intent, FAILED);
} catch (IntentException e) {
log.warn("Unable to compile intent {} due to:", intent.id(), e);
// If compilation failed, mark the intent as failed.
update.setInflightState(intent, FAILED);
}
}
/**
* Compiles an intent recursively.
*
* @param intent intent
* @return result of compilation
*/
private List<Intent> compileIntent(Intent intent, IntentUpdate update) {
private List<Intent> compileIntent(Intent intent, List<Intent> previousInstallables) {
if (intent.isInstallable()) {
return ImmutableList.of(intent);
}
registerSubclassCompilerIfNeeded(intent);
List<Intent> previous = update.oldInstallables();
// FIXME: get previous resources
List<Intent> installable = new ArrayList<>();
for (Intent compiled : getCompiler(intent).compile(intent, previous, null)) {
installable.addAll(compileIntent(compiled, update));
for (Intent compiled : getCompiler(intent).compile(intent, previousInstallables, null)) {
installable.addAll(compileIntent(compiled, previousInstallables));
}
return installable;
}
/**
* Installs all installable intents associated with the specified top-level
* intent.
*
* @param update intent update
*/
private void executeInstallingPhase(IntentUpdate update) {
if (update.newInstallables() == null) {
//no failed intents allowed past this point...
return;
}
// Indicate that the intent is entering the installing phase.
update.setInflightState(update.newIntent(), INSTALLING);
List<FlowRuleBatchOperation> batches = Lists.newArrayList();
for (Intent installable : update.newInstallables()) {
registerSubclassInstallerIfNeeded(installable);
trackerService.addTrackedResources(update.newIntent().id(),
installable.resources());
try {
batches.addAll(getInstaller(installable).install(installable));
} catch (Exception e) { // TODO this should be IntentException
log.warn("Unable to install intent {} due to:", update.newIntent().id(), e);
trackerService.removeTrackedResources(update.newIntent().id(),
installable.resources());
//TODO we failed; intent should be recompiled
update.setInflightState(update.newIntent(), FAILED);
}
}
update.addBatches(batches);
}
/**
* Uninstalls the specified intent by uninstalling all of its associated
* installable derivatives.
*
* @param update intent update
*/
private void executeWithdrawingPhase(IntentUpdate update) {
if (!update.oldIntent().equals(update.newIntent())) {
update.setInflightState(update.oldIntent(), WITHDRAWING);
} // else newIntent is FAILED
update.addBatches(uninstallIntent(update.oldIntent(), update.oldInstallables()));
}
/**
* Uninstalls all installable intents associated with the given intent.
*
* @param intent intent
......@@ -384,13 +312,10 @@ public class IntentManager
* @return list of batches to uninstall intent
*/
private List<FlowRuleBatchOperation> uninstallIntent(Intent intent, List<Intent> installables) {
if (installables == null) {
return Collections.emptyList();
}
List<FlowRuleBatchOperation> batches = Lists.newArrayList();
for (Intent installable : installables) {
trackerService.removeTrackedResources(intent.id(),
installable.resources());
installable.resources());
try {
batches.addAll(getInstaller(installable).uninstall(installable));
} catch (IntentException e) {
......@@ -402,45 +327,6 @@ public class IntentManager
}
/**
* Withdraws the old intent and installs the new intent as one operation.
*
* @param update intent update
*/
private void executeReplacementPhase(IntentUpdate update) {
checkArgument(update.oldInstallables().size() == update.newInstallables().size(),
"Old and New Intent must have equivalent installable intents.");
if (!update.oldIntent().equals(update.newIntent())) {
// only set the old intent's state if it is different
update.setInflightState(update.oldIntent(), WITHDRAWING);
}
update.setInflightState(update.newIntent(), INSTALLING);
List<FlowRuleBatchOperation> batches = Lists.newArrayList();
for (int i = 0; i < update.oldInstallables().size(); i++) {
Intent oldInstallable = update.oldInstallables().get(i);
Intent newInstallable = update.newInstallables().get(i);
//FIXME revisit this
// if (oldInstallable.equals(newInstallable)) {
// continue;
// }
checkArgument(oldInstallable.getClass().equals(newInstallable.getClass()),
"Installable Intent type mismatch.");
trackerService.removeTrackedResources(update.oldIntent().id(), oldInstallable.resources());
trackerService.addTrackedResources(update.newIntent().id(), newInstallable.resources());
try {
batches.addAll(getInstaller(newInstallable).replace(oldInstallable, newInstallable));
} catch (IntentException e) {
log.warn("Unable to update intent {} due to:", update.oldIntent().id(), e);
//FIXME... we failed. need to uninstall (if same) or revert (if different)
trackerService.removeTrackedResources(update.newIntent().id(), newInstallable.resources());
update.setInflightState(update.newIntent(), FAILED);
batches = uninstallIntent(update.oldIntent(), update.oldInstallables());
}
}
update.addBatches(batches);
}
/**
* Registers an intent compiler of the specified intent if an intent compiler
* for the intent is not registered. This method traverses the class hierarchy of
* the intent. Once an intent compiler for a parent type is found, this method
......@@ -550,255 +436,797 @@ public class IntentManager
}
}
// TODO move this inside IntentUpdate?
/**
* TODO. rename this...
* @param update intent update
*/
private void processIntentUpdate(IntentUpdate update) {
// check to see if the intent needs to be compiled or recompiled
if (update.newIntent() != null) {
executeCompilingPhase(update);
}
if (update.oldInstallables() != null && update.newInstallables() != null) {
executeReplacementPhase(update);
} else if (update.newInstallables() != null) {
executeInstallingPhase(update);
} else if (update.oldInstallables() != null) {
executeWithdrawingPhase(update);
} else {
if (update.oldIntent() != null &&
!update.oldIntent().equals(update.newIntent())) {
// removing failed intent
update.setInflightState(update.oldIntent(), WITHDRAWING);
}
// if (update.newIntent() != null) {
// // TODO assert that next state is failed
// }
// TODO: simplify the branching statements
private IntentUpdate createIntentUpdate(IntentOperation operation) {
switch (operation.type()) {
case SUBMIT:
return new InstallRequest(operation.intent());
case WITHDRAW: {
Intent oldIntent = store.getIntent(operation.intentId());
if (oldIntent == null) {
return new DoNothing();
}
List<Intent> installables = store.getInstallableIntents(oldIntent.id());
if (installables == null) {
return new WithdrawStateChange1(oldIntent);
}
return new WithdrawRequest(oldIntent, installables);
}
case REPLACE: {
Intent newIntent = operation.intent();
Intent oldIntent = store.getIntent(operation.intentId());
if (oldIntent == null) {
return new InstallRequest(newIntent);
}
List<Intent> installables = store.getInstallableIntents(oldIntent.id());
if (installables == null) {
if (newIntent.equals(oldIntent)) {
return new InstallRequest(newIntent);
} else {
return new WithdrawStateChange2(oldIntent);
}
}
return new ReplaceRequest(newIntent, oldIntent, installables);
}
case UPDATE: {
Intent oldIntent = store.getIntent(operation.intentId());
if (oldIntent == null) {
return new DoNothing();
}
List<Intent> installables = getInstallableIntents(oldIntent.id());
if (installables == null) {
return new InstallRequest(oldIntent);
}
return new ReplaceRequest(oldIntent, oldIntent, installables);
}
default:
// illegal state
return new DoNothing();
}
}
// TODO comments...
private class IntentUpdate {
private final Intent oldIntent;
private abstract class IntentUpdate {
/**
* Execute the procedure represented by the instance
* and generates the next update instance.
*
* @return next update
*/
public Optional<IntentUpdate> execute() {
return Optional.empty();
}
/**
* Write data to the specified BatchWrite before execution() is called.
*
* @param batchWrite batchWrite
*/
public void writeBeforeExecution(BatchWrite batchWrite) {}
}
private abstract class CompletedIntentUpdate extends IntentUpdate {
/**
* Write data to the specified BatchWrite after execution() is called.
*
* @param batchWrite batchWrite
*/
public void writeAfterExecution(BatchWrite batchWrite) {}
public void batchSuccess() {}
public void batchFailed() {}
/**
* Returns the current FlowRuleBatchOperation.
*
* @return current FlowRuleBatchOperation
*/
public FlowRuleBatchOperation currentBatch() {
return null;
}
/**
* Returns all of installable intents this instance holds.
*
* @return all of installable intents
*/
public List<Intent> allInstallables() {
return Collections.emptyList();
}
}
private class InstallRequest extends IntentUpdate {
private final Intent intent;
InstallRequest(Intent intent) {
this.intent = checkNotNull(intent);
}
@Override
public void writeBeforeExecution(BatchWrite batchWrite) {
// TODO consider only "creating" intent if it does not exist
// Note: We need to set state to INSTALL_REQ regardless.
batchWrite.createIntent(intent);
}
@Override
public Optional<IntentUpdate> execute() {
return Optional.of(new Compiling(intent));
}
}
private class WithdrawRequest extends IntentUpdate {
private final Intent intent;
private final List<Intent> installables;
WithdrawRequest(Intent intent, List<Intent> installables) {
this.intent = checkNotNull(intent);
this.installables = ImmutableList.copyOf(checkNotNull(installables));
}
@Override
public void writeBeforeExecution(BatchWrite batchWrite) {
batchWrite.setState(intent, WITHDRAW_REQ);
}
@Override
public Optional<IntentUpdate> execute() {
return Optional.of(new Withdrawing(intent, installables));
}
}
private class ReplaceRequest extends IntentUpdate {
private final Intent newIntent;
private final Map<Intent, IntentState> stateMap = Maps.newHashMap();
private final Intent oldIntent;
private final List<Intent> oldInstallables;
ReplaceRequest(Intent newIntent, Intent oldIntent, List<Intent> oldInstallables) {
this.newIntent = checkNotNull(newIntent);
this.oldIntent = checkNotNull(oldIntent);
this.oldInstallables = ImmutableList.copyOf(oldInstallables);
}
@Override
public void writeBeforeExecution(BatchWrite batchWrite) {
// TODO consider only "creating" intent if it does not exist
// Note: We need to set state to INSTALL_REQ regardless.
batchWrite.createIntent(newIntent);
}
@Override
public Optional<IntentUpdate> execute() {
try {
List<Intent> installables = compileIntent(newIntent, oldInstallables);
return Optional.of(new Replacing(newIntent, oldIntent, installables, oldInstallables));
} catch (PathNotFoundException e) {
log.debug("Path not found for intent {}", newIntent);
return Optional.of(new Withdrawing(oldIntent, oldInstallables));
} catch (IntentException e) {
log.warn("Unable to compile intent {} due to:", newIntent.id(), e);
return Optional.of(new Withdrawing(oldIntent, oldInstallables));
}
}
}
private class DoNothing extends CompletedIntentUpdate {
}
// TODO: better naming
private class WithdrawStateChange1 extends CompletedIntentUpdate {
private final Intent intent;
WithdrawStateChange1(Intent intent) {
this.intent = checkNotNull(intent);
}
@Override
public void writeBeforeExecution(BatchWrite batchWrite) {
batchWrite.setState(intent, WITHDRAW_REQ);
}
@Override
public void writeAfterExecution(BatchWrite batchWrite) {
batchWrite.setState(intent, WITHDRAWN);
batchWrite.removeInstalledIntents(intent.id());
batchWrite.removeIntent(intent.id());
}
}
// TODO: better naming
private class WithdrawStateChange2 extends CompletedIntentUpdate {
private final Intent intent;
WithdrawStateChange2(Intent intent) {
this.intent = checkNotNull(intent);
}
@Override
public void writeBeforeExecution(BatchWrite batchWrite) {
// TODO consider only "creating" intent if it does not exist
// Note: We need to set state to INSTALL_REQ regardless.
batchWrite.createIntent(intent);
}
@Override
public void writeAfterExecution(BatchWrite batchWrite) {
batchWrite.setState(intent, WITHDRAWN);
batchWrite.removeInstalledIntents(intent.id());
batchWrite.removeIntent(intent.id());
}
}
private class Compiling extends IntentUpdate {
private final Intent intent;
Compiling(Intent intent) {
this.intent = checkNotNull(intent);
}
@Override
public Optional<IntentUpdate> execute() {
try {
// Compile the intent into installable derivatives.
// If all went well, associate the resulting list of installable
// intents with the top-level intent and proceed to install.
return Optional.of(new Installing(intent, compileIntent(intent, null)));
} catch (PathNotFoundException e) {
return Optional.of(new CompilingFailed(intent, e));
} catch (IntentException e) {
return Optional.of(new CompilingFailed(intent, e));
}
}
}
// TODO: better naming because install() method actually generate FlowRuleBatchOperations
private class Installing extends IntentUpdate {
private final Intent intent;
private final List<Intent> installables;
Installing(Intent intent, List<Intent> installables) {
this.intent = checkNotNull(intent);
this.installables = ImmutableList.copyOf(checkNotNull(installables));
}
@Override
public Optional<IntentUpdate> execute() {
Exception exception = null;
// Indicate that the intent is entering the installing phase.
List<FlowRuleBatchOperation> batches = Lists.newArrayList();
for (Intent installable : installables) {
registerSubclassInstallerIfNeeded(installable);
trackerService.addTrackedResources(intent.id(), installable.resources());
try {
batches.addAll(getInstaller(installable).install(installable));
} catch (Exception e) { // TODO this should be IntentException
log.warn("Unable to install intent {} due to:", intent.id(), e);
trackerService.removeTrackedResources(intent.id(), installable.resources());
//TODO we failed; intent should be recompiled
exception = e;
}
}
if (exception != null) {
return Optional.of(new InstallingFailed(intent, installables, batches));
}
return Optional.of(new Installed(intent, installables, batches));
}
}
private class Withdrawing extends IntentUpdate {
private final Intent intent;
private final List<Intent> installables;
Withdrawing(Intent intent, List<Intent> installables) {
this.intent = checkNotNull(intent);
this.installables = ImmutableList.copyOf(installables);
}
@Override
public Optional<IntentUpdate> execute() {
List<FlowRuleBatchOperation> batches = uninstallIntent(intent, installables);
return Optional.of(new Withdrawn(intent, installables, batches));
}
}
private class Replacing extends IntentUpdate {
private final Intent newIntent;
private final Intent oldIntent;
private final List<Intent> newInstallables;
private final List<Intent> oldInstallables;
private List<Intent> newInstallables;
private final List<FlowRuleBatchOperation> batches = Lists.newLinkedList();
private int currentBatch = 0; // TODO: maybe replace with an iterator
IntentUpdate(IntentOperation op) {
switch (op.type()) {
case SUBMIT:
newIntent = op.intent();
oldIntent = null;
break;
case WITHDRAW:
newIntent = null;
oldIntent = store.getIntent(op.intentId());
break;
case REPLACE:
newIntent = op.intent();
oldIntent = store.getIntent(op.intentId());
private Exception exception;
Replacing(Intent newIntent, Intent oldIntent,
List<Intent> newInstallables, List<Intent> oldInstallables) {
this.newIntent = checkNotNull(newIntent);
this.oldIntent = checkNotNull(oldIntent);
this.newInstallables = ImmutableList.copyOf(checkNotNull(newInstallables));
this.oldInstallables = ImmutableList.copyOf(checkNotNull(oldInstallables));
}
@Override
public Optional<IntentUpdate> execute() {
List<FlowRuleBatchOperation> batches = replace();
if (exception == null) {
return Optional.of(
new Replaced(newIntent, oldIntent, newInstallables, oldInstallables, batches));
}
return Optional.of(
new ReplacingFailed(newIntent, oldIntent, newInstallables, oldInstallables, batches));
}
protected List<FlowRuleBatchOperation> replace() {
checkState(oldInstallables.size() == newInstallables.size(),
"Old and New Intent must have equivalent installable intents.");
List<FlowRuleBatchOperation> batches = Lists.newArrayList();
for (int i = 0; i < oldInstallables.size(); i++) {
Intent oldInstallable = oldInstallables.get(i);
Intent newInstallable = newInstallables.get(i);
//FIXME revisit this
// if (oldInstallable.equals(newInstallable)) {
// continue;
// }
checkState(oldInstallable.getClass().equals(newInstallable.getClass()),
"Installable Intent type mismatch.");
trackerService.removeTrackedResources(oldIntent.id(), oldInstallable.resources());
trackerService.addTrackedResources(newIntent.id(), newInstallable.resources());
try {
batches.addAll(getInstaller(newInstallable).replace(oldInstallable, newInstallable));
} catch (IntentException e) {
log.warn("Unable to update intent {} due to:", oldIntent.id(), e);
//FIXME... we failed. need to uninstall (if same) or revert (if different)
trackerService.removeTrackedResources(newIntent.id(), newInstallable.resources());
exception = e;
batches = uninstallIntent(oldIntent, oldInstallables);
}
}
return batches;
}
}
private class Installed extends CompletedIntentUpdate {
private final Intent intent;
private final List<Intent> installables;
private IntentState intentState;
private final List<FlowRuleBatchOperation> batches;
private int currentBatch = 0;
Installed(Intent intent, List<Intent> installables, List<FlowRuleBatchOperation> batches) {
this.intent = checkNotNull(intent);
this.installables = ImmutableList.copyOf(checkNotNull(installables));
this.batches = new LinkedList<>(checkNotNull(batches));
this.intentState = INSTALLING;
}
@Override
public void batchSuccess() {
currentBatch++;
}
@Override
public List<Intent> allInstallables() {
return installables;
}
@Override
public void writeAfterExecution(BatchWrite batchWrite) {
switch (intentState) {
case INSTALLING:
batchWrite.setState(intent, INSTALLED);
batchWrite.setInstallableIntents(intent.id(), this.installables);
break;
case UPDATE:
oldIntent = store.getIntent(op.intentId());
newIntent = oldIntent;
case FAILED:
batchWrite.setState(intent, FAILED);
batchWrite.removeInstalledIntents(intent.id());
break;
default:
oldIntent = null;
newIntent = null;
break;
}
// fetch the old intent's installables from the store
if (oldIntent != null) {
oldInstallables = store.getInstallableIntents(oldIntent.id());
} else {
oldInstallables = null;
if (newIntent == null) {
log.info("Ignoring {} for missing Intent {}", op.type(), op.intentId());
}
}
}
void init(BatchWrite batchWrite) {
if (newIntent != null) {
// TODO consider only "creating" intent if it does not exist
// Note: We need to set state to INSTALL_REQ regardless.
batchWrite.createIntent(newIntent);
} else if (oldIntent != null && !oldIntent.equals(newIntent)) {
batchWrite.setState(oldIntent, WITHDRAW_REQ);
@Override
public FlowRuleBatchOperation currentBatch() {
return currentBatch < batches.size() ? batches.get(currentBatch) : null;
}
@Override
public void batchFailed() {
// the current batch has failed, so recompile
// remove the current batch and all remaining
for (int i = batches.size() - 1; i >= currentBatch; i--) {
batches.remove(i);
}
intentState = FAILED;
batches.addAll(uninstallIntent(intent, installables));
// TODO we might want to try to recompile the new intent
}
}
Intent oldIntent() {
return oldIntent;
private class Withdrawn extends CompletedIntentUpdate {
private final Intent intent;
private final List<Intent> installables;
private final List<FlowRuleBatchOperation> batches;
private int currentBatch;
Withdrawn(Intent intent, List<Intent> installables, List<FlowRuleBatchOperation> batches) {
this.intent = checkNotNull(intent);
this.installables = ImmutableList.copyOf(installables);
this.batches = new LinkedList<>(batches);
this.currentBatch = 0;
}
Intent newIntent() {
return newIntent;
@Override
public List<Intent> allInstallables() {
return installables;
}
List<Intent> oldInstallables() {
return oldInstallables;
@Override
public void batchSuccess() {
currentBatch++;
}
List<Intent> newInstallables() {
return newInstallables;
@Override
public void writeAfterExecution(BatchWrite batchWrite) {
batchWrite.setState(intent, WITHDRAWN);
batchWrite.removeInstalledIntents(intent.id());
batchWrite.removeIntent(intent.id());
}
void setInstallables(List<Intent> installables) {
newInstallables = installables;
@Override
public FlowRuleBatchOperation currentBatch() {
return currentBatch < batches.size() ? batches.get(currentBatch) : null;
}
boolean isComplete() {
return currentBatch >= batches.size();
@Override
public void batchFailed() {
// the current batch has failed, so recompile
// remove the current batch and all remaining
for (int i = batches.size() - 1; i >= currentBatch; i--) {
batches.remove(i);
}
batches.addAll(uninstallIntent(intent, installables));
}
}
private class Replaced extends CompletedIntentUpdate {
private final Intent newIntent;
private final Intent oldIntent;
private final List<Intent> newInstallables;
private final List<Intent> oldInstallables;
private final List<FlowRuleBatchOperation> batches;
private int currentBatch;
Replaced(Intent newIntent, Intent oldIntent,
List<Intent> newInstallables, List<Intent> oldInstallables,
List<FlowRuleBatchOperation> batches) {
this.newIntent = checkNotNull(newIntent);
this.oldIntent = checkNotNull(oldIntent);
this.newInstallables = ImmutableList.copyOf(checkNotNull(newInstallables));
this.oldInstallables = ImmutableList.copyOf(checkNotNull(oldInstallables));
this.batches = new LinkedList<>(batches);
this.currentBatch = 0;
}
@Override
public List<Intent> allInstallables() {
LinkedList<Intent> allInstallables = new LinkedList<>();
allInstallables.addAll(newInstallables);
allInstallables.addAll(oldInstallables);
FlowRuleBatchOperation currentBatch() {
return !isComplete() ? batches.get(currentBatch) : null;
return allInstallables;
}
void batchSuccess() {
// move on to next Batch
@Override
public void batchSuccess() {
currentBatch++;
}
void batchFailed() {
@Override
public void writeAfterExecution(BatchWrite batchWrite) {
batchWrite.setState(newIntent, INSTALLED);
batchWrite.setInstallableIntents(newIntent.id(), newInstallables);
batchWrite.setState(oldIntent, WITHDRAWN);
batchWrite.removeInstalledIntents(oldIntent.id());
batchWrite.removeIntent(oldIntent.id());
}
@Override
public FlowRuleBatchOperation currentBatch() {
return currentBatch < batches.size() ? batches.get(currentBatch) : null;
}
@Override
public void batchFailed() {
// the current batch has failed, so recompile
// remove the current batch and all remaining
for (int i = currentBatch; i < batches.size(); i++) {
for (int i = batches.size() - 1; i >= currentBatch; i--) {
batches.remove(i);
}
if (oldIntent != null) {
executeWithdrawingPhase(this); // remove the old intent
batches.addAll(uninstallIntent(oldIntent, oldInstallables));
batches.addAll(uninstallIntent(newIntent, newInstallables));
// TODO we might want to try to recompile the new intent
}
}
private class CompilingFailed extends CompletedIntentUpdate {
private final Intent intent;
private final IntentException exception;
CompilingFailed(Intent intent, IntentException exception) {
this.intent = checkNotNull(intent);
this.exception = checkNotNull(exception);
}
@Override
public Optional<IntentUpdate> execute() {
if (exception instanceof PathNotFoundException) {
log.debug("Path not found for intent {}", intent);
} else {
log.warn("Unable to compile intent {} due to:", intent.id(), exception);
}
if (newIntent != null) {
setInflightState(newIntent, FAILED);
batches.addAll(uninstallIntent(newIntent, newInstallables()));
return Optional.empty();
}
@Override
public void writeAfterExecution(BatchWrite batchWrite) {
batchWrite.setState(intent, FAILED);
batchWrite.removeInstalledIntents(intent.id());
}
}
private class InstallingFailed extends CompletedIntentUpdate {
private final Intent intent;
private final List<Intent> installables;
private final List<FlowRuleBatchOperation> batches;
private int currentBatch = 0;
InstallingFailed(Intent intent, List<Intent> installables, List<FlowRuleBatchOperation> batches) {
this.intent = checkNotNull(intent);
this.installables = ImmutableList.copyOf(checkNotNull(installables));
this.batches = new LinkedList<>(checkNotNull(batches));
}
@Override
public List<Intent> allInstallables() {
return installables;
}
@Override
public void batchSuccess() {
currentBatch++;
}
@Override
public void writeAfterExecution(BatchWrite batchWrite) {
batchWrite.setState(intent, FAILED);
batchWrite.removeInstalledIntents(intent.id());
}
@Override
public FlowRuleBatchOperation currentBatch() {
return currentBatch < batches.size() ? batches.get(currentBatch) : null;
}
@Override
public void batchFailed() {
// the current batch has failed, so recompile
// remove the current batch and all remaining
for (int i = batches.size() - 1; i >= currentBatch; i--) {
batches.remove(i);
}
batches.addAll(uninstallIntent(intent, installables));
// TODO we might want to try to recompile the new intent
}
}
private void finalizeStates(BatchWrite batchWrite) {
// events to be triggered on successful write
for (Intent intent : stateMap.keySet()) {
switch (getInflightState(intent)) {
case INSTALLING:
batchWrite.setState(intent, INSTALLED);
batchWrite.setInstallableIntents(newIntent.id(), newInstallables);
break;
case WITHDRAWING:
batchWrite.setState(intent, WITHDRAWN);
batchWrite.removeInstalledIntents(intent.id());
batchWrite.removeIntent(intent.id());
break;
case FAILED:
batchWrite.setState(intent, FAILED);
batchWrite.removeInstalledIntents(intent.id());
break;
private class ReplacingFailed extends CompletedIntentUpdate {
// FALLTHROUGH to default from here
case INSTALL_REQ:
case COMPILING:
case RECOMPILING:
case WITHDRAW_REQ:
case WITHDRAWN:
case INSTALLED:
default:
//FIXME clean this up (we shouldn't ever get here)
log.warn("Bad state: {} for {}", getInflightState(intent), intent);
break;
}
}
private final Intent newIntent;
private final Intent oldIntent;
private final List<Intent> newInstallables;
private final List<Intent> oldInstallables;
private final List<FlowRuleBatchOperation> batches;
private int currentBatch;
ReplacingFailed(Intent newIntent, Intent oldIntent,
List<Intent> newInstallables, List<Intent> oldInstallables,
List<FlowRuleBatchOperation> batches) {
this.newIntent = checkNotNull(newIntent);
this.oldIntent = checkNotNull(oldIntent);
this.newInstallables = ImmutableList.copyOf(checkNotNull(newInstallables));
this.oldInstallables = ImmutableList.copyOf(checkNotNull(oldInstallables));
this.batches = new LinkedList<>(batches);
this.currentBatch = 0;
}
@Override
public List<Intent> allInstallables() {
LinkedList<Intent> allInstallables = new LinkedList<>();
allInstallables.addAll(newInstallables);
allInstallables.addAll(oldInstallables);
return allInstallables;
}
void addBatches(List<FlowRuleBatchOperation> batches) {
this.batches.addAll(batches);
@Override
public void batchSuccess() {
currentBatch++;
}
IntentState getInflightState(Intent intent) {
return stateMap.get(intent);
@Override
public void writeAfterExecution(BatchWrite batchWrite) {
batchWrite.setState(newIntent, FAILED);
batchWrite.removeInstalledIntents(newIntent.id());
batchWrite.setState(oldIntent, WITHDRAWN);
batchWrite.removeInstalledIntents(oldIntent.id());
batchWrite.removeIntent(oldIntent.id());
}
// set transient state during intent update process
void setInflightState(Intent intent, IntentState newState) {
// This method should be called for
// transition to non-parking or Failed only
if (!NON_PARKED_OR_FAILED.contains(newState)) {
log.error("Unexpected transition to {}", newState);
@Override
public FlowRuleBatchOperation currentBatch() {
return currentBatch < batches.size() ? batches.get(currentBatch) : null;
}
@Override
public void batchFailed() {
// the current batch has failed, so recompile
// remove the current batch and all remaining
for (int i = batches.size() - 1; i >= currentBatch; i--) {
batches.remove(i);
}
batches.addAll(uninstallIntent(oldIntent, oldInstallables));
IntentState oldState = stateMap.get(intent);
log.debug("intent id: {}, old state: {}, new state: {}",
intent.id(), oldState, newState);
batches.addAll(uninstallIntent(newIntent, newInstallables));
stateMap.put(intent, newState);
// TODO we might want to try to recompile the new intent
}
}
private class IntentInstallMonitor implements Runnable {
private class IntentBatchPreprocess implements Runnable {
// TODO make this configurable through a configuration file using @Property mechanism
// These fields needs to be moved to the enclosing class and configurable through a configuration file
// TODO make this configurable
private static final int TIMEOUT_PER_OP = 500; // ms
private static final int MAX_ATTEMPTS = 3;
protected static final int MAX_ATTEMPTS = 3;
private final IntentOperations ops;
private final List<IntentUpdate> intentUpdates = Lists.newArrayList();
private final Duration timeoutPerOperation;
private final int maxAttempts;
protected final IntentOperations ops;
// future holding current FlowRuleBatch installation result
private Future<CompletedBatchOperation> future;
private long startTime = System.currentTimeMillis();
private long endTime;
private int installAttempt;
protected final long startTime = System.currentTimeMillis();
protected final long endTime;
public IntentInstallMonitor(IntentOperations ops) {
this(ops, Duration.ofMillis(TIMEOUT_PER_OP), MAX_ATTEMPTS);
private IntentBatchPreprocess(IntentOperations ops, long endTime) {
this.ops = checkNotNull(ops);
this.endTime = endTime;
}
public IntentInstallMonitor(IntentOperations ops, Duration timeoutPerOperation, int maxAttempts) {
this.ops = checkNotNull(ops);
this.timeoutPerOperation = checkNotNull(timeoutPerOperation);
checkArgument(maxAttempts > 0, "maxAttempts must be larger than 0, but %s", maxAttempts);
this.maxAttempts = maxAttempts;
public IntentBatchPreprocess(IntentOperations ops) {
this(ops, System.currentTimeMillis() + ops.operations().size() * TIMEOUT_PER_OP);
}
resetTimeoutLimit();
// FIXME compute reasonable timeouts
protected long calculateTimeoutLimit() {
return System.currentTimeMillis() + ops.operations().size() * TIMEOUT_PER_OP;
}
private void resetTimeoutLimit() {
// FIXME compute reasonable timeouts
this.endTime = System.currentTimeMillis()
+ ops.operations().size() * timeoutPerOperation.toMillis();
@Override
public void run() {
try {
// this should only be called on the first iteration
// note: this a "expensive", so it is not done in the constructor
// - creates per Intent installation context (IntentUpdate)
// - write Intents to store
// - process (compile, install, etc.) each Intents
// - generate FlowRuleBatch for this phase
// build IntentUpdates
List<IntentUpdate> updates = createIntentUpdates();
// Write batch information
BatchWrite batchWrite = createBatchWrite(updates);
writeBatch(batchWrite);
new IntentBatchApplyFirst(ops, processIntentUpdates(updates), endTime, 0, null).run();
} catch (Exception e) {
log.error("Error submitting batches:", e);
// FIXME incomplete Intents should be cleaned up
// (transition to FAILED, etc.)
// TODO: remove duplicate due to inlining
// the batch has failed
// TODO: maybe we should do more?
log.error("Walk the plank, matey...");
batchService.removeIntentOperations(ops);
}
}
private List<IntentUpdate> createIntentUpdates() {
return ops.operations().stream()
.map(IntentManager.this::createIntentUpdate)
.collect(Collectors.toList());
}
private void buildIntentUpdates() {
private BatchWrite createBatchWrite(List<IntentUpdate> updates) {
BatchWrite batchWrite = BatchWrite.newInstance();
updates.forEach(update -> update.writeBeforeExecution(batchWrite));
return batchWrite;
}
// create context and record new request to store
for (IntentOperation op : ops.operations()) {
IntentUpdate update = new IntentUpdate(op);
update.init(batchWrite);
intentUpdates.add(update);
private List<CompletedIntentUpdate> processIntentUpdates(List<IntentUpdate> updates) {
// start processing each Intents
List<CompletedIntentUpdate> completed = new ArrayList<>();
for (IntentUpdate update : updates) {
Optional<IntentUpdate> phase = Optional.of(update);
IntentUpdate previous = update;
while (true) {
if (!phase.isPresent()) {
// FIXME: not type safe cast
completed.add((CompletedIntentUpdate) previous);
break;
}
previous = phase.get();
phase = previous.execute();
}
}
return completed;
}
protected void writeBatch(BatchWrite batchWrite) {
if (!batchWrite.isEmpty()) {
store.batchWrite(batchWrite);
}
}
}
// start processing each Intents
for (IntentUpdate update : intentUpdates) {
processIntentUpdate(update);
}
future = applyNextBatch();
// TODO: better naming
private class IntentBatchApplyFirst extends IntentBatchPreprocess {
protected final List<CompletedIntentUpdate> intentUpdates;
protected final int installAttempt;
protected Future<CompletedBatchOperation> future;
IntentBatchApplyFirst(IntentOperations operations, List<CompletedIntentUpdate> intentUpdates,
long endTime, int installAttempt, Future<CompletedBatchOperation> future) {
super(operations, endTime);
this.intentUpdates = ImmutableList.copyOf(intentUpdates);
this.future = future;
this.installAttempt = installAttempt;
}
@Override
public void run() {
Future<CompletedBatchOperation> future = applyNextBatch(intentUpdates);
new IntentBatchProcessFutures(ops, intentUpdates, endTime, installAttempt, future).run();
}
/**
......@@ -806,84 +1234,127 @@ public class IntentManager
*
* @return Future for next batch
*/
private Future<CompletedBatchOperation> applyNextBatch() {
protected Future<CompletedBatchOperation> applyNextBatch(List<CompletedIntentUpdate> updates) {
//TODO test this. (also, maybe save this batch)
FlowRuleBatchOperation batch = new FlowRuleBatchOperation(Collections.emptyList());
for (IntentUpdate update : intentUpdates) {
if (!update.isComplete()) {
batch.addAll(update.currentBatch());
}
}
FlowRuleBatchOperation batch = createFlowRuleBatchOperation(updates);
if (batch.size() > 0) {
//FIXME apply batch might throw an exception
return flowRuleService.applyBatch(batch);
} else {
// there are no flow rule batches; finalize the intent update
BatchWrite batchWrite = BatchWrite.newInstance();
for (IntentUpdate update : intentUpdates) {
update.finalizeStates(batchWrite);
}
if (!batchWrite.isEmpty()) {
store.batchWrite(batchWrite);
}
BatchWrite batchWrite = createFinalizedBatchWrite(updates);
writeBatch(batchWrite);
return null;
}
}
private void updateBatches(CompletedBatchOperation completed) {
if (completed.isSuccess()) {
for (IntentUpdate update : intentUpdates) {
update.batchSuccess();
private FlowRuleBatchOperation createFlowRuleBatchOperation(List<CompletedIntentUpdate> intentUpdates) {
FlowRuleBatchOperation batch = new FlowRuleBatchOperation(Collections.emptyList());
for (CompletedIntentUpdate update : intentUpdates) {
FlowRuleBatchOperation currentBatch = update.currentBatch();
if (currentBatch != null) {
batch.addAll(currentBatch);
}
} else {
// entire batch has been reverted...
log.debug("Failed items: {}", completed.failedItems());
log.debug("Failed ids: {}", completed.failedIds());
}
return batch;
}
for (Long id : completed.failedIds()) {
IntentId targetId = IntentId.valueOf(id);
for (IntentUpdate update : intentUpdates) {
List<Intent> installables = Lists.newArrayList(update.newInstallables());
if (update.oldInstallables() != null) {
installables.addAll(update.oldInstallables());
}
for (Intent intent : installables) {
if (intent.id().equals(targetId)) {
update.batchFailed();
break;
}
}
}
// don't increment the non-failed items, as they have been reverted.
}
private BatchWrite createFinalizedBatchWrite(List<CompletedIntentUpdate> intentUpdates) {
BatchWrite batchWrite = BatchWrite.newInstance();
for (CompletedIntentUpdate update : intentUpdates) {
update.writeAfterExecution(batchWrite);
}
return batchWrite;
}
private void abandonShip() {
protected void abandonShip() {
// the batch has failed
// TODO: maybe we should do more?
log.error("Walk the plank, matey...");
future = null;
batchService.removeIntentOperations(ops);
}
}
// TODO: better naming
private class IntentBatchProcessFutures extends IntentBatchApplyFirst {
IntentBatchProcessFutures(IntentOperations operations, List<CompletedIntentUpdate> intentUpdates,
long endTime, int installAttempt, Future<CompletedBatchOperation> future) {
super(operations, intentUpdates, endTime, installAttempt, future);
}
@Override
public void run() {
try {
// - peek if current FlowRuleBatch is complete
// -- If complete OK:
// step each IntentUpdate forward
// If phase left: generate next FlowRuleBatch
// If no more phase: write parking states
// -- If complete FAIL:
// Intent which failed: transition Intent to FAILED
// Other Intents: resubmit same FlowRuleBatch for this phase
Future<CompletedBatchOperation> future = processFutures();
if (future == null) {
// there are no outstanding batches; we are done
batchService.removeIntentOperations(ops);
} else if (System.currentTimeMillis() > endTime) {
// - cancel current FlowRuleBatch and resubmit again
retry();
} else {
// we are not done yet, yield the thread by resubmitting ourselves
executor.submit(new IntentBatchProcessFutures(ops, intentUpdates, endTime, installAttempt, future));
}
} catch (Exception e) {
log.error("Error submitting batches:", e);
// FIXME incomplete Intents should be cleaned up
// (transition to FAILED, etc.)
abandonShip();
}
}
/**
* Iterate through the pending futures, and remove them when they have completed.
*/
private void processFutures() {
if (future == null) {
// we are done if the future is null
return;
}
private Future<CompletedBatchOperation> processFutures() {
try {
CompletedBatchOperation completed = future.get(100, TimeUnit.NANOSECONDS);
updateBatches(completed);
future = applyNextBatch();
return applyNextBatch(intentUpdates);
} catch (TimeoutException | InterruptedException te) {
log.trace("Installation of intents are still pending: {}", ops);
return future;
} catch (ExecutionException e) {
log.warn("Execution of batch failed: {}", ops, e);
abandonShip();
return future;
}
}
private void updateBatches(CompletedBatchOperation completed) {
if (completed.isSuccess()) {
for (CompletedIntentUpdate update : intentUpdates) {
update.batchSuccess();
}
} else {
// entire batch has been reverted...
log.debug("Failed items: {}", completed.failedItems());
log.debug("Failed ids: {}", completed.failedIds());
for (Long id : completed.failedIds()) {
IntentId targetId = IntentId.valueOf(id);
for (CompletedIntentUpdate update : intentUpdates) {
for (Intent intent : update.allInstallables()) {
if (intent.id().equals(targetId)) {
update.batchFailed();
break;
}
}
}
// don't increment the non-failed items, as they have been reverted.
}
}
}
......@@ -891,19 +1362,19 @@ public class IntentManager
log.debug("Execution timed out, retrying.");
if (future.cancel(true)) { // cancel success; batch is reverted
// reset the timer
resetTimeoutLimit();
installAttempt++;
if (installAttempt == maxAttempts) {
long timeLimit = calculateTimeoutLimit();
int attempts = installAttempt + 1;
if (attempts == MAX_ATTEMPTS) {
log.warn("Install request timed out: {}", ops);
for (IntentUpdate update : intentUpdates) {
for (CompletedIntentUpdate update : intentUpdates) {
update.batchFailed();
}
} else if (installAttempt > maxAttempts) {
} else if (attempts > MAX_ATTEMPTS) {
abandonShip();
return;
} // else just resubmit the work
future = applyNextBatch();
executor.submit(this);
Future<CompletedBatchOperation> future = applyNextBatch(intentUpdates);
executor.submit(new IntentBatchProcessFutures(ops, intentUpdates, timeLimit, attempts, future));
} else {
log.error("Cancelling FlowRuleBatch failed.");
// FIXME
......@@ -913,51 +1384,6 @@ public class IntentManager
abandonShip();
}
}
boolean isComplete() {
return future == null;
}
@Override
public void run() {
try {
if (intentUpdates.isEmpty()) {
// this should only be called on the first iteration
// note: this a "expensive", so it is not done in the constructor
// - creates per Intent installation context (IntentUpdate)
// - write Intents to store
// - process (compile, install, etc.) each Intents
// - generate FlowRuleBatch for this phase
buildIntentUpdates();
}
// - peek if current FlowRuleBatch is complete
// -- If complete OK:
// step each IntentUpdate forward
// If phase left: generate next FlowRuleBatch
// If no more phase: write parking states
// -- If complete FAIL:
// Intent which failed: transition Intent to FAILED
// Other Intents: resubmit same FlowRuleBatch for this phase
processFutures();
if (isComplete()) {
// there are no outstanding batches; we are done
batchService.removeIntentOperations(ops);
} else if (endTime < System.currentTimeMillis()) {
// - cancel current FlowRuleBatch and resubmit again
retry();
} else {
// we are not done yet, yield the thread by resubmitting ourselves
executor.submit(this);
}
} catch (Exception e) {
log.error("Error submitting batches:", e);
// FIXME incomplete Intents should be cleaned up
// (transition to FAILED, etc.)
abandonShip();
}
}
}
private class InternalBatchDelegate implements IntentBatchDelegate {
......@@ -966,7 +1392,7 @@ public class IntentManager
log.info("Execute {} operation(s).", operations.operations().size());
log.debug("Execute operations: {}", operations.operations());
//FIXME: perhaps we want to track this task so that we can cancel it.
executor.execute(new IntentInstallMonitor(operations));
executor.execute(new IntentBatchPreprocess(operations));
}
@Override
......@@ -975,5 +1401,4 @@ public class IntentManager
log.warn("NOT IMPLEMENTED -- Cancel operations: {}", operations);
}
}
}
......