Brian O'Connor

started work to parrallize compliation and "installation"

Change-Id: I2e7f03b9f8074ef6f9e1c186009ed3cad6980b49
......@@ -82,6 +82,7 @@ public interface IntentStore extends Store<IntentEvent, IntentStoreDelegate> {
*/
@Deprecated
List<Operation> batchWrite(BatchWrite batch);
default void write(IntentData newData) {}
default void batchWrite(Iterable<IntentData> updates) {}
......
......@@ -22,6 +22,7 @@ import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
......@@ -66,6 +67,7 @@ import com.google.common.collect.Lists;
import static com.google.common.base.Preconditions.checkNotNull;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static org.onlab.util.Tools.namedThreads;
import static org.onosproject.net.intent.IntentState.FAILED;
import static org.onosproject.net.intent.IntentState.INSTALL_REQ;
......@@ -115,7 +117,8 @@ public class IntentManager
protected FlowRuleService flowRuleService;
private ExecutorService executor;
private ExecutorService batchExecutor;
private ExecutorService workerExecutor;
private final IntentStoreDelegate delegate = new InternalStoreDelegate();
private final TopologyChangeDelegate topoDelegate = new InternalTopoChangeDelegate();
......@@ -129,7 +132,8 @@ public class IntentManager
store.setDelegate(delegate);
trackerService.setDelegate(topoDelegate);
eventDispatcher.addSink(IntentEvent.class, listenerRegistry);
executor = newFixedThreadPool(NUM_THREADS, namedThreads("onos-intent-%d"));
batchExecutor = newSingleThreadExecutor(namedThreads("onos-intent-batch"));
workerExecutor = newFixedThreadPool(NUM_THREADS, namedThreads("onos-intent-worker-%d"));
idGenerator = coreService.getIdGenerator("intent-ids");
Intent.bindIdGenerator(idGenerator);
log.info("Started");
......@@ -140,7 +144,7 @@ public class IntentManager
store.unsetDelegate(delegate);
trackerService.unsetDelegate(topoDelegate);
eventDispatcher.removeSink(IntentEvent.class);
executor.shutdown();
batchExecutor.shutdown();
Intent.unbindIdGenerator(idGenerator);
log.info("Stopped");
}
......@@ -472,9 +476,15 @@ public class IntentManager
@Override
public void run() {
try {
List<IntentUpdate> updates = createIntentUpdates();
// 1. wrap each intentdata in a runnable and submit
List<Future<IntentUpdate>> updates = createIntentUpdates();
// TODO
// 2. wait for completion of all the work
// 3. accumulate results and submit batch write of IntentData to store
// (we can also try to update these individually)
new IntentBatchApplyFirst(ops, processIntentUpdates(updates), endTime, 0, null).run();
//new IntentBatchApplyFirst(ops, processIntentUpdates(updates), endTime, 0, null).run();
} catch (Exception e) {
log.error("Error submitting batches:", e);
// FIXME incomplete Intents should be cleaned up
......@@ -488,33 +498,40 @@ public class IntentManager
}
}
private List<IntentUpdate> createIntentUpdates() {
private List<Future<IntentUpdate>> createIntentUpdates() {
return ops.stream()
.map(IntentManager.this::createIntentUpdate)
.collect(Collectors.toList());
}
private List<CompletedIntentUpdate> processIntentUpdates(List<IntentUpdate> updates) {
// start processing each Intents
List<CompletedIntentUpdate> completed = new ArrayList<>();
for (IntentUpdate update : updates) {
Optional<IntentUpdate> phase = Optional.of(update);
IntentUpdate previous = update;
while (true) {
if (!phase.isPresent()) {
// FIXME: not type safe cast
completed.add((CompletedIntentUpdate) previous);
break;
}
previous = phase.get();
phase = previous.execute();
}
}
.map(IntentManager.this::submitIntentData)
.collect(Collectors.toList());
}
}
private Future<IntentUpdate> submitIntentData(IntentData data) {
return workerExecutor.submit(new IntentWorker(data));
}
return completed;
private class IntentWorker implements Callable<IntentUpdate> {
private final IntentData data;
private IntentWorker(IntentData data) {
this.data = data;
}
@Override
public IntentUpdate call() throws Exception {
IntentUpdate update = createIntentUpdate(data);
Optional<IntentUpdate> currentPhase = Optional.of(update);
IntentUpdate previousPhase = update;
while (currentPhase.isPresent()) {
previousPhase = currentPhase.get();
currentPhase = previousPhase.execute();
}
return previousPhase;
}
}
// TODO: better naming
private class IntentBatchApplyFirst extends IntentBatchPreprocess {
......@@ -596,7 +613,7 @@ public class IntentManager
retry();
} else {
// we are not done yet, yield the thread by resubmitting ourselves
executor.submit(new IntentBatchProcessFutures(ops, intentUpdates, endTime, installAttempt, future));
batchExecutor.submit(new IntentBatchProcessFutures(ops, intentUpdates, endTime, installAttempt, future));
}
} catch (Exception e) {
log.error("Error submitting batches:", e);
......@@ -665,7 +682,7 @@ public class IntentManager
return;
}
Future<CompletedBatchOperation> future = applyNextBatch(intentUpdates);
executor.submit(new IntentBatchProcessFutures(ops, intentUpdates, timeLimit, attempts, future));
batchExecutor.submit(new IntentBatchProcessFutures(ops, intentUpdates, timeLimit, attempts, future));
} else {
log.error("Cancelling FlowRuleBatch failed.");
abandonShip();
......@@ -678,7 +695,8 @@ public class IntentManager
public void execute(Collection<IntentData> operations) {
log.info("Execute {} operation(s).", operations.size());
log.debug("Execute operations: {}", operations);
executor.execute(new IntentBatchPreprocess(operations));
batchExecutor.execute(new IntentBatchPreprocess(operations));
// TODO ensure that only one batch is in flight at a time
}
}
}
......
......@@ -38,4 +38,5 @@ public interface Accumulator<T> {
*/
void processEvents(List<T> events);
//TODO consider a blocking version that required consumer participation
}
......