Brian O'Connor
Committed by Gerrit Code Review

Initial implementation of distributed intent batch queue

Change-Id: I7ffed03651569ade1be1e8dca905bfaf369b7e03
......@@ -308,7 +308,7 @@ public class DemoInstaller implements DemoAPI {
}
private void installIntents(List<HostPair> toInstall) {
IntentOperations.Builder builder = IntentOperations.builder();
IntentOperations.Builder builder = IntentOperations.builder(appId);
for (HostPair pair : toInstall) {
installed.add(pair);
uninstalledOrWithdrawn.remove(pair);
......@@ -318,7 +318,7 @@ public class DemoInstaller implements DemoAPI {
}
private void uninstallIntents(Collection<HostPair> toRemove) {
IntentOperations.Builder builder = IntentOperations.builder();
IntentOperations.Builder builder = IntentOperations.builder(appId);
for (HostPair pair : toRemove) {
installed.remove(pair);
uninstalledOrWithdrawn.add(pair);
......@@ -333,7 +333,7 @@ public class DemoInstaller implements DemoAPI {
private void cleanUp() {
List<HostPair> allPairs = Lists.newArrayList(installed);
allPairs.addAll(uninstalledOrWithdrawn);
IntentOperations.Builder builder = IntentOperations.builder();
IntentOperations.Builder builder = IntentOperations.builder(appId);
for (HostPair pair : allPairs) {
builder.addWithdrawOperation(pair.h2hIntent().id());
}
......
......@@ -129,7 +129,7 @@ public class OpticalPathProvisioner {
}
// Build the intent batch
IntentOperations.Builder ops = IntentOperations.builder();
IntentOperations.Builder ops = IntentOperations.builder(appId);
for (Intent i : intents) {
// TODO: don't allow duplicate intents between the same points for now
// we may want to allow this carefully in future to increase capacity
......
......@@ -221,7 +221,7 @@ public class IntentSynchronizer {
// Push the intents
if (isElectedLeader && isActivatedLeader) {
log.debug("SDN-IP Submitting all Peer Intents...");
IntentOperations.Builder builder = IntentOperations.builder();
IntentOperations.Builder builder = IntentOperations.builder(appId);
for (Intent intent : intents) {
builder.addSubmitOperation(intent);
}
......@@ -370,7 +370,7 @@ public class IntentSynchronizer {
}
// Withdraw Intents
IntentOperations.Builder builder = IntentOperations.builder();
IntentOperations.Builder builder = IntentOperations.builder(appId);
for (Intent intent : deleteIntents) {
builder.addWithdrawOperation(intent.id());
log.debug("SDN-IP Intent Synchronizer: withdrawing intent: {}",
......@@ -386,7 +386,7 @@ public class IntentSynchronizer {
intentService.execute(intentOperations);
// Add Intents
builder = IntentOperations.builder();
builder = IntentOperations.builder(appId);
for (Intent intent : addIntents) {
builder.addSubmitOperation(intent);
log.debug("SDN-IP Intent Synchronizer: submitting intent: {}",
......
......@@ -325,13 +325,13 @@ public class IntentSyncTest extends AbstractIntentTest {
.andReturn(IntentState.WITHDRAWING).anyTimes();
expect(intentService.getIntents()).andReturn(intents).anyTimes();
IntentOperations.Builder builder = IntentOperations.builder();
IntentOperations.Builder builder = IntentOperations.builder(null); //FIXME null
builder.addWithdrawOperation(intent2.id());
builder.addWithdrawOperation(intent4.id());
intentService.execute(TestIntentServiceHelper.eqExceptId(
builder.build()));
builder = IntentOperations.builder();
builder = IntentOperations.builder(null); //FIXME null
builder.addSubmitOperation(intent3);
builder.addSubmitOperation(intent4Update);
builder.addSubmitOperation(intent6);
......
......@@ -566,7 +566,7 @@ public class PeerConnectivityManagerTest extends AbstractIntentTest {
reset(intentService);
// Setup the expected intents
IntentOperations.Builder builder = IntentOperations.builder();
IntentOperations.Builder builder = IntentOperations.builder(null); //FIXME null
for (Intent intent : intentList) {
builder.addSubmitOperation(intent);
}
......@@ -601,7 +601,7 @@ public class PeerConnectivityManagerTest extends AbstractIntentTest {
replay(configInfoService);
reset(intentService);
IntentOperations.Builder builder = IntentOperations.builder();
IntentOperations.Builder builder = IntentOperations.builder(null); //FIXME null
intentService.execute(TestIntentServiceHelper.eqExceptId(
builder.build()));
replay(intentService);
......@@ -627,7 +627,7 @@ public class PeerConnectivityManagerTest extends AbstractIntentTest {
replay(configInfoService);
reset(intentService);
IntentOperations.Builder builder = IntentOperations.builder();
IntentOperations.Builder builder = IntentOperations.builder(null); //FIXME null
intentService.execute(TestIntentServiceHelper.eqExceptId(
builder.build()));
replay(intentService);
......
......@@ -119,7 +119,7 @@ public class IntentPushTestCommand extends AbstractShellCommand
}
private void submitIntents(List<Intent> intents) {
IntentOperations.Builder builder = IntentOperations.builder();
IntentOperations.Builder builder = IntentOperations.builder(appId());
for (Intent intent : intents) {
if (add) {
builder.addSubmitOperation(intent);
......@@ -132,7 +132,7 @@ public class IntentPushTestCommand extends AbstractShellCommand
start = System.currentTimeMillis();
service.execute(ops);
try {
if (latch.await(10, TimeUnit.SECONDS)) {
if (latch.await(30, TimeUnit.SECONDS)) {
printResults(count);
} else {
print("Failure: %d intents not installed", latch.getCount());
......
......@@ -86,7 +86,7 @@ public class RandomIntentCommand extends AbstractShellCommand {
}
private void submitIntents(Collection<Intent> intents) {
IntentOperations.Builder builder = IntentOperations.builder();
IntentOperations.Builder builder = IntentOperations.builder(appId());
for (Intent intent : intents) {
builder.addSubmitOperation(intent);
}
......@@ -95,7 +95,7 @@ public class RandomIntentCommand extends AbstractShellCommand {
}
private void withdrawIntents() {
IntentOperations.Builder builder = IntentOperations.builder();
IntentOperations.Builder builder = IntentOperations.builder(appId());
for (Intent intent : service.getIntents()) {
if (appId().equals(intent.appId())) {
builder.addWithdrawOperation(intent.id());
......
......@@ -15,6 +15,8 @@
*/
package org.onlab.onos.net.intent;
import org.onlab.onos.core.ApplicationId;
import java.util.Set;
/**
......@@ -46,9 +48,20 @@ public interface IntentBatchService {
* Returns the set of intent batches currently being processed.
* @return set of batches
*/
//TODO we may want to get rid of this method
@Deprecated
Set<IntentOperations> getCurrentOperations();
/**
* Return true if this instance is the local leader for batch
* processing a given application id.
*
* @param applicationId
* @return
*/
boolean isLocalLeader(ApplicationId applicationId);
/**
* Sets the batch service delegate.
*
* @param delegate delegate to apply
......
......@@ -19,6 +19,7 @@ import java.util.List;
import java.util.Objects;
import com.google.common.collect.ImmutableList;
import org.onlab.onos.core.ApplicationId;
import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkNotNull;
......@@ -33,14 +34,16 @@ import static org.onlab.onos.net.intent.IntentOperation.Type.WITHDRAW;
public final class IntentOperations {
private final List<IntentOperation> operations;
private final ApplicationId appId;
/**
* Creates a batch of intent operations using the supplied list.
*
* @param operations list of intent operations
*/
private IntentOperations(List<IntentOperation> operations) {
private IntentOperations(List<IntentOperation> operations, ApplicationId appId) {
this.operations = operations;
this.appId = appId;
}
/**
......@@ -52,16 +55,20 @@ public final class IntentOperations {
return operations;
}
public ApplicationId appId() {
return appId;
}
/**
* Returns a builder for intent operation batches.
*
* @return intent operations builder
* @param applicationId application id
*/
public static Builder builder() {
return new Builder();
public static Builder builder(ApplicationId applicationId) {
return new Builder(applicationId);
}
@Override
public int hashCode() {
return Objects.hash(operations);
......@@ -92,9 +99,11 @@ public final class IntentOperations {
public static final class Builder {
private final ImmutableList.Builder<IntentOperation> builder = ImmutableList.builder();
private final ApplicationId appId;
// Public construction is forbidden.
private Builder() {
private Builder(ApplicationId appId) {
this.appId = appId;
}
/**
......@@ -153,7 +162,7 @@ public final class IntentOperations {
* @return immutable batch of intent operations
*/
public IntentOperations build() {
return new IntentOperations(builder.build());
return new IntentOperations(builder.build(), appId);
}
}
......
......@@ -78,15 +78,15 @@ public class IntentOperationsTest {
@Test
public void testEquals() {
final IntentOperations operations1 =
IntentOperations.builder()
IntentOperations.builder(null) //FIXME null
.addSubmitOperation(intent)
.build();
final IntentOperations sameAsOperations1 =
IntentOperations.builder()
IntentOperations.builder(null) //FIXME null
.addSubmitOperation(intent)
.build();
final IntentOperations operations2 =
IntentOperations.builder()
IntentOperations.builder(null) //FIXME null
.addReplaceOperation(intent.id(), intent)
.build();
......@@ -102,7 +102,7 @@ public class IntentOperationsTest {
@Test
public void testConstruction() {
final IntentOperations operations =
IntentOperations.builder()
IntentOperations.builder(null) //FIXME
.addUpdateOperation(intent.id())
.addWithdrawOperation(intent.id())
.build();
......
......@@ -33,6 +33,7 @@ import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.core.ApplicationId;
import org.onlab.onos.core.CoreService;
import org.onlab.onos.core.IdGenerator;
import org.onlab.onos.event.AbstractListenerRegistry;
......@@ -151,20 +152,22 @@ public class IntentManager
@Override
public void submit(Intent intent) {
checkNotNull(intent, INTENT_NULL);
execute(IntentOperations.builder().addSubmitOperation(intent).build());
execute(IntentOperations.builder(intent.appId())
.addSubmitOperation(intent).build());
}
@Override
public void withdraw(Intent intent) {
checkNotNull(intent, INTENT_NULL);
execute(IntentOperations.builder().addWithdrawOperation(intent.id()).build());
execute(IntentOperations.builder(intent.appId())
.addWithdrawOperation(intent.id()).build());
}
@Override
public void replace(IntentId oldIntentId, Intent newIntent) {
checkNotNull(oldIntentId, INTENT_ID_NULL);
checkNotNull(newIntent, INTENT_NULL);
execute(IntentOperations.builder()
execute(IntentOperations.builder(newIntent.appId())
.addReplaceOperation(oldIntentId, newIntent)
.build());
}
......@@ -489,14 +492,20 @@ public class IntentManager
}
}
// Topology change delegate
private class InternalTopoChangeDelegate implements TopologyChangeDelegate {
@Override
public void triggerCompile(Iterable<IntentId> intentIds,
private void buildAndSubmitBatches(Iterable<IntentId> intentIds,
boolean compileAllFailed) {
Map<ApplicationId, IntentOperations.Builder> batches = Maps.newHashMap();
// Attempt recompilation of the specified intents first.
IntentOperations.Builder builder = IntentOperations.builder();
for (IntentId id : intentIds) {
Intent intent = store.getIntent(id);
if (intent == null) {
continue;
}
IntentOperations.Builder builder = batches.get(intent.appId());
if (builder == null) {
builder = IntentOperations.builder(intent.appId());
batches.put(intent.appId(), builder);
}
builder.addUpdateOperation(id);
}
......@@ -504,11 +513,29 @@ public class IntentManager
// If required, compile all currently failed intents.
for (Intent intent : getIntents()) {
if (getIntentState(intent.id()) == FAILED) {
IntentOperations.Builder builder = batches.get(intent.appId());
if (builder == null) {
builder = IntentOperations.builder(intent.appId());
batches.put(intent.appId(), builder);
}
builder.addUpdateOperation(intent.id());
}
}
}
execute(builder.build());
for (ApplicationId appId : batches.keySet()) {
if (batchService.isLocalLeader(appId)) {
execute(batches.get(appId).build());
}
}
}
// Topology change delegate
private class InternalTopoChangeDelegate implements TopologyChangeDelegate {
@Override
public void triggerCompile(Iterable<IntentId> intentIds,
boolean compileAllFailed) {
buildAndSubmitBatches(intentIds, compileAllFailed);
}
}
......
......@@ -216,5 +216,8 @@ public class ObjectiveTracker implements ObjectiveTrackerService {
}
}
//TODO consider adding flow rule event tracking
//FIXME the only intents that will be tracked are events that were
//executed on this instance. Need to have some backup trackers...
}
......
/*
* Copyright 2014 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onlab.onos.store.hz;
import com.hazelcast.core.IQueue;
import com.hazelcast.core.ItemListener;
import com.hazelcast.monitor.LocalQueueStats;
import org.onlab.onos.store.serializers.StoreSerializer;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import static com.google.common.base.Preconditions.checkNotNull;
// TODO: implementation is incomplete
/**
* Wrapper around IQueue&lt;byte[]&gt; which serializes/deserializes
* key and value using StoreSerializer.
*
* @param <T> type
*/
public class SQueue<T> implements IQueue<T> {
private final IQueue<byte[]> q;
private final StoreSerializer serializer;
/**
* Creates a SQueue instance.
*
* @param baseQueue base IQueue to use
* @param serializer serializer to use for both key and value
*/
public SQueue(IQueue<byte[]> baseQueue, StoreSerializer serializer) {
this.q = checkNotNull(baseQueue);
this.serializer = checkNotNull(serializer);
}
private byte[] serialize(Object key) {
return serializer.encode(key);
}
private T deserialize(byte[] key) {
return serializer.decode(key);
}
@Override
public boolean add(T t) {
return q.add(serialize(t));
}
@Override
public boolean offer(T t) {
return q.offer(serialize(t));
}
@Override
public void put(T t) throws InterruptedException {
q.put(serialize(t));
}
@Override
public boolean offer(T t, long l, TimeUnit timeUnit) throws InterruptedException {
return q.offer(serialize(t), l, timeUnit);
}
@Override
public T take() throws InterruptedException {
return deserialize(q.take());
}
@Override
public T poll(long l, TimeUnit timeUnit) throws InterruptedException {
return deserialize(q.poll(l, timeUnit));
}
@Override
public int remainingCapacity() {
return q.remainingCapacity();
}
@Override
public boolean remove(Object o) {
return q.remove(serialize(o));
}
@Override
public boolean contains(Object o) {
return q.contains(serialize(o));
}
@Override
public int drainTo(Collection<? super T> collection) {
throw new UnsupportedOperationException();
}
@Override
public int drainTo(Collection<? super T> collection, int i) {
throw new UnsupportedOperationException();
}
@Override
public T remove() {
return deserialize(q.remove());
}
@Override
public T poll() {
return deserialize(q.poll());
}
@Override
public T element() {
return deserialize(q.element());
}
@Override
public T peek() {
return deserialize(q.peek());
}
@Override
public int size() {
return q.size();
}
@Override
public boolean isEmpty() {
return q.isEmpty();
}
@Override
public Iterator<T> iterator() {
throw new UnsupportedOperationException();
}
@Override
public Object[] toArray() {
throw new UnsupportedOperationException();
}
@Override
public <T1> T1[] toArray(T1[] t1s) {
throw new UnsupportedOperationException();
}
@Override
public boolean containsAll(Collection<?> collection) {
throw new UnsupportedOperationException();
}
@Override
public boolean addAll(Collection<? extends T> collection) {
throw new UnsupportedOperationException();
}
@Override
public boolean removeAll(Collection<?> collection) {
throw new UnsupportedOperationException();
}
@Override
public boolean retainAll(Collection<?> collection) {
throw new UnsupportedOperationException();
}
@Override
public void clear() {
q.clear();
}
@Override
public LocalQueueStats getLocalQueueStats() {
return q.getLocalQueueStats();
}
@Override
public String addItemListener(ItemListener<T> itemListener, boolean b) {
throw new UnsupportedOperationException();
}
@Override
public boolean removeItemListener(String s) {
throw new UnsupportedOperationException();
}
@Deprecated
@Override
public Object getId() {
return q.getId();
}
@Override
public String getPartitionKey() {
return q.getPartitionKey();
}
@Override
public String getName() {
return q.getName();
}
@Override
public String getServiceName() {
return q.getServiceName();
}
@Override
public void destroy() {
q.destroy();
}
}
/*
* Copyright 2014 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onlab.onos.store.intent.impl;
import com.google.common.collect.Sets;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.net.intent.IntentBatchDelegate;
import org.onlab.onos.net.intent.IntentBatchService;
import org.onlab.onos.net.intent.IntentOperations;
import org.slf4j.Logger;
import java.util.LinkedList;
import java.util.Queue;
import java.util.Set;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static org.slf4j.LoggerFactory.getLogger;
// FIXME This is not distributed yet.
@Component(immediate = true)
@Service
public class DistributedIntentBatchQueue implements IntentBatchService {
private final Logger log = getLogger(getClass());
private final Queue<IntentOperations> pendingBatches = new LinkedList<>();
private final Set<IntentOperations> currentBatches = Sets.newHashSet();
private IntentBatchDelegate delegate;
@Activate
public void activate() {
log.info("Started");
}
@Deactivate
public void deactivate() {
log.info("Stopped");
}
@Override
public void addIntentOperations(IntentOperations operations) {
checkState(delegate != null, "No delegate set");
synchronized (this) {
pendingBatches.add(operations);
if (currentBatches.isEmpty()) {
IntentOperations work = pendingBatches.poll();
currentBatches.add(work);
delegate.execute(work);
}
}
}
@Override
public void removeIntentOperations(IntentOperations operations) {
// we allow at most one outstanding batch at a time
synchronized (this) {
checkState(currentBatches.remove(operations), "Operations not found in current ops.");
checkState(currentBatches.isEmpty(), "More than one outstanding batch.");
IntentOperations work = pendingBatches.poll();
if (work != null) {
currentBatches.add(work);
delegate.execute(work);
}
}
}
@Override
public Set<IntentOperations> getPendingOperations() {
synchronized (this) {
return Sets.newHashSet(pendingBatches);
}
}
@Override
public Set<IntentOperations> getCurrentOperations() {
synchronized (this) {
return Sets.newHashSet(currentBatches);
}
}
@Override
public void setDelegate(IntentBatchDelegate delegate) {
this.delegate = checkNotNull(delegate, "Delegate cannot be null");
}
@Override
public void unsetDelegate(IntentBatchDelegate delegate) {
if (this.delegate != null && this.delegate.equals(delegate)) {
this.delegate = null;
}
}
}
/*
* Copyright 2014 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onlab.onos.store.intent.impl;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IQueue;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.LeadershipEvent;
import org.onlab.onos.cluster.LeadershipEventListener;
import org.onlab.onos.cluster.LeadershipService;
import org.onlab.onos.core.ApplicationId;
import org.onlab.onos.core.CoreService;
import org.onlab.onos.net.intent.IntentBatchDelegate;
import org.onlab.onos.net.intent.IntentBatchService;
import org.onlab.onos.net.intent.IntentOperations;
import org.onlab.onos.store.hz.SQueue;
import org.onlab.onos.store.hz.StoreService;
import org.onlab.onos.store.serializers.KryoNamespaces;
import org.onlab.onos.store.serializers.KryoSerializer;
import org.onlab.onos.store.serializers.StoreSerializer;
import org.onlab.util.KryoNamespace;
import org.slf4j.Logger;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static org.slf4j.LoggerFactory.getLogger;
@Component(immediate = true)
@Service
public class HazelcastIntentBatchQueue
implements IntentBatchService {
private final Logger log = getLogger(getClass());
private static final String TOPIC_BASE = "intent-batch-";
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected CoreService coreService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected LeadershipService leadershipService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected StoreService storeService;
private HazelcastInstance theInstance;
private ControllerNode localControllerNode;
protected StoreSerializer serializer;
private IntentBatchDelegate delegate;
private InternalLeaderListener leaderListener = new InternalLeaderListener();
private final Map<ApplicationId, SQueue<IntentOperations>> batchQueues
= Maps.newHashMap(); // FIXME make distributed?
private final Set<ApplicationId> myTopics = Sets.newHashSet();
private final Map<ApplicationId, IntentOperations> outstandingOps
= Maps.newHashMap();
@Activate
public void activate() {
theInstance = storeService.getHazelcastInstance();
localControllerNode = clusterService.getLocalNode();
leadershipService.addListener(leaderListener);
serializer = new KryoSerializer() {
@Override
protected void setupKryoPool() {
serializerPool = KryoNamespace.newBuilder()
.setRegistrationRequired(false)
.register(KryoNamespaces.API)
.nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
.build();
}
};
log.info("Started");
}
@Deactivate
public void deactivate() {
leadershipService.removeListener(leaderListener);
log.info("Stopped");
}
public static String getTopic(ApplicationId appId) {
return TOPIC_BASE + checkNotNull(appId.id());
}
public ApplicationId getAppId(String topic) {
checkState(topic.startsWith(TOPIC_BASE),
"Trying to get app id for invalid topic: {}", topic);
Short id = Short.parseShort(topic.substring(TOPIC_BASE.length()));
return coreService.getAppId(id);
}
private SQueue<IntentOperations> getQueue(ApplicationId appId) {
SQueue<IntentOperations> queue = batchQueues.get(appId);
if (queue == null) {
synchronized (this) {
// FIXME how will other instances find out about new queues
String topic = getTopic(appId);
IQueue<byte[]> rawQueue = theInstance.getQueue(topic);
queue = new SQueue<>(rawQueue, serializer);
batchQueues.putIfAbsent(appId, queue);
// TODO others should run for leadership when they hear about this topic
leadershipService.runForLeadership(topic);
}
}
return queue;
}
@Override
public void addIntentOperations(IntentOperations ops) {
checkNotNull(ops, "Intent operations cannot be null.");
ApplicationId appId = ops.appId();
getQueue(appId).add(ops); // TODO consider using put here
dispatchNextOperation(appId);
}
@Override
public void removeIntentOperations(IntentOperations ops) {
ApplicationId appId = ops.appId();
synchronized (this) {
checkState(outstandingOps.get(appId).equals(ops),
"Operations not found.");
SQueue<IntentOperations> queue = batchQueues.get(appId);
// TODO consider alternatives to remove
checkState(queue.remove().equals(ops),
"Operations are wrong.");
outstandingOps.remove(appId);
dispatchNextOperation(appId);
}
}
/**
* Dispatches the next available operations to the delegate, unless
* we are not the leader for this application id or there is an
* outstanding operations for this application id.
*
* @param appId application id
*/
private void dispatchNextOperation(ApplicationId appId) {
synchronized (this) {
if (!myTopics.contains(appId) ||
outstandingOps.containsKey(appId)) {
return;
}
IntentOperations ops = batchQueues.get(appId).peek();
if (ops != null) {
outstandingOps.put(appId, ops);
delegate.execute(ops);
}
}
}
/**
* Record the leadership change for the given topic. If we have become the
* leader, then dispatch the next operations. If we have lost leadership,
* then cancel the last operations.
*
* @param topic topic based on application id
* @param leader true if we have become the leader, false otherwise
*/
private void leaderChanged(String topic, boolean leader) {
ApplicationId appId = getAppId(topic);
//TODO we are using the event caller's thread, should we use our own?
synchronized (this) {
if (leader) {
myTopics.add(appId);
checkState(!outstandingOps.containsKey(appId),
"Existing intent ops for app id: {}", appId);
dispatchNextOperation(appId);
} else {
myTopics.remove(appId);
IntentOperations ops = outstandingOps.get(appId);
if (ops != null) {
delegate.cancel(ops);
}
outstandingOps.remove(appId);
}
}
}
private class InternalLeaderListener implements LeadershipEventListener {
@Override
public void event(LeadershipEvent event) {
log.debug("Leadership Event: time = {} type = {} event = {}",
event.time(), event.type(), event);
String topic = event.subject().topic();
if (!topic.startsWith(TOPIC_BASE)) {
return; // Not our topic: ignore
}
if (!event.subject().leader().id().equals(localControllerNode.id())) {
return; // The event is not about this instance: ignore
}
switch (event.type()) {
case LEADER_ELECTED:
log.info("Elected leader for app {}", getAppId(topic));
leaderChanged(topic, true);
break;
case LEADER_BOOTED:
log.info("Lost leader election for app {}", getAppId(topic));
leaderChanged(topic, false);
break;
case LEADER_REELECTED:
break;
default:
break;
}
}
}
@Override
public Set<IntentOperations> getPendingOperations() {
Set<IntentOperations> ops = Sets.newHashSet();
synchronized (this) {
for (SQueue<IntentOperations> queue : batchQueues.values()) {
ops.addAll(queue);
}
return ops;
}
}
@Override
public Set<IntentOperations> getCurrentOperations() {
//FIXME this is not really implemented
return Collections.emptySet();
}
@Override
public boolean isLocalLeader(ApplicationId applicationId) {
return myTopics.contains(applicationId);
}
@Override
public void setDelegate(IntentBatchDelegate delegate) {
this.delegate = checkNotNull(delegate, "Delegate cannot be null");
}
@Override
public void unsetDelegate(IntentBatchDelegate delegate) {
if (this.delegate != null && this.delegate.equals(delegate)) {
this.delegate = null;
}
}
}
......@@ -73,6 +73,8 @@ import org.onlab.onos.net.intent.ConnectivityIntent;
import org.onlab.onos.net.intent.HostToHostIntent;
import org.onlab.onos.net.intent.Intent;
import org.onlab.onos.net.intent.IntentId;
import org.onlab.onos.net.intent.IntentOperation;
import org.onlab.onos.net.intent.IntentOperations;
import org.onlab.onos.net.intent.IntentState;
import org.onlab.onos.net.intent.LinkCollectionIntent;
import org.onlab.onos.net.intent.MultiPointToSinglePointIntent;
......@@ -275,7 +277,9 @@ public final class KryoNamespaces {
WaypointConstraint.class,
ObstacleConstraint.class,
AnnotationConstraint.class,
BooleanConstraint.class
BooleanConstraint.class,
IntentOperation.class,
IntentOperations.class
)
.register(new DefaultApplicationIdSerializer(), DefaultApplicationId.class)
.register(new URISerializer(), URI.class)
......
......@@ -20,6 +20,7 @@ import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.core.ApplicationId;
import org.onlab.onos.net.intent.IntentBatchDelegate;
import org.onlab.onos.net.intent.IntentBatchService;
import org.onlab.onos.net.intent.IntentOperations;
......@@ -94,6 +95,11 @@ public class SimpleIntentBatchQueue implements IntentBatchService {
}
@Override
public boolean isLocalLeader(ApplicationId applicationId) {
return true;
}
@Override
public void setDelegate(IntentBatchDelegate delegate) {
this.delegate = checkNotNull(delegate, "Delegate cannot be null");
}
......