Pier Ventre

Add support for vlan based intents in the Corsa driver

Changes:
- Improves processSpecific in AbstractCorsaPipeline in order to support
Intents without an explicit match on the Ethertype;
- Implements vlan based circuits in CorsaPipelineV3 through the management
of the FwdObjective without Treatment;
- Distinguish Groups from simple actions;
- Corsa group are identified using the actions of the treatment;
- handling of the pending next similar to DefaultSingleTablePipeline

Change-Id: Iff0f70d56c64193524c6640f31ffb3f5629499dc
......@@ -93,14 +93,14 @@ public abstract class AbstractCorsaPipeline extends AbstractHandlerBehaviour imp
private ServiceDirectory serviceDirectory;
protected FlowRuleService flowRuleService;
private CoreService coreService;
private GroupService groupService;
protected GroupService groupService;
protected MeterService meterService;
private FlowObjectiveStore flowObjectiveStore;
protected FlowObjectiveStore flowObjectiveStore;
protected DeviceId deviceId;
protected ApplicationId appId;
protected DeviceService deviceService;
private KryoNamespace appKryo = new KryoNamespace.Builder()
protected KryoNamespace appKryo = new KryoNamespace.Builder()
.register(GroupKey.class)
.register(DefaultGroupKey.class)
.register(CorsaGroup.class)
......@@ -108,6 +108,8 @@ public abstract class AbstractCorsaPipeline extends AbstractHandlerBehaviour imp
.build("AbstractCorsaPipeline");
private Cache<GroupKey, NextObjective> pendingGroups;
protected Cache<Integer, NextObjective> pendingNext;
private ScheduledExecutorService groupChecker =
Executors.newScheduledThreadPool(2, groupedThreads("onos/pipeliner",
......@@ -131,6 +133,16 @@ public abstract class AbstractCorsaPipeline extends AbstractHandlerBehaviour imp
}
}).build();
pendingNext = CacheBuilder.newBuilder()
.expireAfterWrite(20, TimeUnit.SECONDS)
.removalListener((RemovalNotification<Integer, NextObjective> notification) -> {
if (notification.getCause() == RemovalCause.EXPIRED) {
notification.getValue().context()
.ifPresent(c -> c.onError(notification.getValue(),
ObjectiveError.FLOWINSTALLATIONFAILED));
}
}).build();
groupChecker.scheduleAtFixedRate(new GroupChecker(), 0, 500, TimeUnit.MILLISECONDS);
coreService = serviceDirectory.get(CoreService.class);
......@@ -304,6 +316,7 @@ public abstract class AbstractCorsaPipeline extends AbstractHandlerBehaviour imp
@Override
public void forward(ForwardingObjective fwd) {
Collection<FlowRule> rules;
FlowRuleOperations.Builder flowBuilder = FlowRuleOperations.builder();
......@@ -354,16 +367,20 @@ public abstract class AbstractCorsaPipeline extends AbstractHandlerBehaviour imp
private Collection<FlowRule> processSpecific(ForwardingObjective fwd) {
log.debug("Processing specific forwarding objective");
TrafficSelector selector = fwd.selector();
EthTypeCriterion ethType =
EthTypeCriterion ethTypeCriterion =
(EthTypeCriterion) selector.getCriterion(Criterion.Type.ETH_TYPE);
if (ethType != null) {
short et = ethType.ethType().toShort();
VlanIdCriterion vlanIdCriterion =
(VlanIdCriterion) selector.getCriterion(Criterion.Type.VLAN_VID);
if (ethTypeCriterion != null) {
short et = ethTypeCriterion.ethType().toShort();
if (et == Ethernet.TYPE_IPV4) {
return processSpecificRoute(fwd);
} else if (et == Ethernet.TYPE_VLAN) {
/* The ForwardingObjective must specify VLAN ethtype in order to use the Transit Circuit */
return processSpecificSwitch(fwd);
}
} else if (vlanIdCriterion != null) {
return processSpecificSwitch(fwd);
}
fail(fwd, ObjectiveError.UNSUPPORTED);
......@@ -464,6 +481,41 @@ public abstract class AbstractCorsaPipeline extends AbstractHandlerBehaviour imp
//Hook for modifying Route flow rule
protected abstract Builder processSpecificRoutingRule(Builder rb);
protected enum CorsaTrafficTreatmentType {
/**
* If the treatment has to be handled as group.
*/
GROUP,
/**
* If the treatment has to be handled as simple set of actions.
*/
ACTIONS
}
/**
* Helper class to encapsulate both traffic treatment and
* type of treatment.
*/
protected class CorsaTrafficTreatment {
private CorsaTrafficTreatmentType type;
private TrafficTreatment trafficTreatment;
public CorsaTrafficTreatment(CorsaTrafficTreatmentType treatmentType, TrafficTreatment trafficTreatment) {
this.type = treatmentType;
this.trafficTreatment = trafficTreatment;
}
public CorsaTrafficTreatmentType type() {
return type;
}
public TrafficTreatment treatment() {
return trafficTreatment;
}
}
@Override
public void next(NextObjective nextObjective) {
switch (nextObjective.type()) {
......@@ -471,20 +523,25 @@ public abstract class AbstractCorsaPipeline extends AbstractHandlerBehaviour imp
Collection<TrafficTreatment> treatments = nextObjective.next();
if (treatments.size() == 1) {
TrafficTreatment treatment = treatments.iterator().next();
treatment = processNextTreatment(treatment);
GroupBucket bucket =
DefaultGroupBucket.createIndirectGroupBucket(treatment);
CorsaTrafficTreatment corsaTreatment = processNextTreatment(treatment);
final GroupKey key = new DefaultGroupKey(appKryo.serialize(nextObjective.id()));
GroupDescription groupDescription
= new DefaultGroupDescription(deviceId,
GroupDescription.Type.INDIRECT,
new GroupBuckets(Collections
.singletonList(bucket)),
key,
null, // let group service determine group id
nextObjective.appId());
groupService.addGroup(groupDescription);
pendingGroups.put(key, nextObjective);
if (corsaTreatment.type() == CorsaTrafficTreatmentType.GROUP) {
GroupBucket bucket = DefaultGroupBucket.createIndirectGroupBucket(corsaTreatment.treatment());
GroupBuckets buckets = new GroupBuckets(Collections.singletonList(bucket));
// group id == null, let group service determine group id
GroupDescription groupDescription = new DefaultGroupDescription(deviceId,
GroupDescription.Type.INDIRECT,
buckets,
key,
null,
nextObjective.appId());
groupService.addGroup(groupDescription);
pendingGroups.put(key, nextObjective);
} else if (corsaTreatment.type() == CorsaTrafficTreatmentType.ACTIONS) {
pendingNext.put(nextObjective.id(), nextObjective);
flowObjectiveStore.putNextGroup(nextObjective.id(), new CorsaGroup(key));
nextObjective.context().ifPresent(context -> context.onSuccess(nextObjective));
}
}
break;
case HASHED:
......@@ -501,8 +558,8 @@ public abstract class AbstractCorsaPipeline extends AbstractHandlerBehaviour imp
}
//Hook for altering the NextObjective treatment
protected TrafficTreatment processNextTreatment(TrafficTreatment treatment) {
return treatment;
protected CorsaTrafficTreatment processNextTreatment(TrafficTreatment treatment) {
return new CorsaTrafficTreatment(CorsaTrafficTreatmentType.GROUP, treatment);
}
//Init helper: Table Miss = Drop
......
......@@ -30,9 +30,12 @@ import org.onosproject.net.flow.criteria.EthCriterion;
import org.onosproject.net.flow.criteria.IPCriterion;
import org.onosproject.net.flow.criteria.PortCriterion;
import org.onosproject.net.flow.criteria.VlanIdCriterion;
import org.onosproject.net.flow.instructions.Instruction;
import org.onosproject.net.flow.instructions.L2ModificationInstruction;
import org.onosproject.net.flowobjective.FilteringObjective;
import org.onosproject.net.flowobjective.ForwardingObjective;
import org.onosproject.net.flowobjective.NextObjective;
import org.onosproject.net.flowobjective.ObjectiveError;
import org.onosproject.net.meter.Band;
import org.onosproject.net.meter.DefaultBand;
import org.onosproject.net.meter.DefaultMeterRequest;
......@@ -69,9 +72,11 @@ public class CorsaPipelineV3 extends AbstractCorsaPipeline {
protected MeterId defaultMeterId = null;
@Override
protected TrafficTreatment processNextTreatment(TrafficTreatment treatment) {
protected CorsaTrafficTreatment processNextTreatment(TrafficTreatment treatment) {
TrafficTreatment.Builder tb = DefaultTrafficTreatment.builder();
treatment.immediate().stream()
.filter(i -> {
switch (i.type()) {
......@@ -87,7 +92,48 @@ public class CorsaPipelineV3 extends AbstractCorsaPipeline {
return false;
}
}).forEach(i -> tb.add(i));
return tb.build();
TrafficTreatment t = tb.build();
boolean isPresentModVlanId = false;
boolean isPresentModEthSrc = false;
boolean isPresentModEthDst = false;
boolean isPresentOutpuPort = false;
for (Instruction instruction : t.immediate()) {
switch (instruction.type()) {
case L2MODIFICATION:
L2ModificationInstruction l2i = (L2ModificationInstruction) instruction;
if (l2i instanceof L2ModificationInstruction.ModVlanIdInstruction) {
isPresentModVlanId = true;
}
if (l2i instanceof L2ModificationInstruction.ModEtherInstruction) {
L2ModificationInstruction.L2SubType subType = l2i.subtype();
if (subType.equals(L2ModificationInstruction.L2SubType.ETH_SRC)) {
isPresentModEthSrc = true;
} else if (subType.equals(L2ModificationInstruction.L2SubType.ETH_DST)) {
isPresentModEthDst = true;
}
}
case OUTPUT:
isPresentOutpuPort = true;
default:
}
}
CorsaTrafficTreatmentType type = CorsaTrafficTreatmentType.ACTIONS;
/**
* This represents the allowed group for CorsaPipelinev3
*/
if (isPresentModVlanId &&
isPresentModEthSrc &&
isPresentModEthDst &&
isPresentOutpuPort) {
type = CorsaTrafficTreatmentType.GROUP;
}
CorsaTrafficTreatment corsaTreatment = new CorsaTrafficTreatment(type, t);
return corsaTreatment;
}
@Override
......@@ -115,9 +161,37 @@ public class CorsaPipelineV3 extends AbstractCorsaPipeline {
.withPriority(fwd.priority())
.forDevice(deviceId)
.withSelector(filteredSelector)
.withTreatment(fwd.treatment())
.forTable(VLAN_CIRCUIT_TABLE);
if (fwd.treatment() != null) {
ruleBuilder.withTreatment(fwd.treatment());
} else {
if (fwd.nextId() != null) {
NextObjective nextObjective = pendingNext.getIfPresent(fwd.nextId());
if (nextObjective != null) {
pendingNext.invalidate(fwd.nextId());
TrafficTreatment.Builder treatment = DefaultTrafficTreatment.builder()
.setVlanPcp((byte) 0)
.setQueue(0)
.meter(defaultMeterId);
nextObjective.next().forEach(trafficTreatment -> {
trafficTreatment.allInstructions().forEach(instruction -> {
treatment.add(instruction);
});
});
ruleBuilder.withTreatment(treatment.build());
} else {
log.warn("The group left!");
fwd.context().ifPresent(c -> c.onError(fwd, ObjectiveError.GROUPMISSING));
return ImmutableSet.of();
}
} else {
log.warn("Missing NextObjective ID for ForwardingObjective {}", fwd.id());
fail(fwd, ObjectiveError.BADPARAMS);
return ImmutableSet.of();
}
}
if (fwd.permanent()) {
ruleBuilder.makePermanent();
} else {
......
......@@ -29,6 +29,7 @@ import org.onosproject.net.flow.TrafficTreatment;
import org.onosproject.net.flow.criteria.Criterion;
import org.onosproject.net.flow.criteria.IPCriterion;
import org.onosproject.net.flow.criteria.IPProtocolCriterion;
import org.onosproject.net.flow.instructions.Instruction;
import org.onosproject.net.flow.instructions.Instructions;
import org.onosproject.net.flow.instructions.L2ModificationInstruction;
import org.onosproject.net.flowobjective.ForwardingObjective;
......@@ -226,9 +227,8 @@ public class CorsaPipelineV39 extends CorsaPipelineV3 {
}
@Override
protected TrafficTreatment processNextTreatment(TrafficTreatment treatment) {
protected CorsaTrafficTreatment processNextTreatment(TrafficTreatment treatment) {
TrafficTreatment.Builder tb = DefaultTrafficTreatment.builder();
tb.add(Instructions.popVlan());
treatment.immediate().stream()
.filter(i -> {
switch (i.type()) {
......@@ -236,7 +236,6 @@ public class CorsaPipelineV39 extends CorsaPipelineV3 {
L2ModificationInstruction l2i = (L2ModificationInstruction) i;
if (l2i.subtype() == VLAN_ID ||
l2i.subtype() == VLAN_POP ||
l2i.subtype() == VLAN_POP ||
l2i.subtype() == ETH_DST ||
l2i.subtype() == ETH_SRC) {
return true;
......@@ -247,6 +246,51 @@ public class CorsaPipelineV39 extends CorsaPipelineV3 {
return false;
}
}).forEach(i -> tb.add(i));
return tb.build();
TrafficTreatment t = tb.build();
boolean isPresentModVlanId = false;
boolean isPresentModEthSrc = false;
boolean isPresentModEthDst = false;
boolean isPresentOutpuPort = false;
for (Instruction instruction : t.immediate()) {
switch (instruction.type()) {
case L2MODIFICATION:
L2ModificationInstruction l2i = (L2ModificationInstruction) instruction;
if (l2i instanceof L2ModificationInstruction.ModVlanIdInstruction) {
isPresentModVlanId = true;
}
if (l2i instanceof L2ModificationInstruction.ModEtherInstruction) {
L2ModificationInstruction.L2SubType subType = l2i.subtype();
if (subType.equals(L2ModificationInstruction.L2SubType.ETH_SRC)) {
isPresentModEthSrc = true;
} else if (subType.equals(L2ModificationInstruction.L2SubType.ETH_DST)) {
isPresentModEthDst = true;
}
}
case OUTPUT:
isPresentOutpuPort = true;
default:
}
}
CorsaTrafficTreatmentType type = CorsaTrafficTreatmentType.ACTIONS;
/**
* These are the allowed groups for CorsaPipelinev39
*/
if (isPresentModVlanId && isPresentModEthSrc && isPresentModEthDst && isPresentOutpuPort) {
type = CorsaTrafficTreatmentType.GROUP;
} else if ((!isPresentModVlanId && isPresentModEthSrc && isPresentModEthDst && isPresentOutpuPort) ||
(!isPresentModVlanId && !isPresentModEthSrc && isPresentModEthDst && isPresentOutpuPort) ||
(!isPresentModVlanId && !isPresentModEthSrc && !isPresentModEthDst && isPresentOutpuPort)) {
type = CorsaTrafficTreatmentType.GROUP;
TrafficTreatment.Builder tb2 = DefaultTrafficTreatment.builder(t);
tb2.add(Instructions.popVlan());
t = tb2.build();
}
CorsaTrafficTreatment corsaTreatment = new CorsaTrafficTreatment(type, t);
return corsaTreatment;
}
}
......