alshabib

initial working impl of batch operations

Change-Id: Ie970543dec1104a394c7bcfa6eec24c0538278d6
package org.onlab.onos.net.flow;
public class CompletedBatchOperation {
}
......@@ -2,12 +2,13 @@ package org.onlab.onos.net.flow;
import org.onlab.onos.ApplicationId;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.intent.BatchOperationTarget;
/**
* Represents a generalized match & action pair to be applied to
* an infrastucture device.
*/
public interface FlowRule {
public interface FlowRule extends BatchOperationTarget {
static final int MAX_TIMEOUT = 60;
static final int MIN_PRIORITY = 0;
......
package org.onlab.onos.net.flow;
import org.onlab.onos.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
import org.onlab.onos.net.intent.BatchOperationEntry;
public class FlowRuleBatchEntry
extends BatchOperationEntry<FlowRuleOperation, FlowRule> {
public FlowRuleBatchEntry(FlowRuleOperation operator, FlowRule target) {
super(operator, target);
}
public enum FlowRuleOperation {
ADD,
REMOVE,
MODIFY
}
}
package org.onlab.onos.net.flow;
import java.util.Collection;
import org.onlab.onos.net.intent.BatchOperation;
public class FlowRuleBatchOperation
extends BatchOperation<FlowRuleBatchEntry> {
public FlowRuleBatchOperation(Collection<FlowRuleBatchEntry> operations) {
super(operations);
}
}
package org.onlab.onos.net.flow;
import java.util.concurrent.Future;
import org.onlab.onos.ApplicationId;
import org.onlab.onos.net.intent.BatchOperation;
import org.onlab.onos.net.provider.Provider;
/**
......@@ -34,4 +37,6 @@ public interface FlowRuleProvider extends Provider {
*/
void removeRulesById(ApplicationId id, FlowRule... flowRules);
Future<Void> executeBatch(BatchOperation<FlowRuleBatchEntry> batch);
}
......
package org.onlab.onos.net.flow;
import java.util.concurrent.Future;
import org.onlab.onos.ApplicationId;
import org.onlab.onos.net.DeviceId;
......@@ -66,7 +68,12 @@ public interface FlowRuleService {
*/
Iterable<FlowRule> getFlowRulesById(ApplicationId id);
//Future<CompletedBatchOperation> applyBatch(BatchOperation<FlowRuleBatchEntry>)
/**
* Applies a batch operation of FlowRules.
*
* @return future indicating the state of the batch operation
*/
Future<CompletedBatchOperation> applyBatch(FlowRuleBatchOperation batch);
/**
* Adds the specified flow rule listener.
......
package org.onlab.onos.net.intent;
//TODO is this the right package?
import static com.google.common.base.Preconditions.checkNotNull;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import static com.google.common.base.Preconditions.checkNotNull;
/**
* A list of BatchOperationEntry.
*
......@@ -15,7 +16,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
*/
public abstract class BatchOperation<T extends BatchOperationEntry<?, ?>> {
private List<T> ops;
private final List<T> ops;
/**
* Creates new {@link BatchOperation} object.
......@@ -30,7 +31,7 @@ public abstract class BatchOperation<T extends BatchOperationEntry<?, ?>> {
*
* @param batchOperations the list of batch operation entries.
*/
public BatchOperation(List<T> batchOperations) {
public BatchOperation(Collection<T> batchOperations) {
ops = new LinkedList<>(checkNotNull(batchOperations));
}
......@@ -61,6 +62,10 @@ public abstract class BatchOperation<T extends BatchOperationEntry<?, ?>> {
/**
* Adds an operation.
* FIXME: Brian promises that the Intent Framework
* will not modify the batch operation after it has submitted it.
* Ali would prefer immutablity, but trusts brian for better or
* for worse.
*
* @param entry the operation to be added
* @return this object if succeeded, null otherwise
......
......@@ -15,14 +15,7 @@ public class BatchOperationEntry<T extends Enum<?>, U extends BatchOperationTarg
private final T operator;
private final U target;
/**
* Default constructor for serializer.
*/
@Deprecated
protected BatchOperationEntry() {
this.operator = null;
this.target = null;
}
/**
* Constructs new instance for the entry of the BatchOperation.
......
......@@ -5,6 +5,10 @@ import static org.slf4j.LoggerFactory.getLogger;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
......@@ -18,8 +22,11 @@ import org.onlab.onos.event.EventDeliveryService;
import org.onlab.onos.net.Device;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.device.DeviceService;
import org.onlab.onos.net.flow.CompletedBatchOperation;
import org.onlab.onos.net.flow.FlowEntry;
import org.onlab.onos.net.flow.FlowRule;
import org.onlab.onos.net.flow.FlowRuleBatchEntry;
import org.onlab.onos.net.flow.FlowRuleBatchOperation;
import org.onlab.onos.net.flow.FlowRuleEvent;
import org.onlab.onos.net.flow.FlowRuleListener;
import org.onlab.onos.net.flow.FlowRuleProvider;
......@@ -32,7 +39,9 @@ import org.onlab.onos.net.provider.AbstractProviderRegistry;
import org.onlab.onos.net.provider.AbstractProviderService;
import org.slf4j.Logger;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
/**
* Provides implementation of the flow NB &amp; SB APIs.
......@@ -131,6 +140,38 @@ public class FlowRuleManager
}
@Override
public Future<CompletedBatchOperation> applyBatch(
FlowRuleBatchOperation batch) {
Multimap<FlowRuleProvider, FlowRuleBatchEntry> batches =
ArrayListMultimap.create();
List<Future<Void>> futures = Lists.newArrayList();
for (FlowRuleBatchEntry fbe : batch.getOperations()) {
final FlowRule f = fbe.getTarget();
final Device device = deviceService.getDevice(f.deviceId());
final FlowRuleProvider frp = getProvider(device.providerId());
batches.put(frp, fbe);
switch (fbe.getOperator()) {
case ADD:
store.storeFlowRule(f);
break;
case REMOVE:
store.deleteFlowRule(f);
break;
case MODIFY:
default:
log.error("Batch operation type {} unsupported.", fbe.getOperator());
}
}
for (FlowRuleProvider provider : batches.keySet()) {
FlowRuleBatchOperation b =
new FlowRuleBatchOperation(batches.get(provider));
Future<Void> future = provider.executeBatch(b);
futures.add(future);
}
return new FlowRuleBatchFuture(futures);
}
@Override
public void addListener(FlowRuleListener listener) {
listenerRegistry.addListener(listener);
}
......@@ -296,4 +337,63 @@ public class FlowRuleManager
eventDispatcher.post(event);
}
}
private class FlowRuleBatchFuture
implements Future<CompletedBatchOperation> {
private final List<Future<Void>> futures;
public FlowRuleBatchFuture(List<Future<Void>> futures) {
this.futures = futures;
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
// TODO Auto-generated method stub
return false;
}
@Override
public boolean isCancelled() {
// TODO Auto-generated method stub
return false;
}
@Override
public boolean isDone() {
boolean isDone = true;
for (Future<Void> future : futures) {
isDone &= future.isDone();
}
return isDone;
}
@Override
public CompletedBatchOperation get() throws InterruptedException,
ExecutionException {
// TODO Auto-generated method stub
for (Future<Void> future : futures) {
future.get();
}
return new CompletedBatchOperation();
}
@Override
public CompletedBatchOperation get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException,
TimeoutException {
// TODO we should decrement the timeout
long start = System.nanoTime();
long end = start + unit.toNanos(timeout);
for (Future<Void> future : futures) {
long now = System.nanoTime();
long thisTimeout = end - now;
future.get(thisTimeout, TimeUnit.NANOSECONDS);
}
return new CompletedBatchOperation();
}
}
}
......
......@@ -4,6 +4,8 @@ import static org.onlab.onos.net.flow.DefaultTrafficTreatment.builder;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
......@@ -16,6 +18,9 @@ import org.onlab.onos.net.Link;
import org.onlab.onos.net.flow.DefaultFlowRule;
import org.onlab.onos.net.flow.DefaultTrafficSelector;
import org.onlab.onos.net.flow.FlowRule;
import org.onlab.onos.net.flow.FlowRuleBatchEntry;
import org.onlab.onos.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
import org.onlab.onos.net.flow.FlowRuleBatchOperation;
import org.onlab.onos.net.flow.FlowRuleService;
import org.onlab.onos.net.flow.TrafficSelector;
import org.onlab.onos.net.flow.TrafficTreatment;
......@@ -24,6 +29,8 @@ import org.onlab.onos.net.intent.IntentInstaller;
import org.onlab.onos.net.intent.PathIntent;
import org.slf4j.Logger;
import com.google.common.collect.Lists;
/**
* Installer for {@link PathIntent path connectivity intents}.
*/
......@@ -56,19 +63,27 @@ public class PathIntentInstaller implements IntentInstaller<PathIntent> {
DefaultTrafficSelector.builder(intent.selector());
Iterator<Link> links = intent.path().links().iterator();
ConnectPoint prev = links.next().dst();
List<FlowRuleBatchEntry> rules = Lists.newLinkedList();
while (links.hasNext()) {
builder.matchInport(prev.port());
Link link = links.next();
TrafficTreatment treatment = builder()
.setOutput(link.src().port()).build();
FlowRule rule = new DefaultFlowRule(link.src().deviceId(),
builder.build(), treatment,
123, appId, 600);
flowRuleService.applyFlowRules(rule);
rules.add(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule));
//flowRuleService.applyFlowRules(rule);
prev = link.dst();
}
FlowRuleBatchOperation batch = new FlowRuleBatchOperation(rules);
try {
flowRuleService.applyBatch(batch).get();
} catch (InterruptedException | ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
@Override
......@@ -77,6 +92,7 @@ public class PathIntentInstaller implements IntentInstaller<PathIntent> {
DefaultTrafficSelector.builder(intent.selector());
Iterator<Link> links = intent.path().links().iterator();
ConnectPoint prev = links.next().dst();
List<FlowRuleBatchEntry> rules = Lists.newLinkedList();
while (links.hasNext()) {
builder.matchInport(prev.port());
......@@ -86,9 +102,16 @@ public class PathIntentInstaller implements IntentInstaller<PathIntent> {
FlowRule rule = new DefaultFlowRule(link.src().deviceId(),
builder.build(), treatment,
123, appId, 600);
flowRuleService.removeFlowRules(rule);
rules.add(new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, rule));
//flowRuleService.removeFlowRules(rule);
prev = link.dst();
}
FlowRuleBatchOperation batch = new FlowRuleBatchOperation(rules);
try {
flowRuleService.applyBatch(batch).get();
} catch (InterruptedException | ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
......
......@@ -12,6 +12,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Future;
import org.junit.After;
import org.junit.Before;
......@@ -32,6 +33,7 @@ import org.onlab.onos.net.flow.DefaultFlowRule;
import org.onlab.onos.net.flow.FlowEntry;
import org.onlab.onos.net.flow.FlowEntry.FlowEntryState;
import org.onlab.onos.net.flow.FlowRule;
import org.onlab.onos.net.flow.FlowRuleBatchEntry;
import org.onlab.onos.net.flow.FlowRuleEvent;
import org.onlab.onos.net.flow.FlowRuleListener;
import org.onlab.onos.net.flow.FlowRuleProvider;
......@@ -42,6 +44,7 @@ import org.onlab.onos.net.flow.TrafficSelector;
import org.onlab.onos.net.flow.TrafficTreatment;
import org.onlab.onos.net.flow.criteria.Criterion;
import org.onlab.onos.net.flow.instructions.Instruction;
import org.onlab.onos.net.intent.BatchOperation;
import org.onlab.onos.net.provider.AbstractProvider;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.trivial.impl.SimpleFlowRuleStore;
......@@ -404,6 +407,13 @@ public class FlowRuleManagerTest {
public void removeRulesById(ApplicationId id, FlowRule... flowRules) {
}
@Override
public Future<Void> executeBatch(
BatchOperation<FlowRuleBatchEntry> batch) {
// TODO Auto-generated method stub
return null;
}
}
......
......@@ -68,7 +68,7 @@ public class FlowModBuilder {
this.cookie = flowRule.id();
}
public OFFlowMod buildFlowMod() {
public OFFlowMod buildFlowAdd() {
Match match = buildMatch();
List<OFAction> actions = buildActions();
......@@ -86,6 +86,24 @@ public class FlowModBuilder {
}
public OFFlowMod buildFlowMod() {
Match match = buildMatch();
List<OFAction> actions = buildActions();
//TODO: what to do without bufferid? do we assume that there will be a pktout as well?
OFFlowMod fm = factory.buildFlowModify()
.setCookie(U64.of(cookie.value()))
.setBufferId(OFBufferId.NO_BUFFER)
.setActions(actions)
.setMatch(match)
.setFlags(Collections.singleton(OFFlowModFlags.SEND_FLOW_REM))
.setPriority(priority)
.build();
return fm;
}
public OFFlowMod buildFlowDel() {
Match match = buildMatch();
List<OFAction> actions = buildActions();
......
......@@ -2,8 +2,17 @@ package org.onlab.onos.provider.of.flow.impl;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
......@@ -14,9 +23,11 @@ import org.onlab.onos.ApplicationId;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.flow.FlowEntry;
import org.onlab.onos.net.flow.FlowRule;
import org.onlab.onos.net.flow.FlowRuleBatchEntry;
import org.onlab.onos.net.flow.FlowRuleProvider;
import org.onlab.onos.net.flow.FlowRuleProviderRegistry;
import org.onlab.onos.net.flow.FlowRuleProviderService;
import org.onlab.onos.net.intent.BatchOperation;
import org.onlab.onos.net.provider.AbstractProvider;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.net.topology.TopologyService;
......@@ -27,6 +38,8 @@ import org.onlab.onos.openflow.controller.OpenFlowSwitch;
import org.onlab.onos.openflow.controller.OpenFlowSwitchListener;
import org.onlab.onos.openflow.controller.RoleState;
import org.projectfloodlight.openflow.protocol.OFActionType;
import org.projectfloodlight.openflow.protocol.OFBarrierRequest;
import org.projectfloodlight.openflow.protocol.OFErrorMsg;
import org.projectfloodlight.openflow.protocol.OFFlowRemoved;
import org.projectfloodlight.openflow.protocol.OFFlowStatsEntry;
import org.projectfloodlight.openflow.protocol.OFFlowStatsReply;
......@@ -42,9 +55,11 @@ import org.projectfloodlight.openflow.protocol.action.OFActionOutput;
import org.projectfloodlight.openflow.protocol.instruction.OFInstruction;
import org.projectfloodlight.openflow.protocol.instruction.OFInstructionApplyActions;
import org.projectfloodlight.openflow.types.OFPort;
import org.projectfloodlight.openflow.types.U32;
import org.slf4j.Logger;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
......@@ -70,6 +85,9 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
private final InternalFlowProvider listener = new InternalFlowProvider();
private final Map<Long, InstallationFuture> pendingFutures =
new ConcurrentHashMap<Long, InstallationFuture>();
/**
* Creates an OpenFlow host provider.
*/
......@@ -101,7 +119,7 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
private void applyRule(FlowRule flowRule) {
OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(flowRule.deviceId().uri()));
sw.sendMsg(new FlowModBuilder(flowRule, sw.factory()).buildFlowMod());
sw.sendMsg(new FlowModBuilder(flowRule, sw.factory()).buildFlowAdd());
}
......@@ -154,6 +172,7 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
@Override
public void handleMessage(Dpid dpid, OFMessage msg) {
InstallationFuture future = null;
switch (msg.getType()) {
case FLOW_REMOVED:
//TODO: make this better
......@@ -166,7 +185,17 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
pushFlowMetrics(dpid, (OFStatsReply) msg);
break;
case BARRIER_REPLY:
future = pendingFutures.get(msg.getXid());
if (future != null) {
future.satisfyRequirement(dpid);
}
break;
case ERROR:
future = pendingFutures.get(msg.getXid());
if (future != null) {
future.fail((OFErrorMsg) msg, dpid);
}
break;
default:
log.debug("Unhandled message type: {}", msg.getType());
}
......@@ -226,6 +255,144 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
}
@Override
public Future<Void> executeBatch(BatchOperation<FlowRuleBatchEntry> batch) {
final Set<Dpid> sws = new HashSet<Dpid>();
for (FlowRuleBatchEntry fbe : batch.getOperations()) {
FlowRule flowRule = fbe.getTarget();
OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(flowRule.deviceId().uri()));
sws.add(new Dpid(sw.getId()));
switch (fbe.getOperator()) {
case ADD:
//TODO: Track XID for each flowmod
sw.sendMsg(new FlowModBuilder(flowRule, sw.factory()).buildFlowAdd());
break;
case REMOVE:
//TODO: Track XID for each flowmod
sw.sendMsg(new FlowModBuilder(flowRule, sw.factory()).buildFlowDel());
break;
case MODIFY:
//TODO: Track XID for each flowmod
sw.sendMsg(new FlowModBuilder(flowRule, sw.factory()).buildFlowMod());
break;
default:
log.error("Unsupported batch operation {}", fbe.getOperator());
}
}
InstallationFuture installation = new InstallationFuture(sws);
pendingFutures.put(U32.f(batch.hashCode()), installation);
installation.verify(batch.hashCode());
return installation;
}
private class InstallationFuture implements Future<Void> {
private final Set<Dpid> sws;
private final AtomicBoolean ok = new AtomicBoolean(true);
private final List<FlowEntry> offendingFlowMods = Lists.newLinkedList();
private final CountDownLatch countDownLatch;
public InstallationFuture(Set<Dpid> sws) {
this.sws = sws;
countDownLatch = new CountDownLatch(sws.size());
}
public void fail(OFErrorMsg msg, Dpid dpid) {
ok.set(false);
//TODO add reason to flowentry
//TODO handle specific error msgs
//offendingFlowMods.add(new FlowEntryBuilder(dpid, msg.));
switch (msg.getErrType()) {
case BAD_ACTION:
break;
case BAD_INSTRUCTION:
break;
case BAD_MATCH:
break;
case BAD_REQUEST:
break;
case EXPERIMENTER:
break;
case FLOW_MOD_FAILED:
break;
case GROUP_MOD_FAILED:
break;
case HELLO_FAILED:
break;
case METER_MOD_FAILED:
break;
case PORT_MOD_FAILED:
break;
case QUEUE_OP_FAILED:
break;
case ROLE_REQUEST_FAILED:
break;
case SWITCH_CONFIG_FAILED:
break;
case TABLE_FEATURES_FAILED:
break;
case TABLE_MOD_FAILED:
break;
default:
break;
}
}
public void satisfyRequirement(Dpid dpid) {
log.warn("Satisfaction from switch {}", dpid);
sws.remove(controller.getSwitch(dpid));
countDownLatch.countDown();
}
public void verify(Integer id) {
for (Dpid dpid : sws) {
OpenFlowSwitch sw = controller.getSwitch(dpid);
OFBarrierRequest.Builder builder = sw.factory()
.buildBarrierRequest()
.setXid(id);
sw.sendMsg(builder.build());
}
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
// TODO Auto-generated method stub
return false;
}
@Override
public boolean isCancelled() {
// TODO Auto-generated method stub
return false;
}
@Override
public boolean isDone() {
return sws.isEmpty();
}
@Override
public Void get() throws InterruptedException, ExecutionException {
countDownLatch.await();
//return offendingFlowMods;
return null;
}
@Override
public Void get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException,
TimeoutException {
countDownLatch.await(timeout, unit);
//return offendingFlowMods;
return null;
}
}
}
......