alshabib

clean batch operations

Change-Id: I7187de40bb5276d6ae9e9831e5d47d36e16560ad
package org.onlab.onos.net.flow;
public class CompletedBatchOperation {
import java.util.List;
import com.google.common.collect.ImmutableList;
public class CompletedBatchOperation implements BatchOperationResult<FlowEntry> {
private final boolean success;
private final List<FlowEntry> failures;
public CompletedBatchOperation(boolean success, List<FlowEntry> failures) {
this.success = success;
this.failures = ImmutableList.copyOf(failures);
}
@Override
public boolean isSuccess() {
return success;
}
@Override
public List<FlowEntry> failedItems() {
return failures;
}
}
......
......@@ -17,6 +17,10 @@ public class DefaultFlowEntry extends DefaultFlowRule implements FlowEntry {
private long lastSeen = -1;
private final int errType;
private final int errCode;
public DefaultFlowEntry(DeviceId deviceId, TrafficSelector selector,
TrafficTreatment treatment, int priority, FlowEntryState state,
......@@ -27,6 +31,8 @@ public class DefaultFlowEntry extends DefaultFlowRule implements FlowEntry {
this.life = life;
this.packets = packets;
this.bytes = bytes;
this.errCode = -1;
this.errType = -1;
this.lastSeen = System.currentTimeMillis();
}
......@@ -37,6 +43,8 @@ public class DefaultFlowEntry extends DefaultFlowRule implements FlowEntry {
this.life = life;
this.packets = packets;
this.bytes = bytes;
this.errCode = -1;
this.errType = -1;
this.lastSeen = System.currentTimeMillis();
}
......@@ -46,9 +54,18 @@ public class DefaultFlowEntry extends DefaultFlowRule implements FlowEntry {
this.life = 0;
this.packets = 0;
this.bytes = 0;
this.errCode = -1;
this.errType = -1;
this.lastSeen = System.currentTimeMillis();
}
public DefaultFlowEntry(FlowRule rule, int errType, int errCode) {
super(rule);
this.state = FlowEntryState.FAILED;
this.errType = errType;
this.errCode = errCode;
}
@Override
public long life() {
return life;
......@@ -100,6 +117,16 @@ public class DefaultFlowEntry extends DefaultFlowRule implements FlowEntry {
}
@Override
public int errType() {
return this.errType;
}
@Override
public int errCode() {
return this.errCode;
}
@Override
public String toString() {
return toStringHelper(this)
.add("rule", super.toString())
......@@ -108,4 +135,6 @@ public class DefaultFlowEntry extends DefaultFlowRule implements FlowEntry {
}
}
......
......@@ -29,7 +29,12 @@ public interface FlowEntry extends FlowRule {
/**
* Flow has been removed from flow table and can be purged.
*/
REMOVED
REMOVED,
/**
* Indicates that the installation of this flow has failed.
*/
FAILED
}
/**
......@@ -95,4 +100,16 @@ public interface FlowEntry extends FlowRule {
*/
void setBytes(long bytes);
/**
* Indicates the error type.
* @return an integer value of the error
*/
int errType();
/**
* Indicates the error code.
* @return an integer value of the error
*/
int errCode();
}
......
......@@ -37,6 +37,12 @@ public interface FlowRuleProvider extends Provider {
*/
void removeRulesById(ApplicationId id, FlowRule... flowRules);
Future<Void> executeBatch(BatchOperation<FlowRuleBatchEntry> batch);
/**
* Installs a batch of flow rules. Each flowrule is associated to an
* operation which results in either addition, removal or modification.
* @param batch a batch of flow rules
* @return a future indicating the status of this execution
*/
Future<CompletedBatchOperation> executeBatch(BatchOperation<FlowRuleBatchEntry> batch);
}
......
......@@ -5,10 +5,12 @@ import static org.slf4j.LoggerFactory.getLogger;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
......@@ -26,6 +28,7 @@ import org.onlab.onos.net.flow.CompletedBatchOperation;
import org.onlab.onos.net.flow.FlowEntry;
import org.onlab.onos.net.flow.FlowRule;
import org.onlab.onos.net.flow.FlowRuleBatchEntry;
import org.onlab.onos.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
import org.onlab.onos.net.flow.FlowRuleBatchOperation;
import org.onlab.onos.net.flow.FlowRuleEvent;
import org.onlab.onos.net.flow.FlowRuleListener;
......@@ -52,6 +55,8 @@ public class FlowRuleManager
extends AbstractProviderRegistry<FlowRuleProvider, FlowRuleProviderService>
implements FlowRuleService, FlowRuleProviderRegistry {
enum BatchState { STARTED, FINISHED, CANCELLED };
public static final String FLOW_RULE_NULL = "FlowRule cannot be null";
private final Logger log = getLogger(getClass());
......@@ -144,7 +149,7 @@ public class FlowRuleManager
FlowRuleBatchOperation batch) {
Multimap<FlowRuleProvider, FlowRuleBatchEntry> batches =
ArrayListMultimap.create();
List<Future<Void>> futures = Lists.newArrayList();
List<Future<CompletedBatchOperation>> futures = Lists.newArrayList();
for (FlowRuleBatchEntry fbe : batch.getOperations()) {
final FlowRule f = fbe.getTarget();
final Device device = deviceService.getDevice(f.deviceId());
......@@ -165,10 +170,10 @@ public class FlowRuleManager
for (FlowRuleProvider provider : batches.keySet()) {
FlowRuleBatchOperation b =
new FlowRuleBatchOperation(batches.get(provider));
Future<Void> future = provider.executeBatch(b);
Future<CompletedBatchOperation> future = provider.executeBatch(b);
futures.add(future);
}
return new FlowRuleBatchFuture(futures);
return new FlowRuleBatchFuture(futures, batches);
}
@Override
......@@ -341,59 +346,140 @@ public class FlowRuleManager
private class FlowRuleBatchFuture
implements Future<CompletedBatchOperation> {
private final List<Future<Void>> futures;
private final List<Future<CompletedBatchOperation>> futures;
private final Multimap<FlowRuleProvider, FlowRuleBatchEntry> batches;
private final AtomicReference<BatchState> state;
private CompletedBatchOperation overall;
public FlowRuleBatchFuture(List<Future<Void>> futures) {
public FlowRuleBatchFuture(List<Future<CompletedBatchOperation>> futures,
Multimap<FlowRuleProvider, FlowRuleBatchEntry> batches) {
this.futures = futures;
this.batches = batches;
state = new AtomicReference<FlowRuleManager.BatchState>();
state.set(BatchState.STARTED);
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
// TODO Auto-generated method stub
return false;
if (state.get() == BatchState.FINISHED) {
return false;
}
if (!state.compareAndSet(BatchState.STARTED, BatchState.CANCELLED)) {
return false;
}
cleanUpBatch();
for (Future<CompletedBatchOperation> f : futures) {
f.cancel(mayInterruptIfRunning);
}
return true;
}
@Override
public boolean isCancelled() {
// TODO Auto-generated method stub
return false;
return state.get() == BatchState.CANCELLED;
}
@Override
public boolean isDone() {
boolean isDone = true;
for (Future<Void> future : futures) {
isDone &= future.isDone();
}
return isDone;
return state.get() == BatchState.FINISHED;
}
@Override
public CompletedBatchOperation get() throws InterruptedException,
ExecutionException {
// TODO Auto-generated method stub
for (Future<Void> future : futures) {
future.get();
ExecutionException {
if (isDone()) {
return overall;
}
boolean success = true;
List<FlowEntry> failed = Lists.newLinkedList();
CompletedBatchOperation completed;
for (Future<CompletedBatchOperation> future : futures) {
completed = future.get();
success = validateBatchOperation(failed, completed, future);
}
return new CompletedBatchOperation();
return finalizeBatchOperation(success, failed);
}
@Override
public CompletedBatchOperation get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException,
TimeoutException {
// TODO we should decrement the timeout
if (isDone()) {
return overall;
}
boolean success = true;
List<FlowEntry> failed = Lists.newLinkedList();
CompletedBatchOperation completed;
long start = System.nanoTime();
long end = start + unit.toNanos(timeout);
for (Future<Void> future : futures) {
for (Future<CompletedBatchOperation> future : futures) {
long now = System.nanoTime();
long thisTimeout = end - now;
future.get(thisTimeout, TimeUnit.NANOSECONDS);
completed = future.get(thisTimeout, TimeUnit.NANOSECONDS);
success = validateBatchOperation(failed, completed, future);
}
return new CompletedBatchOperation();
return finalizeBatchOperation(success, failed);
}
private boolean validateBatchOperation(List<FlowEntry> failed,
CompletedBatchOperation completed,
Future<CompletedBatchOperation> future) {
if (isCancelled()) {
throw new CancellationException();
}
if (!completed.isSuccess()) {
failed.addAll(completed.failedItems());
cleanUpBatch();
cancelAllSubBatches();
return false;
}
return true;
}
private void cancelAllSubBatches() {
for (Future<CompletedBatchOperation> f : futures) {
f.cancel(true);
}
}
private CompletedBatchOperation finalizeBatchOperation(boolean success,
List<FlowEntry> failed) {
synchronized (overall) {
if (!state.compareAndSet(BatchState.STARTED, BatchState.FINISHED)) {
if (state.get() == BatchState.FINISHED) {
return overall;
}
throw new CancellationException();
}
overall = new CompletedBatchOperation(success, failed);
return overall;
}
}
private void cleanUpBatch() {
for (FlowRuleBatchEntry fbe : batches.values()) {
if (fbe.getOperator() == FlowRuleOperation.ADD ||
fbe.getOperator() == FlowRuleOperation.MODIFY) {
store.deleteFlowRule(fbe.getTarget());
} else if (fbe.getOperator() == FlowRuleOperation.REMOVE) {
store.storeFlowRule(fbe.getTarget());
}
}
}
}
}
......
......@@ -28,6 +28,7 @@ import org.onlab.onos.net.Port;
import org.onlab.onos.net.PortNumber;
import org.onlab.onos.net.device.DeviceListener;
import org.onlab.onos.net.device.DeviceService;
import org.onlab.onos.net.flow.CompletedBatchOperation;
import org.onlab.onos.net.flow.DefaultFlowEntry;
import org.onlab.onos.net.flow.DefaultFlowRule;
import org.onlab.onos.net.flow.FlowEntry;
......@@ -408,7 +409,7 @@ public class FlowRuleManagerTest {
}
@Override
public Future<Void> executeBatch(
public Future<CompletedBatchOperation> executeBatch(
BatchOperation<FlowRuleBatchEntry> batch) {
// TODO Auto-generated method stub
return null;
......
......@@ -27,6 +27,8 @@ import org.onlab.onos.net.flow.instructions.L2ModificationInstruction.ModVlanPcp
import org.onlab.onos.net.flow.instructions.L3ModificationInstruction;
import org.onlab.onos.net.flow.instructions.L3ModificationInstruction.ModIPInstruction;
import org.projectfloodlight.openflow.protocol.OFFactory;
import org.projectfloodlight.openflow.protocol.OFFlowAdd;
import org.projectfloodlight.openflow.protocol.OFFlowDelete;
import org.projectfloodlight.openflow.protocol.OFFlowMod;
import org.projectfloodlight.openflow.protocol.OFFlowModFlags;
import org.projectfloodlight.openflow.protocol.action.OFAction;
......@@ -68,12 +70,13 @@ public class FlowModBuilder {
this.cookie = flowRule.id();
}
public OFFlowMod buildFlowAdd() {
public OFFlowAdd buildFlowAdd() {
Match match = buildMatch();
List<OFAction> actions = buildActions();
//TODO: what to do without bufferid? do we assume that there will be a pktout as well?
OFFlowMod fm = factory.buildFlowAdd()
OFFlowAdd fm = factory.buildFlowAdd()
.setXid(cookie.value())
.setCookie(U64.of(cookie.value()))
.setBufferId(OFBufferId.NO_BUFFER)
.setActions(actions)
......@@ -92,6 +95,7 @@ public class FlowModBuilder {
//TODO: what to do without bufferid? do we assume that there will be a pktout as well?
OFFlowMod fm = factory.buildFlowModify()
.setXid(cookie.value())
.setCookie(U64.of(cookie.value()))
.setBufferId(OFBufferId.NO_BUFFER)
.setActions(actions)
......@@ -104,11 +108,12 @@ public class FlowModBuilder {
}
public OFFlowMod buildFlowDel() {
public OFFlowDelete buildFlowDel() {
Match match = buildMatch();
List<OFAction> actions = buildActions();
OFFlowMod fm = factory.buildFlowDelete()
OFFlowDelete fm = factory.buildFlowDelete()
.setXid(cookie.value())
.setCookie(U64.of(cookie.value()))
.setBufferId(OFBufferId.NO_BUFFER)
.setActions(actions)
......
......@@ -2,6 +2,7 @@ package org.onlab.onos.provider.of.flow.impl;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
......@@ -21,9 +22,12 @@ import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onlab.onos.ApplicationId;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.flow.CompletedBatchOperation;
import org.onlab.onos.net.flow.DefaultFlowEntry;
import org.onlab.onos.net.flow.FlowEntry;
import org.onlab.onos.net.flow.FlowRule;
import org.onlab.onos.net.flow.FlowRuleBatchEntry;
import org.onlab.onos.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
import org.onlab.onos.net.flow.FlowRuleProvider;
import org.onlab.onos.net.flow.FlowRuleProviderRegistry;
import org.onlab.onos.net.flow.FlowRuleProviderService;
......@@ -40,6 +44,7 @@ import org.onlab.onos.openflow.controller.RoleState;
import org.projectfloodlight.openflow.protocol.OFActionType;
import org.projectfloodlight.openflow.protocol.OFBarrierRequest;
import org.projectfloodlight.openflow.protocol.OFErrorMsg;
import org.projectfloodlight.openflow.protocol.OFFlowMod;
import org.projectfloodlight.openflow.protocol.OFFlowRemoved;
import org.projectfloodlight.openflow.protocol.OFFlowStatsEntry;
import org.projectfloodlight.openflow.protocol.OFFlowStatsReply;
......@@ -52,6 +57,11 @@ import org.projectfloodlight.openflow.protocol.OFStatsType;
import org.projectfloodlight.openflow.protocol.OFVersion;
import org.projectfloodlight.openflow.protocol.action.OFAction;
import org.projectfloodlight.openflow.protocol.action.OFActionOutput;
import org.projectfloodlight.openflow.protocol.errormsg.OFBadActionErrorMsg;
import org.projectfloodlight.openflow.protocol.errormsg.OFBadInstructionErrorMsg;
import org.projectfloodlight.openflow.protocol.errormsg.OFBadMatchErrorMsg;
import org.projectfloodlight.openflow.protocol.errormsg.OFBadRequestErrorMsg;
import org.projectfloodlight.openflow.protocol.errormsg.OFFlowModFailedErrorMsg;
import org.projectfloodlight.openflow.protocol.instruction.OFInstruction;
import org.projectfloodlight.openflow.protocol.instruction.OFInstructionApplyActions;
import org.projectfloodlight.openflow.types.OFPort;
......@@ -70,6 +80,8 @@ import com.google.common.collect.Multimap;
@Component(immediate = true)
public class OpenFlowRuleProvider extends AbstractProvider implements FlowRuleProvider {
enum BatchState { STARTED, FINISHED, CANCELLED };
private final Logger log = getLogger(getClass());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
......@@ -88,6 +100,9 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
private final Map<Long, InstallationFuture> pendingFutures =
new ConcurrentHashMap<Long, InstallationFuture>();
private final Map<Long, InstallationFuture> pendingFMs =
new ConcurrentHashMap<Long, InstallationFuture>();
/**
* Creates an OpenFlow host provider.
*/
......@@ -143,9 +158,47 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
removeFlowRule(flowRules);
}
@Override
public Future<CompletedBatchOperation> executeBatch(BatchOperation<FlowRuleBatchEntry> batch) {
final Set<Dpid> sws = new HashSet<Dpid>();
final Map<Long, FlowRuleBatchEntry> fmXids = new HashMap<Long, FlowRuleBatchEntry>();
OFFlowMod mod = null;
for (FlowRuleBatchEntry fbe : batch.getOperations()) {
FlowRule flowRule = fbe.getTarget();
OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(flowRule.deviceId().uri()));
sws.add(new Dpid(sw.getId()));
FlowModBuilder builder = new FlowModBuilder(flowRule, sw.factory());
switch (fbe.getOperator()) {
case ADD:
mod = builder.buildFlowAdd();
break;
case REMOVE:
mod = builder.buildFlowDel();
break;
case MODIFY:
mod = builder.buildFlowMod();
break;
default:
log.error("Unsupported batch operation {}", fbe.getOperator());
}
if (mod != null) {
sw.sendMsg(mod);
fmXids.put(mod.getXid(), fbe);
} else {
log.error("Conversion of flowrule {} failed.", flowRule);
}
}
InstallationFuture installation = new InstallationFuture(sws, fmXids);
for (Long xid : fmXids.keySet()) {
pendingFMs.put(xid, installation);
}
pendingFutures.put(U32.f(batch.hashCode()), installation);
installation.verify(batch.hashCode());
return installation;
}
//TODO: InternalFlowRuleProvider listening to stats and error and flowremoved.
// possibly barriers as well. May not be internal at all...
private class InternalFlowProvider
implements OpenFlowSwitchListener, OpenFlowEventListener {
......@@ -175,7 +228,6 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
InstallationFuture future = null;
switch (msg.getType()) {
case FLOW_REMOVED:
//TODO: make this better
OFFlowRemoved removed = (OFFlowRemoved) msg;
FlowEntry fr = new FlowEntryBuilder(dpid, removed).build();
......@@ -191,7 +243,7 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
}
break;
case ERROR:
future = pendingFutures.get(msg.getXid());
future = pendingFMs.get(msg.getXid());
if (future != null) {
future.fail((OFErrorMsg) msg, dpid);
}
......@@ -203,10 +255,7 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
}
@Override
public void roleAssertFailed(Dpid dpid, RoleState role) {
// TODO Auto-generated method stub
}
public void roleAssertFailed(Dpid dpid, RoleState role) {}
private synchronized void pushFlowMetrics(Dpid dpid, OFStatsReply stats) {
if (stats.getStatsType() != OFStatsType.FLOW) {
......@@ -230,7 +279,6 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
}
private boolean tableMissRule(Dpid dpid, OFFlowStatsEntry reply) {
// TODO NEED TO FIND A BETTER WAY TO AVOID DOING THIS
if (reply.getVersion().equals(OFVersion.OF_10) ||
reply.getMatch().getMatchFields().iterator().hasNext()) {
return false;
......@@ -251,104 +299,91 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
}
return false;
}
}
@Override
public Future<Void> executeBatch(BatchOperation<FlowRuleBatchEntry> batch) {
final Set<Dpid> sws = new HashSet<Dpid>();
for (FlowRuleBatchEntry fbe : batch.getOperations()) {
FlowRule flowRule = fbe.getTarget();
OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(flowRule.deviceId().uri()));
sws.add(new Dpid(sw.getId()));
switch (fbe.getOperator()) {
case ADD:
//TODO: Track XID for each flowmod
sw.sendMsg(new FlowModBuilder(flowRule, sw.factory()).buildFlowAdd());
break;
case REMOVE:
//TODO: Track XID for each flowmod
sw.sendMsg(new FlowModBuilder(flowRule, sw.factory()).buildFlowDel());
break;
case MODIFY:
//TODO: Track XID for each flowmod
sw.sendMsg(new FlowModBuilder(flowRule, sw.factory()).buildFlowMod());
break;
default:
log.error("Unsupported batch operation {}", fbe.getOperator());
}
}
InstallationFuture installation = new InstallationFuture(sws);
pendingFutures.put(U32.f(batch.hashCode()), installation);
installation.verify(batch.hashCode());
return installation;
}
private class InstallationFuture implements Future<Void> {
private class InstallationFuture implements Future<CompletedBatchOperation> {
private final Set<Dpid> sws;
private final AtomicBoolean ok = new AtomicBoolean(true);
private final Map<Long, FlowRuleBatchEntry> fms;
private final List<FlowEntry> offendingFlowMods = Lists.newLinkedList();
private final CountDownLatch countDownLatch;
private Integer pendingXid;
private BatchState state;
public InstallationFuture(Set<Dpid> sws) {
public InstallationFuture(Set<Dpid> sws, Map<Long, FlowRuleBatchEntry> fmXids) {
this.state = BatchState.STARTED;
this.sws = sws;
this.fms = fmXids;
countDownLatch = new CountDownLatch(sws.size());
}
public void fail(OFErrorMsg msg, Dpid dpid) {
ok.set(false);
//TODO add reason to flowentry
FlowEntry fe = null;
FlowRuleBatchEntry fbe = fms.get(msg.getXid());
FlowRule offending = fbe.getTarget();
//TODO handle specific error msgs
//offendingFlowMods.add(new FlowEntryBuilder(dpid, msg.));
switch (msg.getErrType()) {
case BAD_ACTION:
OFBadActionErrorMsg bad = (OFBadActionErrorMsg) msg;
fe = new DefaultFlowEntry(offending, bad.getErrType().ordinal(),
bad.getCode().ordinal());
break;
case BAD_INSTRUCTION:
OFBadInstructionErrorMsg badins = (OFBadInstructionErrorMsg) msg;
fe = new DefaultFlowEntry(offending, badins.getErrType().ordinal(),
badins.getCode().ordinal());
break;
case BAD_MATCH:
OFBadMatchErrorMsg badMatch = (OFBadMatchErrorMsg) msg;
fe = new DefaultFlowEntry(offending, badMatch.getErrType().ordinal(),
badMatch.getCode().ordinal());
break;
case BAD_REQUEST:
break;
case EXPERIMENTER:
OFBadRequestErrorMsg badReq = (OFBadRequestErrorMsg) msg;
fe = new DefaultFlowEntry(offending, badReq.getErrType().ordinal(),
badReq.getCode().ordinal());
break;
case FLOW_MOD_FAILED:
OFFlowModFailedErrorMsg fmFail = (OFFlowModFailedErrorMsg) msg;
fe = new DefaultFlowEntry(offending, fmFail.getErrType().ordinal(),
fmFail.getCode().ordinal());
break;
case EXPERIMENTER:
case GROUP_MOD_FAILED:
break;
case HELLO_FAILED:
break;
case METER_MOD_FAILED:
break;
case PORT_MOD_FAILED:
break;
case QUEUE_OP_FAILED:
break;
case ROLE_REQUEST_FAILED:
break;
case SWITCH_CONFIG_FAILED:
break;
case TABLE_FEATURES_FAILED:
break;
case TABLE_MOD_FAILED:
fe = new DefaultFlowEntry(offending, msg.getErrType().ordinal(), 0);
break;
default:
break;
log.error("Unknown error type {}", msg.getErrType());
}
offendingFlowMods.add(fe);
}
public void satisfyRequirement(Dpid dpid) {
log.warn("Satisfaction from switch {}", dpid);
sws.remove(controller.getSwitch(dpid));
sws.remove(dpid);
countDownLatch.countDown();
cleanUp();
}
public void verify(Integer id) {
pendingXid = id;
for (Dpid dpid : sws) {
OpenFlowSwitch sw = controller.getSwitch(dpid);
OFBarrierRequest.Builder builder = sw.factory()
......@@ -356,41 +391,57 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
.setXid(id);
sw.sendMsg(builder.build());
}
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
// TODO Auto-generated method stub
return false;
this.state = BatchState.CANCELLED;
cleanUp();
for (FlowRuleBatchEntry fbe : fms.values()) {
if (fbe.getOperator() == FlowRuleOperation.ADD ||
fbe.getOperator() == FlowRuleOperation.MODIFY) {
removeFlowRule(fbe.getTarget());
} else if (fbe.getOperator() == FlowRuleOperation.REMOVE) {
applyRule(fbe.getTarget());
}
}
return isCancelled();
}
@Override
public boolean isCancelled() {
// TODO Auto-generated method stub
return false;
return this.state == BatchState.CANCELLED;
}
@Override
public boolean isDone() {
return sws.isEmpty();
return this.state == BatchState.FINISHED;
}
@Override
public Void get() throws InterruptedException, ExecutionException {
public CompletedBatchOperation get() throws InterruptedException, ExecutionException {
countDownLatch.await();
//return offendingFlowMods;
return null;
this.state = BatchState.FINISHED;
return new CompletedBatchOperation(ok.get(), offendingFlowMods);
}
@Override
public Void get(long timeout, TimeUnit unit)
public CompletedBatchOperation get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException,
TimeoutException {
countDownLatch.await(timeout, unit);
//return offendingFlowMods;
return null;
this.state = BatchState.FINISHED;
return new CompletedBatchOperation(ok.get(), offendingFlowMods);
}
private void cleanUp() {
if (sws.isEmpty()) {
pendingFutures.remove(pendingXid);
for (Long xid : fms.keySet()) {
pendingFMs.remove(xid);
}
}
}
}
......