Brian O'Connor
Committed by Gerrit Code Review

Serializing batch execution on per-instance basis for intents

Change-Id: Idda3f4a65e78567302d91ba0070e78d435eea8fd
......@@ -155,6 +155,9 @@ public class IntentManager
@Override
public void execute(IntentOperations operations) {
if (operations.operations().isEmpty()) {
return;
}
batchService.addIntentOperations(operations);
}
......
......@@ -15,7 +15,7 @@
*/
package org.onlab.onos.store.intent.impl;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
......@@ -25,7 +25,9 @@ import org.onlab.onos.net.intent.IntentBatchService;
import org.onlab.onos.net.intent.IntentOperations;
import org.slf4j.Logger;
import java.util.HashSet;
import java.util.Collection;
import java.util.LinkedList;
import java.util.Queue;
import java.util.Set;
import static com.google.common.base.Preconditions.checkNotNull;
......@@ -37,7 +39,8 @@ import static org.slf4j.LoggerFactory.getLogger;
public class DistributedIntentBatchQueue implements IntentBatchService {
private final Logger log = getLogger(getClass());
private final Set<IntentOperations> pendingBatches = new HashSet<>();
private final Queue<IntentOperations> pendingBatches = new LinkedList<>();
private final Set<IntentOperations> currentBatches = Sets.newHashSet();
private IntentBatchDelegate delegate;
@Activate
......@@ -53,18 +56,35 @@ public class DistributedIntentBatchQueue implements IntentBatchService {
@Override
public void addIntentOperations(IntentOperations operations) {
checkState(delegate != null, "No delegate set");
pendingBatches.add(operations);
delegate.execute(operations);
synchronized (this) {
pendingBatches.add(operations);
if (currentBatches.isEmpty()) {
IntentOperations work = pendingBatches.poll();
currentBatches.add(work);
delegate.execute(work);
}
}
}
@Override
public void removeIntentOperations(IntentOperations operations) {
pendingBatches.remove(operations);
// we allow at most one outstanding batch at a time
synchronized (this) {
checkState(currentBatches.remove(operations), "Operations not found in current ops.");
checkState(currentBatches.isEmpty(), "More than one outstanding batch.");
IntentOperations work = pendingBatches.poll();
if (work != null) {
currentBatches.add(work);
delegate.execute(work);
}
}
}
@Override
public Set<IntentOperations> getIntentOperations() {
return ImmutableSet.copyOf(pendingBatches);
Set<IntentOperations> set = Sets.newHashSet(currentBatches);
set.addAll((Collection) pendingBatches);
return set;
}
@Override
......