Brian O'Connor

Updates to Intent Manager and store interface

Change-Id: Ida612bf5d0f4abe7e81d2f307a80f989603015e7
......@@ -15,10 +15,11 @@
*/
package org.onosproject.net.intent;
import java.util.Collection;
/**
* Facade for receiving notifications from the intent batch service.
*/
@Deprecated
public interface IntentBatchDelegate {
/**
......@@ -26,12 +27,6 @@ public interface IntentBatchDelegate {
*
* @param operations batch of operations
*/
void execute(IntentOperations operations);
void execute(Collection<IntentData> operations);
/**
* Cancesl the specified batch of intent operations.
*
* @param operations batch of operations to be cancelled
*/
void cancel(IntentOperations operations);
}
......
......@@ -45,6 +45,7 @@ public interface IntentStore extends Store<IntentEvent, IntentStoreDelegate> {
* @param intentId intent identification
* @return intent or null if not found
*/
@Deprecated
Intent getIntent(IntentId intentId);
/**
......@@ -53,6 +54,7 @@ public interface IntentStore extends Store<IntentEvent, IntentStoreDelegate> {
* @param intentId intent identification
* @return current intent state
*/
@Deprecated
IntentState getIntentState(IntentId intentId);
/**
......@@ -62,6 +64,7 @@ public interface IntentStore extends Store<IntentEvent, IntentStoreDelegate> {
* @param intentId original intent identifier
* @return compiled installable intents
*/
@Deprecated
List<Intent> getInstallableIntents(IntentId intentId);
/**
......@@ -74,6 +77,26 @@ public interface IntentStore extends Store<IntentEvent, IntentStoreDelegate> {
List<Operation> batchWrite(BatchWrite batch);
/**
* Returns the intent with the specified identifier.
*
* @param key key
* @return intent or null if not found
*/
default Intent getIntent(String key) { //FIXME remove when impl.
return null;
}
/**
* Returns the intent data object associated with the specified key.
*
* @param key key to look up
* @return intent data object
*/
default IntentData getIntentData(String key) { //FIXME remove when impl.
return null;
}
/**
* Adds a new operation, which should be persisted and delegated.
*
* @param intent operation
......
......@@ -17,6 +17,7 @@ package org.onosproject.net.intent.impl;
import com.google.common.collect.Maps;
import org.onlab.util.AbstractAccumulator;
import org.onosproject.net.intent.IntentBatchDelegate;
import org.onosproject.net.intent.IntentData;
import java.util.List;
......@@ -37,16 +38,20 @@ public class IntentAccumulator extends AbstractAccumulator<IntentData> {
// TODO: Convert to use HashedWheelTimer or produce a variant of that; then decide which we want to adopt
private static final Timer TIMER = new Timer("intent-op-batching");
private final IntentBatchDelegate delegate;
/**
* Creates an intent operation accumulator.
*/
protected IntentAccumulator() {
protected IntentAccumulator(IntentBatchDelegate delegate) {
super(TIMER, DEFAULT_MAX_EVENTS, DEFAULT_MAX_BATCH_MS, DEFAULT_MAX_IDLE_MS);
this.delegate = delegate;
}
@Override
public void processEvents(List<IntentData> ops) {
Map<String, IntentData> opMap = reduce(ops);
delegate.execute(opMap.values());
// FIXME kick off the work
//for (IntentData data : opMap.values()) {}
}
......
......@@ -33,9 +33,9 @@ import org.onosproject.event.EventDeliveryService;
import org.onosproject.net.flow.CompletedBatchOperation;
import org.onosproject.net.flow.FlowRuleBatchOperation;
import org.onosproject.net.flow.FlowRuleService;
import org.onosproject.net.intent.BatchWrite;
import org.onosproject.net.intent.Intent;
import org.onosproject.net.intent.IntentBatchDelegate;
import org.onosproject.net.intent.IntentBatchService;
import org.onosproject.net.intent.IntentCompiler;
import org.onosproject.net.intent.IntentData;
import org.onosproject.net.intent.IntentEvent;
......@@ -49,11 +49,11 @@ import org.onosproject.net.intent.IntentOperations;
import org.onosproject.net.intent.IntentService;
import org.onosproject.net.intent.IntentState;
import org.onosproject.net.intent.IntentStore;
import org.onosproject.net.intent.BatchWrite;
import org.onosproject.net.intent.IntentStoreDelegate;
import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.LinkedList;
......@@ -72,8 +72,8 @@ import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static org.onosproject.net.intent.IntentState.*;
import static org.onlab.util.Tools.namedThreads;
import static org.onosproject.net.intent.IntentState.*;
import static org.slf4j.LoggerFactory.getLogger;
/**
......@@ -110,9 +110,6 @@ public class IntentManager
protected IntentStore store;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected IntentBatchService batchService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ObjectiveTrackerService trackerService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
......@@ -129,13 +126,12 @@ public class IntentManager
private final IntentBatchDelegate batchDelegate = new InternalBatchDelegate();
private IdGenerator idGenerator;
private final IntentAccumulator accumulator = new IntentAccumulator();
private final IntentAccumulator accumulator = new IntentAccumulator(batchDelegate);
@Activate
public void activate() {
store.setDelegate(delegate);
trackerService.setDelegate(topoDelegate);
batchService.setDelegate(batchDelegate);
eventDispatcher.addSink(IntentEvent.class, listenerRegistry);
executor = newFixedThreadPool(NUM_THREADS, namedThreads("onos-intent-%d"));
idGenerator = coreService.getIdGenerator("intent-ids");
......@@ -147,7 +143,6 @@ public class IntentManager
public void deactivate() {
store.unsetDelegate(delegate);
trackerService.unsetDelegate(topoDelegate);
batchService.unsetDelegate(batchDelegate);
eventDispatcher.removeSink(IntentEvent.class);
executor.shutdown();
Intent.unbindIdGenerator(idGenerator);
......@@ -435,11 +430,12 @@ public class IntentManager
}
}
for (ApplicationId appId : batches.keySet()) {
if (batchService.isLocalLeader(appId)) {
execute(batches.get(appId).build());
}
}
//FIXME
// for (ApplicationId appId : batches.keySet()) {
// if (batchService.isLocalLeader(appId)) {
// execute(batches.get(appId).build());
// }
// }
}
// Topology change delegate
......@@ -452,48 +448,21 @@ public class IntentManager
}
// TODO: simplify the branching statements
private IntentUpdate createIntentUpdate(IntentOperation operation) {
switch (operation.type()) {
case SUBMIT:
return new InstallRequest(operation.intent());
case WITHDRAW: {
Intent oldIntent = store.getIntent(operation.intentId());
if (oldIntent == null) {
return new DoNothing();
}
List<Intent> installables = store.getInstallableIntents(oldIntent.id());
if (installables == null) {
return new WithdrawStateChange1(oldIntent);
}
return new WithdrawRequest(oldIntent, installables);
}
case REPLACE: {
Intent newIntent = operation.intent();
Intent oldIntent = store.getIntent(operation.intentId());
if (oldIntent == null) {
return new InstallRequest(newIntent);
}
List<Intent> installables = store.getInstallableIntents(oldIntent.id());
if (installables == null) {
if (newIntent.equals(oldIntent)) {
return new InstallRequest(newIntent);
} else {
return new WithdrawStateChange2(oldIntent);
}
}
return new ReplaceRequest(newIntent, oldIntent, installables);
}
case UPDATE: {
Intent oldIntent = store.getIntent(operation.intentId());
if (oldIntent == null) {
return new DoNothing();
}
List<Intent> installables = getInstallableIntents(oldIntent.id());
if (installables == null) {
return new InstallRequest(oldIntent);
}
return new ReplaceRequest(oldIntent, oldIntent, installables);
}
private IntentUpdate createIntentUpdate(IntentData intentData) {
IntentData currentState = store.getIntentData(intentData.key());
switch (intentData.state()) {
case INSTALL_REQ:
return new InstallRequest(intentData.intent(), currentState);
case WITHDRAW_REQ:
return new WithdrawRequest(intentData.intent(), currentState);
// fallthrough
case COMPILING:
case INSTALLING:
case INSTALLED:
case RECOMPILING:
case WITHDRAWING:
case WITHDRAWN:
case FAILED:
default:
// illegal state
return new DoNothing();
......@@ -504,9 +473,11 @@ public class IntentManager
private class InstallRequest implements IntentUpdate {
private final Intent intent;
private final IntentData currentState;
InstallRequest(Intent intent) {
InstallRequest(Intent intent, IntentData currentState) {
this.intent = checkNotNull(intent);
this.currentState = currentState;
}
@Override
......@@ -518,18 +489,18 @@ public class IntentManager
@Override
public Optional<IntentUpdate> execute() {
return Optional.of(new Compiling(intent));
return Optional.of(new Compiling(intent)); //FIXME
}
}
private class WithdrawRequest implements IntentUpdate {
private final Intent intent;
private final List<Intent> installables;
private final IntentData currentState;
WithdrawRequest(Intent intent, List<Intent> installables) {
WithdrawRequest(Intent intent, IntentData currentState) {
this.intent = checkNotNull(intent);
this.installables = ImmutableList.copyOf(checkNotNull(installables));
this.currentState = currentState;
}
@Override
......@@ -539,7 +510,7 @@ public class IntentManager
@Override
public Optional<IntentUpdate> execute() {
return Optional.of(new Withdrawing(intent, installables));
return Optional.of(new Withdrawing(intent, currentState.installables())); //FIXME
}
}
......@@ -1052,24 +1023,24 @@ public class IntentManager
private static final int TIMEOUT_PER_OP = 500; // ms
protected static final int MAX_ATTEMPTS = 3;
protected final IntentOperations ops;
protected final Collection<IntentData> ops;
// future holding current FlowRuleBatch installation result
protected final long startTime = System.currentTimeMillis();
protected final long endTime;
private IntentBatchPreprocess(IntentOperations ops, long endTime) {
private IntentBatchPreprocess(Collection<IntentData> ops, long endTime) {
this.ops = checkNotNull(ops);
this.endTime = endTime;
}
public IntentBatchPreprocess(IntentOperations ops) {
this(ops, System.currentTimeMillis() + ops.operations().size() * TIMEOUT_PER_OP);
public IntentBatchPreprocess(Collection<IntentData> ops) {
this(ops, System.currentTimeMillis() + ops.size() * TIMEOUT_PER_OP);
}
// FIXME compute reasonable timeouts
protected long calculateTimeoutLimit() {
return System.currentTimeMillis() + ops.operations().size() * TIMEOUT_PER_OP;
return System.currentTimeMillis() + ops.size() * TIMEOUT_PER_OP;
}
@Override
......@@ -1099,12 +1070,13 @@ public class IntentManager
// the batch has failed
// TODO: maybe we should do more?
log.error("Walk the plank, matey...");
batchService.removeIntentOperations(ops);
//FIXME
// batchService.removeIntentOperations(ops);
}
}
private List<IntentUpdate> createIntentUpdates() {
return ops.operations().stream()
return ops.stream()
.map(IntentManager.this::createIntentUpdate)
.collect(Collectors.toList());
}
......@@ -1143,7 +1115,7 @@ public class IntentManager
protected final int installAttempt;
protected Future<CompletedBatchOperation> future;
IntentBatchApplyFirst(IntentOperations operations, List<CompletedIntentUpdate> intentUpdates,
IntentBatchApplyFirst(Collection<IntentData> operations, List<CompletedIntentUpdate> intentUpdates,
long endTime, int installAttempt, Future<CompletedBatchOperation> future) {
super(operations, endTime);
this.intentUpdates = ImmutableList.copyOf(intentUpdates);
......@@ -1202,14 +1174,15 @@ public class IntentManager
// TODO: maybe we should do more?
log.error("Walk the plank, matey...");
future = null;
batchService.removeIntentOperations(ops);
//FIXME
// batchService.removeIntentOperations(ops);
}
}
// TODO: better naming
private class IntentBatchProcessFutures extends IntentBatchApplyFirst {
IntentBatchProcessFutures(IntentOperations operations, List<CompletedIntentUpdate> intentUpdates,
IntentBatchProcessFutures(Collection<IntentData> operations, List<CompletedIntentUpdate> intentUpdates,
long endTime, int installAttempt, Future<CompletedBatchOperation> future) {
super(operations, intentUpdates, endTime, installAttempt, future);
}
......@@ -1228,7 +1201,9 @@ public class IntentManager
Future<CompletedBatchOperation> future = processFutures();
if (future == null) {
// there are no outstanding batches; we are done
batchService.removeIntentOperations(ops);
//FIXME
return; //?
// batchService.removeIntentOperations(ops);
} else if (System.currentTimeMillis() > endTime) {
// - cancel current FlowRuleBatch and resubmit again
retry();
......@@ -1317,17 +1292,10 @@ public class IntentManager
private class InternalBatchDelegate implements IntentBatchDelegate {
@Override
public void execute(IntentOperations operations) {
log.info("Execute {} operation(s).", operations.operations().size());
log.debug("Execute operations: {}", operations.operations());
//FIXME: perhaps we want to track this task so that we can cancel it.
public void execute(Collection<IntentData> operations) {
log.info("Execute {} operation(s).", operations.size());
log.debug("Execute operations: {}", operations);
executor.execute(new IntentBatchPreprocess(operations));
}
@Override
public void cancel(IntentOperations operations) {
//FIXME: implement this
log.warn("NOT IMPLEMENTED -- Cancel operations: {}", operations);
}
}
}
......
......@@ -15,15 +15,11 @@
*/
package org.onosproject.net.intent.impl;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import org.hamcrest.Description;
import org.hamcrest.TypeSafeMatcher;
import org.junit.After;
......@@ -52,21 +48,20 @@ import org.onosproject.net.intent.IntentTestsMocks;
import org.onosproject.net.resource.LinkResourceAllocations;
import org.onosproject.store.trivial.impl.SimpleIntentStore;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.*;
import static org.onlab.util.Tools.delay;
import static org.onosproject.net.intent.IntentState.FAILED;
import static org.onosproject.net.intent.IntentState.INSTALLED;
import static org.onosproject.net.intent.IntentState.WITHDRAWN;
import static org.onosproject.net.intent.IntentState.*;
/**
* Test intent manager and transitions.
......@@ -305,14 +300,16 @@ public class IntentManagerTest {
}
//the batch has not yet been removed when we receive the last event
// FIXME: this doesn't guarantee to avoid the race
for (int tries = 0; tries < 10; tries++) {
if (manager.batchService.getPendingOperations().isEmpty()) {
break;
}
delay(10);
}
assertTrue("There are still pending batch operations.",
manager.batchService.getPendingOperations().isEmpty());
//FIXME
// for (int tries = 0; tries < 10; tries++) {
// if (manager.batchService.getPendingOperations().isEmpty()) {
// break;
// }
// delay(10);
// }
// assertTrue("There are still pending batch operations.",
// manager.batchService.getPendingOperations().isEmpty());
extensionService.unregisterCompiler(MockIntent.class);
extensionService.unregisterInstaller(MockInstallableIntent.class);
......
......@@ -15,8 +15,8 @@
*/
package org.onosproject.store.trivial.impl;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
......@@ -36,10 +36,10 @@ import org.slf4j.Logger;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.*;
import static org.onosproject.net.intent.IntentState.WITHDRAWN;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.slf4j.LoggerFactory.getLogger;
@Component(immediate = true)
......@@ -51,11 +51,8 @@ public class SimpleIntentStore
private final Logger log = getLogger(getClass());
// current state maps FIXME.. make this a IntentData map
private final Map<IntentId, Intent> intents = new ConcurrentHashMap<>();
private final Map<IntentId, IntentState> states = new ConcurrentHashMap<>();
private final Map<IntentId, List<Intent>> installable = new ConcurrentHashMap<>();
private final Map<String, IntentData> pending = new ConcurrentHashMap<>(); //String is "key"
private final Map<String, IntentData> current = Maps.newConcurrentMap();
private final Map<String, IntentData> pending = Maps.newConcurrentMap(); //String is "key"
@Activate
public void activate() {
......@@ -67,45 +64,32 @@ public class SimpleIntentStore
log.info("Stopped");
}
private void createIntent(Intent intent) {
if (intents.containsKey(intent.id())) {
return;
}
intents.put(intent.id(), intent);
this.setState(intent, IntentState.INSTALL_REQ);
}
private void removeIntent(IntentId intentId) {
checkState(getIntentState(intentId) == WITHDRAWN,
"Intent state for {} is not WITHDRAWN.", intentId);
intents.remove(intentId);
installable.remove(intentId);
states.remove(intentId);
}
@Override
public long getIntentCount() {
return intents.size();
return current.size();
}
@Override
public Iterable<Intent> getIntents() {
return ImmutableSet.copyOf(intents.values());
return current.values().stream()
.map(IntentData::intent)
.collect(Collectors.toList());
}
@Override
public Intent getIntent(IntentId intentId) {
return intents.get(intentId);
throw new UnsupportedOperationException("deprecated");
}
@Override
public IntentState getIntentState(IntentId id) {
return states.get(id);
throw new UnsupportedOperationException("deprecated");
}
private void setState(Intent intent, IntentState state) {
//FIXME
IntentId id = intent.id();
states.put(id, state);
// states.put(id, state);
IntentEvent.Type type = null;
switch (state) {
......@@ -133,16 +117,23 @@ public class SimpleIntentStore
}
private void setInstallableIntents(IntentId intentId, List<Intent> result) {
installable.put(intentId, result);
//FIXME
// installable.put(intentId, result);
}
@Override
public List<Intent> getInstallableIntents(IntentId intentId) {
return installable.get(intentId);
throw new UnsupportedOperationException("deprecated");
}
@Override
public IntentData getIntentData(String key) {
return current.get(key);
}
private void removeInstalledIntents(IntentId intentId) {
installable.remove(intentId);
//FIXME
// installable.remove(intentId);
}
/**
......@@ -165,14 +156,14 @@ public class SimpleIntentStore
"CREATE_INTENT takes 1 argument. %s", op);
Intent intent = (Intent) op.args().get(0);
// TODO: what if it failed?
createIntent(intent);
// createIntent(intent); FIXME
break;
case REMOVE_INTENT:
checkArgument(op.args().size() == 1,
"REMOVE_INTENT takes 1 argument. %s", op);
IntentId intentId = (IntentId) op.args().get(0);
removeIntent(intentId);
// removeIntent(intentId); FIXME
break;
case REMOVE_INSTALLED:
......