alshabib
Committed by Gerrit Code Review

fix intent issues yuta observed

Change-Id: I7dc4a19d49a1b3fc18ecce02a4018cbc9a3043fc
......@@ -14,6 +14,7 @@ public interface DemoAPI {
/**
* Installs intents based on the installation type.
* @param type the installation type.
* @param runParams run params
*/
void setup(InstallType type, Optional<JsonNode> runParams);
......
......@@ -199,6 +199,7 @@ public class IntentPushTestCommand extends AbstractShellCommand
/**
* Returns application ID for the CLI.
*
* @param id application id
* @return command-line application identifier
*/
protected ApplicationId appId(Integer id) {
......
......@@ -56,8 +56,8 @@ public interface IntentBatchService {
* Return true if this instance is the local leader for batch
* processing a given application id.
*
* @param applicationId
* @return
* @param applicationId an application id
* @return true if this instance is the local leader for batch
*/
boolean isLocalLeader(ApplicationId applicationId);
......
......@@ -67,4 +67,32 @@ public class IntentEvent extends AbstractEvent<IntentEvent.Type, Intent> {
super(type, intent);
}
public static IntentEvent getEvent(IntentState state, Intent intent) {
Type type;
switch (state) {
case SUBMITTED:
type = Type.SUBMITTED;
break;
case INSTALLED:
type = Type.INSTALLED;
break;
case WITHDRAWN:
type = Type.WITHDRAWN;
break;
case FAILED:
type = Type.FAILED;
break;
//fallthrough to default from here
case COMPILING:
case INSTALLING:
case RECOMPILING:
case WITHDRAWING:
default:
throw new IllegalArgumentException(
"Intent event cannot have transient state: " + state);
}
return new IntentEvent(type, intent);
}
}
......
......@@ -15,7 +15,6 @@
*/
package org.onlab.onos.net.intent;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import org.onlab.onos.net.intent.IntentStore.BatchWrite.Operation;
......@@ -42,16 +41,16 @@ public interface IntentStore extends Store<IntentEvent, IntentStoreDelegate> {
* mechanism.
*
* @param intent intent to be submitted
* @return event indicating the intent was submitted or null if no
* change resulted, e.g. duplicate intent
*/
IntentEvent createIntent(Intent intent);
@Deprecated
void createIntent(Intent intent);
/**
* Removes the specified intent from the inventory.
*
* @param intentId intent identification
*/
@Deprecated
void removeIntent(IntentId intentId);
/**
......@@ -89,9 +88,8 @@ public interface IntentStore extends Store<IntentEvent, IntentStoreDelegate> {
*
* @param intent intent whose state is to be changed
* @param newState new state
* @return state transition event
*/
IntentEvent setState(Intent intent, IntentState newState);
void setState(Intent intent, IntentState newState);
/**
* Sets the installable intents which resulted from compilation of the
......@@ -129,64 +127,13 @@ public interface IntentStore extends Store<IntentEvent, IntentStoreDelegate> {
return new BatchWrite();
}
// default implementation simply executes them sequentially.
// Store implementation should override and implement actual batch write.
/**
* Execute writes in a batch.
*
* @param batch BatchWrite to execute
* @return failed operations
*/
default List<Operation> batchWrite(BatchWrite batch) {
List<Operation> failed = new ArrayList<>();
for (Operation op : batch.operations) {
switch (op.type) {
case CREATE_INTENT:
checkArgument(op.args.size() == 1,
"CREATE_INTENT takes 1 argument. %s", op);
Intent intent = (Intent) op.args.get(0);
if (createIntent(intent) == null) {
failed.add(op);
}
break;
case REMOVE_INTENT:
checkArgument(op.args.size() == 1,
"REMOVE_INTENT takes 1 argument. %s", op);
IntentId intentId = (IntentId) op.args.get(0);
removeIntent(intentId);
break;
case REMOVE_INSTALLED:
checkArgument(op.args.size() == 1,
"REMOVE_INSTALLED takes 1 argument. %s", op);
intentId = (IntentId) op.args.get(0);
removeInstalledIntents(intentId);
break;
case SET_INSTALLABLE:
checkArgument(op.args.size() == 2,
"SET_INSTALLABLE takes 2 arguments. %s", op);
intentId = (IntentId) op.args.get(0);
@SuppressWarnings("unchecked")
List<Intent> installableIntents = (List<Intent>) op.args.get(1);
setInstallableIntents(intentId, installableIntents);
break;
case SET_STATE:
checkArgument(op.args.size() == 2,
"SET_STATE takes 2 arguments. %s", op);
intent = (Intent) op.args.get(0);
IntentState newState = (IntentState) op.args.get(1);
setState(intent, newState);
break;
default:
break;
}
}
return failed;
}
List<Operation> batchWrite(BatchWrite batch);
public static class BatchWrite {
......
......@@ -128,7 +128,7 @@ public class IntentManager
trackerService.setDelegate(topoDelegate);
batchService.setDelegate(batchDelegate);
eventDispatcher.addSink(IntentEvent.class, listenerRegistry);
executor = newFixedThreadPool(NUM_THREADS, namedThreads("onos-intent-monitor"));
executor = newFixedThreadPool(NUM_THREADS, namedThreads("onos-intent"));
idGenerator = coreService.getIdGenerator("intent-ids");
Intent.bindIdGenerator(idGenerator);
log.info("Started");
......@@ -646,12 +646,11 @@ public class IntentManager
return !isComplete() ? batches.get(currentBatch) : null;
}
List<IntentEvent> batchSuccess(BatchWrite batchWrite) {
void batchSuccess(BatchWrite batchWrite) {
// move on to next Batch
if (++currentBatch == batches.size()) {
return finalizeStates(batchWrite);
finalizeStates(batchWrite);
}
return Collections.emptyList();
}
void batchFailed() {
......@@ -673,19 +672,16 @@ public class IntentManager
}
// FIXME make sure this is called!!!
private List<IntentEvent> finalizeStates(BatchWrite batchWrite) {
private void finalizeStates(BatchWrite batchWrite) {
// events to be triggered on successful write
List<IntentEvent> events = new ArrayList<>();
for (Intent intent : stateMap.keySet()) {
switch (getInflightState(intent)) {
case INSTALLING:
batchWrite.setState(intent, INSTALLED);
batchWrite.setInstallableIntents(newIntent.id(), newInstallables);
events.add(new IntentEvent(Type.INSTALLED, intent));
break;
case WITHDRAWING:
batchWrite.setState(intent, WITHDRAWN);
events.add(new IntentEvent(Type.WITHDRAWN, intent));
batchWrite.removeInstalledIntents(intent.id());
batchWrite.removeIntent(intent.id());
break;
......@@ -705,7 +701,6 @@ public class IntentManager
break;
}
}
return events;
}
List<FlowRuleBatchOperation> batches() {
......@@ -737,10 +732,10 @@ public class IntentManager
intent.id(), oldState, newState);
stateMap.put(intent, newState);
IntentEvent event = store.setState(intent, newState);
if (event != null) {
eventDispatcher.post(event);
}
// IntentEvent event = store.setState(intent, newState);
// if (event != null) {
// eventDispatcher.post(event);
// }
}
Map<Intent, IntentState> stateMap() {
......@@ -822,7 +817,7 @@ public class IntentManager
BatchWrite batchWrite = store.newBatchWrite();
List<IntentEvent> events = new ArrayList<>();
for (IntentUpdate update : intentUpdates) {
events.addAll(update.batchSuccess(batchWrite));
update.batchSuccess(batchWrite);
}
if (!batchWrite.isEmpty()) {
store.batchWrite(batchWrite);
......
......@@ -5,6 +5,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import org.hamcrest.Description;
import org.hamcrest.Matchers;
import org.hamcrest.TypeSafeMatcher;
......@@ -40,6 +41,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import static org.hamcrest.Matchers.equalTo;
......@@ -229,7 +231,8 @@ public class IntentManagerTest {
public void await(IntentEvent.Type type) {
try {
latchMap.get(type).await();
assertTrue("Timed out waiting for: " + type,
latchMap.get(type).await(5, TimeUnit.SECONDS));
} catch (InterruptedException e) {
e.printStackTrace();
}
......
......@@ -18,6 +18,7 @@ package org.onlab.onos.store.hz;
import com.google.common.base.Function;
import com.google.common.collect.FluentIterable;
import com.hazelcast.core.IQueue;
import com.hazelcast.core.ItemEvent;
import com.hazelcast.core.ItemListener;
import com.hazelcast.monitor.LocalQueueStats;
......@@ -201,16 +202,39 @@ public class SQueue<T> implements IQueue<T> {
return q.getLocalQueueStats();
}
@Deprecated // not implemented yet
@Override
public String addItemListener(ItemListener<T> itemListener, boolean b) {
throw new UnsupportedOperationException();
public String addItemListener(ItemListener<T> itemListener, boolean withValue) {
ItemListener<byte[]> il = new ItemListener<byte[]>() {
@Override
public void itemAdded(ItemEvent<byte[]> item) {
itemListener.itemAdded(new ItemEvent<T>(getName(item),
item.getEventType(),
deserialize(item.getItem()),
item.getMember()));
}
@Override
public void itemRemoved(ItemEvent<byte[]> item) {
itemListener.itemRemoved(new ItemEvent<T>(getName(item),
item.getEventType(),
deserialize(item.getItem()),
item.getMember()));
}
private String getName(ItemEvent<byte[]> item) {
return (item.getSource() instanceof String) ?
(String) item.getSource() : item.getSource().toString();
}
};
return q.addItemListener(il, withValue);
}
@Deprecated // not implemented yet
@Override
public boolean removeItemListener(String s) {
throw new UnsupportedOperationException();
public boolean removeItemListener(String registrationId) {
return q.removeItemListener(registrationId);
}
@Deprecated
......
......@@ -23,6 +23,7 @@ import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
......@@ -187,15 +188,16 @@ public class DistributedIntentStore
}
@Override
public IntentEvent createIntent(Intent intent) {
public void createIntent(Intent intent) {
Context timer = startTimer(createIntentTimer);
try {
boolean absent = intents.putIfAbsent(intent.id(), intent);
if (!absent) {
// duplicate, ignore
return null;
return;
} else {
return this.setState(intent, IntentState.SUBMITTED);
this.setState(intent, IntentState.SUBMITTED);
return;
}
} finally {
stopTimer(timer);
......@@ -273,7 +275,7 @@ public class DistributedIntentStore
}
@Override
public IntentEvent setState(Intent intent, IntentState state) {
public void setState(Intent intent, IntentState state) {
Context timer = startTimer(setStateTimer);
try {
final IntentId id = intent.id();
......@@ -341,10 +343,10 @@ public class DistributedIntentStore
log.debug("Transient State change: {} {}=>{}", id, prevTransient, state);
}
if (evtType == null) {
return null;
if (evtType != null) {
notifyDelegate(new IntentEvent(evtType, intent));
}
return new IntentEvent(evtType, intent);
return;
} finally {
stopTimer(timer);
}
......@@ -417,6 +419,7 @@ public class DistributedIntentStore
List<Operation> failed = new ArrayList<>();
final Builder builder = BatchWriteRequest.newBuilder();
List<IntentEvent> events = Lists.newArrayList();
final Set<IntentId> transitionedToParking = new HashSet<>();
......@@ -428,6 +431,7 @@ public class DistributedIntentStore
Intent intent = op.arg(0);
builder.putIfAbsent(INTENTS_TABLE, strIntentId(intent.id()), serializer.encode(intent));
builder.putIfAbsent(STATES_TABLE, strIntentId(intent.id()), serializer.encode(SUBMITTED));
events.add(IntentEvent.getEvent(SUBMITTED, intent));
break;
case REMOVE_INTENT:
......@@ -450,6 +454,7 @@ public class DistributedIntentStore
} else {
transitionedToParking.remove(intent.id());
}
events.add(IntentEvent.getEvent(newState, intent));
break;
case SET_INSTALLABLE:
......@@ -478,9 +483,11 @@ public class DistributedIntentStore
if (batchWriteResult.isSuccessful()) {
// no-failure (except for invalid input)
transitionedToParking.forEach((intentId) -> transientStates.remove(intentId));
notifyDelegate(events);
return failed;
} else {
// everything failed
// FIXME what to do with events?
return batch.operations();
}
}
......
......@@ -19,6 +19,9 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IQueue;
import com.hazelcast.core.ItemEvent;
import com.hazelcast.core.ItemListener;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
......@@ -47,7 +50,6 @@ import java.util.Collections;
import java.util.Map;
import java.util.Set;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static org.slf4j.LoggerFactory.getLogger;
......@@ -107,6 +109,9 @@ public class HazelcastIntentBatchQueue
@Deactivate
public void deactivate() {
leadershipService.removeListener(leaderListener);
for (ApplicationId appId: batchQueues.keySet()) {
leadershipService.withdraw(getTopic(appId));
}
log.info("Stopped");
}
......@@ -125,12 +130,11 @@ public class HazelcastIntentBatchQueue
SQueue<IntentOperations> queue = batchQueues.get(appId);
if (queue == null) {
synchronized (this) {
// FIXME how will other instances find out about new queues
String topic = getTopic(appId);
IQueue<byte[]> rawQueue = theInstance.getQueue(topic);
queue = new SQueue<>(rawQueue, serializer);
queue.addItemListener(new InternalItemListener(appId), false);
batchQueues.putIfAbsent(appId, queue);
// TODO others should run for leadership when they hear about this topic
leadershipService.runForLeadership(topic);
}
}
......@@ -209,6 +213,25 @@ public class HazelcastIntentBatchQueue
}
}
private class InternalItemListener implements ItemListener<IntentOperations> {
private final ApplicationId appId;
public InternalItemListener(ApplicationId appId) {
this.appId = appId;
}
@Override
public void itemAdded(ItemEvent<IntentOperations> item) {
dispatchNextOperation(appId);
}
@Override
public void itemRemoved(ItemEvent<IntentOperations> item) {
// no-op
}
}
private class InternalLeaderListener implements LeadershipEventListener {
@Override
public void event(LeadershipEvent event) {
......@@ -220,6 +243,8 @@ public class HazelcastIntentBatchQueue
return; // Not our topic: ignore
}
if (!event.subject().leader().id().equals(localControllerNode.id())) {
// run for leadership
getQueue(getAppId(topic));
return; // The event is not about this instance: ignore
}
......
......@@ -20,6 +20,7 @@ import com.codahale.metrics.Timer.Context;
import com.google.common.base.Verify;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.hazelcast.core.EntryAdapter;
import com.hazelcast.core.EntryEvent;
import com.hazelcast.core.EntryListener;
......@@ -171,15 +172,16 @@ public class HazelcastIntentStore
}
@Override
public IntentEvent createIntent(Intent intent) {
public void createIntent(Intent intent) {
Context timer = startTimer(createIntentTimer);
try {
Intent existing = intents.putIfAbsent(intent.id(), intent);
if (existing != null) {
// duplicate, ignore
return null;
return;
} else {
return this.setState(intent, IntentState.SUBMITTED);
this.setState(intent, IntentState.SUBMITTED);
return;
}
} finally {
stopTimer(timer);
......@@ -256,7 +258,7 @@ public class HazelcastIntentStore
}
@Override
public IntentEvent setState(Intent intent, IntentState state) {
public void setState(Intent intent, IntentState state) {
Context timer = startTimer(setStateTimer);
try {
......@@ -311,10 +313,10 @@ public class HazelcastIntentStore
final IntentState prevTransient = transientStates.put(id, state);
log.debug("Transient State change: {} {}=>{}", id, prevTransient, state);
if (type == null) {
return null;
if (type != null) {
notifyDelegate(new IntentEvent(type, intent));
}
return new IntentEvent(type, intent);
return;
} finally {
stopTimer(timer);
}
......@@ -358,6 +360,7 @@ public class HazelcastIntentStore
List<Operation> failed = new ArrayList<>();
List<Pair<Operation, List<Future<?>>>> futures = new ArrayList<>(batch.operations().size());
List<IntentEvent> events = Lists.newArrayList();
for (Operation op : batch.operations()) {
switch (op.type()) {
......@@ -434,6 +437,7 @@ public class HazelcastIntentStore
prevIntent, prevIntentState,
intent, newIntentState);
}
events.add(IntentEvent.getEvent(SUBMITTED, intent));
} catch (InterruptedException e) {
log.error("Batch write was interrupted while processing {}", op, e);
failed.add(op);
......@@ -487,6 +491,8 @@ public class HazelcastIntentStore
if (PARKING.contains(newState)) {
transientStates.remove(intentId);
}
events.add(IntentEvent.getEvent(newState, intent));
log.trace("{} - {} -> {}", intentId, prevIntentState, newState);
// TODO sanity check and log?
} catch (InterruptedException e) {
......@@ -554,6 +560,9 @@ public class HazelcastIntentStore
break;
}
}
notifyDelegate(events);
return failed;
}
......@@ -571,6 +580,8 @@ public class HazelcastIntentStore
log.debug("{} state updated remotely, removing transient state {}",
intentId, oldState);
}
notifyDelegate(IntentEvent.getEvent(event.getValue(), getIntent(intentId)));
}
}
}
......
......@@ -16,6 +16,8 @@
package org.onlab.onos.store.trivial.impl;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
......@@ -26,6 +28,7 @@ import org.onlab.onos.net.intent.IntentId;
import org.onlab.onos.net.intent.IntentState;
import org.onlab.onos.net.intent.IntentStore;
import org.onlab.onos.net.intent.IntentStoreDelegate;
import org.onlab.onos.net.intent.IntentStore.BatchWrite.Operation;
import org.onlab.onos.store.AbstractStore;
import org.slf4j.Logger;
......@@ -33,6 +36,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static org.onlab.onos.net.intent.IntentState.WITHDRAWN;
import static org.slf4j.LoggerFactory.getLogger;
......@@ -60,12 +64,13 @@ public class SimpleIntentStore
}
@Override
public IntentEvent createIntent(Intent intent) {
public void createIntent(Intent intent) {
if (intents.containsKey(intent.id())) {
return null;
return;
}
intents.put(intent.id(), intent);
return this.setState(intent, IntentState.SUBMITTED);
this.setState(intent, IntentState.SUBMITTED);
return;
}
@Override
......@@ -98,7 +103,7 @@ public class SimpleIntentStore
}
@Override
public IntentEvent setState(Intent intent, IntentState state) {
public void setState(Intent intent, IntentState state) {
IntentId id = intent.id();
states.put(id, state);
IntentEvent.Type type = null;
......@@ -119,10 +124,9 @@ public class SimpleIntentStore
default:
break;
}
if (type == null) {
return null;
if (type != null) {
notifyDelegate(new IntentEvent(type, intent));
}
return new IntentEvent(type, intent);
}
@Override
......@@ -139,5 +143,60 @@ public class SimpleIntentStore
public void removeInstalledIntents(IntentId intentId) {
installable.remove(intentId);
}
/**
* Execute writes in a batch.
*
* @param batch BatchWrite to execute
* @return failed operations
*/
@Override
public List<Operation> batchWrite(BatchWrite batch) {
List<Operation> failed = Lists.newArrayList();
for (Operation op : batch.operations()) {
switch (op.type()) {
case CREATE_INTENT:
checkArgument(op.args().size() == 1,
"CREATE_INTENT takes 1 argument. %s", op);
Intent intent = (Intent) op.args().get(0);
// TODO: what if it failed?
createIntent(intent);
break;
case REMOVE_INTENT:
checkArgument(op.args().size() == 1,
"REMOVE_INTENT takes 1 argument. %s", op);
IntentId intentId = (IntentId) op.args().get(0);
removeIntent(intentId);
break;
case REMOVE_INSTALLED:
checkArgument(op.args().size() == 1,
"REMOVE_INSTALLED takes 1 argument. %s", op);
intentId = (IntentId) op.args().get(0);
removeInstalledIntents(intentId);
break;
case SET_INSTALLABLE:
checkArgument(op.args().size() == 2,
"SET_INSTALLABLE takes 2 arguments. %s", op);
intentId = (IntentId) op.args().get(0);
@SuppressWarnings("unchecked")
List<Intent> installableIntents = (List<Intent>) op.args().get(1);
setInstallableIntents(intentId, installableIntents);
break;
case SET_STATE:
checkArgument(op.args().size() == 2,
"SET_STATE takes 2 arguments. %s", op);
intent = (Intent) op.args().get(0);
IntentState newState = (IntentState) op.args().get(1);
setState(intent, newState);
break;
default:
break;
}
}
return failed;
}
}
......
......@@ -19,7 +19,7 @@ import org.onlab.onos.cluster.LeadershipService;
/**
* A trivial implementation of the leadership service.
* <p></p>
* <p>
* The service is not distributed, so it can assume there's a single leadership
* contender. This contender is always granted leadership whenever it asks.
*/
......