Thomas Vachuska
Committed by Gerrit Code Review

Merge "Fixing flow rule batches"

......@@ -63,7 +63,7 @@ public final class DefaultTrafficSelector implements TrafficSelector {
@Override
public int hashCode() {
return Objects.hash(criteria);
return criteria.hashCode();
}
@Override
......
......@@ -18,7 +18,7 @@ package org.onlab.onos.net.flow;
import org.onlab.onos.core.ApplicationId;
import org.onlab.onos.net.provider.Provider;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.concurrent.Future;
/**
* Abstraction of a flow rule provider.
......@@ -58,6 +58,6 @@ public interface FlowRuleProvider extends Provider {
* @param batch a batch of flow rules
* @return a future indicating the status of this execution
*/
ListenableFuture<CompletedBatchOperation> executeBatch(BatchOperation<FlowRuleBatchEntry> batch);
Future<CompletedBatchOperation> executeBatch(BatchOperation<FlowRuleBatchEntry> batch);
}
......
......@@ -196,7 +196,7 @@ public final class Criteria {
@Override
public int hashCode() {
return Objects.hash(port, type());
return Objects.hash(type(), port);
}
@Override
......@@ -242,7 +242,7 @@ public final class Criteria {
@Override
public int hashCode() {
return Objects.hash(mac, type);
return Objects.hash(type, mac);
}
@Override
......@@ -288,7 +288,7 @@ public final class Criteria {
@Override
public int hashCode() {
return Objects.hash(ethType, type());
return Objects.hash(type(), ethType);
}
@Override
......@@ -336,7 +336,7 @@ public final class Criteria {
@Override
public int hashCode() {
return Objects.hash(ip, type);
return Objects.hash(type, ip);
}
@Override
......@@ -382,7 +382,7 @@ public final class Criteria {
@Override
public int hashCode() {
return Objects.hash(proto, type());
return Objects.hash(type(), proto);
}
@Override
......@@ -427,7 +427,7 @@ public final class Criteria {
@Override
public int hashCode() {
return Objects.hash(vlanPcp);
return Objects.hash(type(), vlanPcp);
}
@Override
......@@ -474,7 +474,7 @@ public final class Criteria {
@Override
public int hashCode() {
return Objects.hash(vlanId, type());
return Objects.hash(type(), vlanId);
}
@Override
......@@ -522,7 +522,7 @@ public final class Criteria {
@Override
public int hashCode() {
return Objects.hash(tcpPort, type);
return Objects.hash(type, tcpPort);
}
@Override
......@@ -568,7 +568,7 @@ public final class Criteria {
@Override
public int hashCode() {
return Objects.hash(lambda, type);
return Objects.hash(type, lambda);
}
@Override
......@@ -612,7 +612,7 @@ public final class Criteria {
@Override
public int hashCode() {
return Objects.hash(signalType, type);
return Objects.hash(type, signalType);
}
@Override
......
......@@ -190,7 +190,7 @@ public final class Instructions {
@Override
public int hashCode() {
return Objects.hash(port, type());
return Objects.hash(type(), port);
}
@Override
......
......@@ -70,7 +70,7 @@ public abstract class L0ModificationInstruction implements Instruction {
@Override
public int hashCode() {
return Objects.hash(lambda, type(), subtype);
return Objects.hash(type(), subtype, lambda);
}
@Override
......
......@@ -93,7 +93,7 @@ public abstract class L2ModificationInstruction implements Instruction {
@Override
public int hashCode() {
return Objects.hash(mac, type(), subtype);
return Objects.hash(type(), subtype, mac);
}
@Override
......@@ -142,7 +142,7 @@ public abstract class L2ModificationInstruction implements Instruction {
@Override
public int hashCode() {
return Objects.hash(vlanId, type(), subtype());
return Objects.hash(type(), subtype(), vlanId);
}
@Override
......@@ -191,7 +191,7 @@ public abstract class L2ModificationInstruction implements Instruction {
@Override
public int hashCode() {
return Objects.hash(vlanPcp, type(), subtype());
return Objects.hash(type(), subtype(), vlanPcp);
}
@Override
......
......@@ -85,7 +85,7 @@ public abstract class L3ModificationInstruction implements Instruction {
@Override
public int hashCode() {
return Objects.hash(ip, type(), subtype());
return Objects.hash(type(), subtype(), ip);
}
@Override
......
......@@ -15,22 +15,12 @@
*/
package org.onlab.onos.net.flow.impl;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.slf4j.LoggerFactory.getLogger;
import static org.onlab.util.Tools.namedThreads;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
......@@ -64,14 +54,21 @@ import org.onlab.onos.net.provider.AbstractProviderRegistry;
import org.onlab.onos.net.provider.AbstractProviderService;
import org.slf4j.Logger;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.onlab.util.Tools.namedThreads;
import static org.slf4j.LoggerFactory.getLogger;
/**
* Provides implementation of the flow NB &amp; SB APIs.
......@@ -92,8 +89,7 @@ public class FlowRuleManager
private final FlowRuleStoreDelegate delegate = new InternalStoreDelegate();
private final ExecutorService futureListeners =
Executors.newCachedThreadPool(namedThreads("provider-future-listeners"));
private ExecutorService futureService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected FlowRuleStore store;
......@@ -106,6 +102,7 @@ public class FlowRuleManager
@Activate
public void activate() {
futureService = Executors.newCachedThreadPool(namedThreads("provider-future-listeners"));
store.setDelegate(delegate);
eventDispatcher.addSink(FlowRuleEvent.class, listenerRegistry);
log.info("Started");
......@@ -113,7 +110,7 @@ public class FlowRuleManager
@Deactivate
public void deactivate() {
futureListeners.shutdownNow();
futureService.shutdownNow();
store.unsetDelegate(delegate);
eventDispatcher.removeSink(FlowRuleEvent.class);
......@@ -364,6 +361,9 @@ public class FlowRuleManager
// Store delegate to re-post events emitted from the store.
private class InternalStoreDelegate implements FlowRuleStoreDelegate {
private static final int TIMEOUT = 5000; // ms
// TODO: Right now we only dispatch events at individual flowEntry level.
// It may be more efficient for also dispatch events as a batch.
@Override
......@@ -384,15 +384,21 @@ public class FlowRuleManager
FlowRuleProvider flowRuleProvider =
getProvider(batchOperation.getOperations().get(0).getTarget().deviceId());
final ListenableFuture<CompletedBatchOperation> result =
final Future<CompletedBatchOperation> result =
flowRuleProvider.executeBatch(batchOperation);
result.addListener(new Runnable() {
futureService.submit(new Runnable() {
@Override
public void run() {
store.batchOperationComplete(FlowRuleBatchEvent.completed(request,
Futures.getUnchecked(result)));
CompletedBatchOperation res = null;
try {
res = result.get(TIMEOUT, TimeUnit.MILLISECONDS);
} catch (TimeoutException | InterruptedException | ExecutionException e) {
log.warn("Something went wrong with the batch operation {}",
request.batchId());
}
store.batchOperationComplete(FlowRuleBatchEvent.completed(request, res));
}
}, futureListeners);
});
break;
case BATCH_OPERATION_COMPLETED:
......
......@@ -15,21 +15,11 @@
*/
package org.onlab.onos.provider.of.flow.impl;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ExecutionList;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
......@@ -80,15 +70,23 @@ import org.projectfloodlight.openflow.protocol.errormsg.OFFlowModFailedErrorMsg;
import org.projectfloodlight.openflow.protocol.instruction.OFInstruction;
import org.projectfloodlight.openflow.protocol.instruction.OFInstructionApplyActions;
import org.projectfloodlight.openflow.types.OFPort;
import org.projectfloodlight.openflow.types.U32;
import org.slf4j.Logger;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ExecutionList;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import static org.slf4j.LoggerFactory.getLogger;
/**
* Provider which uses an OpenFlow controller to detect network
......@@ -124,6 +122,8 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
private final Map<Dpid, FlowStatsCollector> collectors = Maps.newHashMap();
private final AtomicLong xidCounter = new AtomicLong(0);
/**
* Creates an OpenFlow host provider.
*/
......@@ -154,6 +154,7 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
log.info("Stopped");
}
@Override
public void applyFlowRule(FlowRule... flowRules) {
for (int i = 0; i < flowRules.length; i++) {
......@@ -167,7 +168,6 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
}
@Override
public void removeFlowRule(FlowRule... flowRules) {
for (int i = 0; i < flowRules.length; i++) {
......@@ -188,11 +188,15 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
}
@Override
public ListenableFuture<CompletedBatchOperation> executeBatch(BatchOperation<FlowRuleBatchEntry> batch) {
public Future<CompletedBatchOperation> executeBatch(BatchOperation<FlowRuleBatchEntry> batch) {
final Set<Dpid> sws =
Collections.newSetFromMap(new ConcurrentHashMap<Dpid, Boolean>());
final Map<Long, FlowRuleBatchEntry> fmXids = new HashMap<Long, FlowRuleBatchEntry>();
OFFlowMod mod = null;
/*
* Use identity hash map for reference equality as we could have equal
* flow mods for different switches.
*/
Map<OFFlowMod, OpenFlowSwitch> mods = Maps.newIdentityHashMap();
for (FlowRuleBatchEntry fbe : batch.getOperations()) {
FlowRule flowRule = fbe.getTarget();
OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(flowRule.deviceId().uri()));
......@@ -208,6 +212,7 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
}
sws.add(new Dpid(sw.getId()));
FlowModBuilder builder = FlowModBuilder.builder(flowRule, sw.factory());
OFFlowMod mod = null;
switch (fbe.getOperator()) {
case ADD:
mod = builder.buildFlowAdd();
......@@ -222,25 +227,29 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
log.error("Unsupported batch operation {}", fbe.getOperator());
}
if (mod != null) {
sw.sendMsg(mod);
fmXids.put(mod.getXid(), fbe);
mods.put(mod, sw);
fmXids.put(xidCounter.getAndIncrement(), fbe);
} else {
log.error("Conversion of flowrule {} failed.", flowRule);
}
}
InstallationFuture installation = new InstallationFuture(sws, fmXids);
for (Long xid : fmXids.keySet()) {
pendingFMs.put(xid, installation);
}
pendingFutures.put(U32.f(batch.hashCode()), installation);
installation.verify(U32.f(batch.hashCode()));
pendingFutures.put(installation.xid(), installation);
for (Map.Entry<OFFlowMod, OpenFlowSwitch> entry : mods.entrySet()) {
OpenFlowSwitch sw = entry.getValue();
OFFlowMod mod = entry.getKey();
sw.sendMsg(mod);
}
installation.verify();
return installation;
}
private class InternalFlowProvider
implements OpenFlowSwitchListener, OpenFlowEventListener {
implements OpenFlowSwitchListener, OpenFlowEventListener {
private final Multimap<DeviceId, FlowEntry> completeEntries =
......@@ -274,36 +283,36 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
public void handleMessage(Dpid dpid, OFMessage msg) {
InstallationFuture future = null;
switch (msg.getType()) {
case FLOW_REMOVED:
OFFlowRemoved removed = (OFFlowRemoved) msg;
FlowEntry fr = new FlowEntryBuilder(dpid, removed).build();
providerService.flowRemoved(fr);
break;
case STATS_REPLY:
pushFlowMetrics(dpid, (OFStatsReply) msg);
break;
case BARRIER_REPLY:
future = pendingFutures.get(msg.getXid());
if (future != null) {
future.satisfyRequirement(dpid);
}
break;
case ERROR:
future = pendingFMs.get(msg.getXid());
if (future != null) {
future.fail((OFErrorMsg) msg, dpid);
}
break;
default:
log.debug("Unhandled message type: {}", msg.getType());
case FLOW_REMOVED:
OFFlowRemoved removed = (OFFlowRemoved) msg;
FlowEntry fr = new FlowEntryBuilder(dpid, removed).build();
providerService.flowRemoved(fr);
break;
case STATS_REPLY:
pushFlowMetrics(dpid, (OFStatsReply) msg);
break;
case BARRIER_REPLY:
future = pendingFutures.get(msg.getXid());
if (future != null) {
future.satisfyRequirement(dpid);
}
break;
case ERROR:
future = pendingFMs.get(msg.getXid());
if (future != null) {
future.fail((OFErrorMsg) msg, dpid);
}
break;
default:
log.debug("Unhandled message type: {}", msg.getType());
}
}
@Override
public void receivedRoleReply(Dpid dpid, RoleState requested,
RoleState response) {
RoleState response) {
// Do nothing here for now.
}
......@@ -352,8 +361,9 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
}
private class InstallationFuture implements ListenableFuture<CompletedBatchOperation> {
private class InstallationFuture implements Future<CompletedBatchOperation> {
private final Long xid;
private final Set<Dpid> sws;
private final AtomicBoolean ok = new AtomicBoolean(true);
private final Map<Long, FlowRuleBatchEntry> fms;
......@@ -361,18 +371,22 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
private final Set<FlowEntry> offendingFlowMods = Sets.newHashSet();
private final CountDownLatch countDownLatch;
private Long pendingXid;
private BatchState state;
private final ExecutionList executionList = new ExecutionList();
public InstallationFuture(Set<Dpid> sws, Map<Long, FlowRuleBatchEntry> fmXids) {
this.xid = xidCounter.getAndIncrement();
this.state = BatchState.STARTED;
this.sws = sws;
this.fms = fmXids;
countDownLatch = new CountDownLatch(sws.size());
}
public Long xid() {
return xid;
}
public void fail(OFErrorMsg msg, Dpid dpid) {
ok.set(false);
......@@ -385,27 +399,27 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
case BAD_ACTION:
OFBadActionErrorMsg bad = (OFBadActionErrorMsg) msg;
fe = new DefaultFlowEntry(offending, bad.getErrType().ordinal(),
bad.getCode().ordinal());
bad.getCode().ordinal());
break;
case BAD_INSTRUCTION:
OFBadInstructionErrorMsg badins = (OFBadInstructionErrorMsg) msg;
fe = new DefaultFlowEntry(offending, badins.getErrType().ordinal(),
badins.getCode().ordinal());
badins.getCode().ordinal());
break;
case BAD_MATCH:
OFBadMatchErrorMsg badMatch = (OFBadMatchErrorMsg) msg;
fe = new DefaultFlowEntry(offending, badMatch.getErrType().ordinal(),
badMatch.getCode().ordinal());
badMatch.getCode().ordinal());
break;
case BAD_REQUEST:
OFBadRequestErrorMsg badReq = (OFBadRequestErrorMsg) msg;
fe = new DefaultFlowEntry(offending, badReq.getErrType().ordinal(),
badReq.getCode().ordinal());
badReq.getCode().ordinal());
break;
case FLOW_MOD_FAILED:
OFFlowModFailedErrorMsg fmFail = (OFFlowModFailedErrorMsg) msg;
fe = new DefaultFlowEntry(offending, fmFail.getErrType().ordinal(),
fmFail.getCode().ordinal());
fmFail.getCode().ordinal());
break;
case EXPERIMENTER:
case GROUP_MOD_FAILED:
......@@ -434,13 +448,12 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
}
public void verify(Long id) {
pendingXid = id;
public void verify() {
for (Dpid dpid : sws) {
OpenFlowSwitch sw = controller.getSwitch(dpid);
OFBarrierRequest.Builder builder = sw.factory()
.buildBarrierRequest()
.setXid(id);
.setXid(xid);
sw.sendMsg(builder.build());
}
}
......@@ -462,7 +475,6 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
}
}
invokeCallbacks();
return true;
}
......@@ -481,6 +493,7 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
countDownLatch.await();
this.state = BatchState.FINISHED;
CompletedBatchOperation result = new CompletedBatchOperation(ok.get(), offendingFlowMods);
//FIXME do cleanup here
return result;
}
......@@ -491,6 +504,7 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
if (countDownLatch.await(timeout, unit)) {
this.state = BatchState.FINISHED;
CompletedBatchOperation result = new CompletedBatchOperation(ok.get(), offendingFlowMods);
// FIXME do cleanup here
return result;
}
throw new TimeoutException();
......@@ -498,9 +512,7 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
private void cleanUp() {
if (isDone() || isCancelled()) {
if (pendingXid != null) {
pendingFutures.remove(pendingXid);
}
pendingFutures.remove(xid);
for (Long xid : fms.keySet()) {
pendingFMs.remove(xid);
}
......@@ -509,21 +521,10 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
private void removeRequirement(Dpid dpid) {
countDownLatch.countDown();
if (countDownLatch.getCount() == 0) {
invokeCallbacks();
}
sws.remove(dpid);
//FIXME don't do cleanup here
cleanUp();
}
@Override
public void addListener(Runnable runnable, Executor executor) {
executionList.add(runnable, executor);
}
private void invokeCallbacks() {
executionList.execute();
}
}
}
......