Saurav Das
Committed by Ray Milkey

CORD-48 Added support for broadcast next objective in OFDPA driver.

Changed groupid to show in hex for cli command 'groups'

Change-Id: I86474912a9fd775c36d5bc49545eaa58ecc46b47
......@@ -577,6 +577,7 @@ public class DefaultGroupHandler {
ports.forEach(port -> {
TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
tBuilder.popVlan();
tBuilder.setOutput(port);
nextObjBuilder.addTreatment(tBuilder.build());
});
......
......@@ -121,11 +121,11 @@ public class GroupsListCommand extends AbstractShellCommand {
private void printGroups(DeviceId deviceId, List<Group> groups) {
print("deviceId=%s", deviceId);
for (Group group : groups) {
print(FORMAT, group.id().id(), group.state(), group.type(),
print(FORMAT, Integer.toHexString(group.id().id()), group.state(), group.type(),
group.bytes(), group.packets(), group.appId().name());
int i = 0;
for (GroupBucket bucket:group.buckets().buckets()) {
print(BUCKET_FORMAT, group.id().id(), ++i,
print(BUCKET_FORMAT, Integer.toHexString(group.id().id()), ++i,
bucket.bytes(), bucket.packets(),
bucket.treatment().allInstructions());
}
......
......@@ -630,7 +630,8 @@ public final class Instructions {
@Override
public String toString() {
return toStringHelper(type().toString())
.add("group ID", groupId.id()).toString();
.addValue("group ID=0x" + Integer.toHexString(groupId.id()))
.toString();
}
@Override
......
......@@ -18,7 +18,10 @@ package org.onosproject.driver.pipeline;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.onlab.packet.VlanId;
import org.onosproject.core.ApplicationId;
......@@ -54,11 +57,16 @@ public class CpqdOFDPA2Pipeline extends OFDPA2Pipeline {
TrafficSelector.Builder selector = DefaultTrafficSelector.builder();
TrafficTreatment.Builder treatment = DefaultTrafficTreatment.builder();
selector.matchVlanId(vidCriterion.vlanId());
treatment.transition(TMAC_TABLE);
VlanId storeVlan = null;
if (vidCriterion.vlanId() == VlanId.NONE) {
// untagged packets are assigned vlans
treatment.pushVlan().setVlanId(assignedVlan);
storeVlan = assignedVlan;
} else {
storeVlan = vidCriterion.vlanId();
}
treatment.transition(TMAC_TABLE);
// ofdpa cannot match on ALL portnumber, so we need to use separate
// rules for each port.
......@@ -72,7 +80,20 @@ public class CpqdOFDPA2Pipeline extends OFDPA2Pipeline {
} else {
portnums.add(portCriterion.port());
}
for (PortNumber pnum : portnums) {
// update storage
port2Vlan.put(pnum, storeVlan);
Set<PortNumber> vlanPorts = vlan2Port.get(storeVlan);
if (vlanPorts == null) {
vlanPorts = Collections.newSetFromMap(
new ConcurrentHashMap<PortNumber, Boolean>());
vlanPorts.add(pnum);
vlan2Port.put(storeVlan, vlanPorts);
} else {
vlanPorts.add(pnum);
}
// create rest of flowrule
selector.matchInPort(pnum);
FlowRule rule = DefaultFlowRule.builder()
.forDevice(deviceId)
......
......@@ -23,11 +23,13 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.onlab.osgi.ServiceDirectory;
......@@ -147,6 +149,7 @@ public class OFDPA2Pipeline extends AbstractHandlerBehaviour implements Pipeline
private static final int L3UNICASTMASK = 0x20000000;
//private static final int MPLSINTERFACEMASK = 0x90000000;
private static final int L3ECMPMASK = 0x70000000;
private static final int L2FLOODMASK = 0x40000000;
private final Logger log = getLogger(getClass());
private ServiceDirectory serviceDirectory;
......@@ -176,6 +179,13 @@ public class OFDPA2Pipeline extends AbstractHandlerBehaviour implements Pipeline
private Set<IPCriterion> sentIpFilters = Collections.newSetFromMap(
new ConcurrentHashMap<IPCriterion, Boolean>());
// local stores for port-vlan mapping
Map<PortNumber, VlanId> port2Vlan = new ConcurrentHashMap<PortNumber, VlanId>();
Map<VlanId, Set<PortNumber>> vlan2Port = new ConcurrentHashMap<VlanId,
Set<PortNumber>>();
@Override
public void init(DeviceId deviceId, PipelinerContext context) {
this.serviceDirectory = context.directory();
......@@ -275,26 +285,23 @@ public class OFDPA2Pipeline extends AbstractHandlerBehaviour implements Pipeline
@Override
public void next(NextObjective nextObjective) {
switch (nextObjective.type()) {
case SIMPLE:
Collection<TrafficTreatment> treatments = nextObjective.next();
if (treatments.size() != 1) {
log.error("Next Objectives of type Simple should only have a "
+ "single Traffic Treatment. Next Objective Id:{}", nextObjective.id());
fail(nextObjective, ObjectiveError.BADPARAMS);
return;
log.debug("Processing NextObjective id{} op{}", nextObjective.id(),
nextObjective.op());
if (nextObjective.op() == Objective.Operation.REMOVE) {
if (nextObjective.next().isEmpty()) {
removeGroup(nextObjective);
} else {
removeBucketFromGroup(nextObjective);
}
processSimpleNextObjective(nextObjective);
break;
case HASHED:
case BROADCAST:
case FAILOVER:
fail(nextObjective, ObjectiveError.UNSUPPORTED);
log.warn("Unsupported next objective type {}", nextObjective.type());
break;
default:
fail(nextObjective, ObjectiveError.UNKNOWN);
log.warn("Unknown next objective type {}", nextObjective.type());
} else if (nextObjective.op() == Objective.Operation.ADD) {
NextGroup nextGroup = flowObjectiveStore.getNextGroup(nextObjective.id());
if (nextGroup != null) {
addBucketToGroup(nextObjective);
} else {
addGroup(nextObjective);
}
} else {
log.warn("Unsupported operation {}", nextObjective.op());
}
}
......@@ -302,6 +309,7 @@ public class OFDPA2Pipeline extends AbstractHandlerBehaviour implements Pipeline
// Flow handling
//////////////////////////////////////
/**
* As per OFDPA 2.0 TTP, filtering of VLAN ids, MAC addresses (for routing)
* and IP addresses configured on switch ports happen in different tables.
......@@ -455,14 +463,19 @@ public class OFDPA2Pipeline extends AbstractHandlerBehaviour implements Pipeline
TrafficSelector.Builder selector = DefaultTrafficSelector.builder();
TrafficTreatment.Builder treatment = DefaultTrafficTreatment.builder();
selector.matchVlanId(vidCriterion.vlanId());
treatment.transition(TMAC_TABLE);
VlanId storeVlan = null;
if (vidCriterion.vlanId() == VlanId.NONE) {
// untagged packets are assigned vlans
treatment.pushVlan().setVlanId(assignedVlan);
// XXX ofdpa will require an additional vlan match on the assigned vlan
// and it may not require the push. This is not in compliance with OF
// standard. Waiting on what the exact flows are going to look like.
storeVlan = assignedVlan;
} else {
storeVlan = vidCriterion.vlanId();
}
treatment.transition(TMAC_TABLE);
// ofdpa cannot match on ALL portnumber, so we need to use separate
// rules for each port.
......@@ -476,7 +489,20 @@ public class OFDPA2Pipeline extends AbstractHandlerBehaviour implements Pipeline
} else {
portnums.add(portCriterion.port());
}
for (PortNumber pnum : portnums) {
// update storage
port2Vlan.put(pnum, storeVlan);
Set<PortNumber> vlanPorts = vlan2Port.get(storeVlan);
if (vlanPorts == null) {
vlanPorts = Collections.newSetFromMap(
new ConcurrentHashMap<PortNumber, Boolean>());
vlanPorts.add(pnum);
vlan2Port.put(storeVlan, vlanPorts);
} else {
vlanPorts.add(pnum);
}
// create rest of flowrule
selector.matchInPort(pnum);
FlowRule rule = DefaultFlowRule.builder()
.forDevice(deviceId)
......@@ -708,10 +734,39 @@ public class OFDPA2Pipeline extends AbstractHandlerBehaviour implements Pipeline
// Group handling
//////////////////////////////////////
private void addGroup(NextObjective nextObjective) {
switch (nextObjective.type()) {
case SIMPLE:
Collection<TrafficTreatment> treatments = nextObjective.next();
if (treatments.size() != 1) {
log.error("Next Objectives of type Simple should only have a "
+ "single Traffic Treatment. Next Objective Id:{}",
nextObjective.id());
fail(nextObjective, ObjectiveError.BADPARAMS);
return;
}
processSimpleNextObjective(nextObjective);
break;
case BROADCAST:
processBroadcastNextObjective(nextObjective);
break;
case HASHED:
processHashedNextObjective(nextObjective);
break;
case FAILOVER:
fail(nextObjective, ObjectiveError.UNSUPPORTED);
log.warn("Unsupported next objective type {}", nextObjective.type());
break;
default:
fail(nextObjective, ObjectiveError.UNKNOWN);
log.warn("Unknown next objective type {}", nextObjective.type());
}
}
/**
* As per the OFDPA 2.0 TTP, packets are sent out of ports by using
* a chain of groups, namely an L3 Unicast Group that points to an L2 Interface
* Group which in turns points to an output port. The Next Objective passed
* Group which in-turn points to an output port. The Next Objective passed
* in by the application has to be broken up into a group chain
* to satisfy this TTP.
*
......@@ -770,7 +825,9 @@ public class OFDPA2Pipeline extends AbstractHandlerBehaviour implements Pipeline
Integer l3groupId = L3UNICASTMASK | (int) portNum;
l3utt.group(new DefaultGroupId(l2groupId));
GroupChainElem gce = new GroupChainElem(l3groupkey, l3groupId,
l3utt.build(), nextObj.appId());
GroupDescription.Type.INDIRECT,
Collections.singletonList(l3utt.build()),
nextObj.appId(), 1);
// create object for local and distributed storage
List<GroupKey> gkeys = new ArrayList<GroupKey>();
......@@ -797,27 +854,201 @@ public class OFDPA2Pipeline extends AbstractHandlerBehaviour implements Pipeline
}
/**
* As per the OFDPA 2.0 TTP, packets are sent out of ports by using
* a chain of groups. The Next Objective passed in by the application
* has to be broken up into a group chain comprising of an
* L2 Flood group whose buckets point to L2 Interface groups.
*
* @param nextObj the nextObjective of type BROADCAST
*/
private void processBroadcastNextObjective(NextObjective nextObj) {
// break up broadcast next objective to multiple groups
Collection<TrafficTreatment> buckets = nextObj.next();
// each treatment is converted to an L2 interface group
int indicator = 0;
VlanId vlanid = null;
List<GroupInfo> groupInfoCollection = new ArrayList<>();
for (TrafficTreatment treatment : buckets) {
TrafficTreatment.Builder newTreatment = DefaultTrafficTreatment.builder();
PortNumber portNum = null;
// ensure that the only allowed treatments are pop-vlan and output
for (Instruction ins : treatment.allInstructions()) {
if (ins.type() == Instruction.Type.L2MODIFICATION) {
L2ModificationInstruction l2ins = (L2ModificationInstruction) ins;
switch (l2ins.subtype()) {
case VLAN_POP:
newTreatment.add(l2ins);
break;
default:
log.debug("action {} not permitted for broadcast nextObj",
l2ins.subtype());
break;
}
} else if (ins.type() == Instruction.Type.OUTPUT) {
portNum = ((OutputInstruction) ins).port();
newTreatment.add(ins);
} else {
log.debug("TrafficTreatment of type {} not permitted in "
+ " broadcast nextObjective", ins.type());
}
}
// also ensure that all ports are in the same vlan
VlanId thisvlanid = port2Vlan.get(portNum);
if (vlanid == null) {
vlanid = thisvlanid;
} else {
if (!vlanid.equals(thisvlanid)) {
log.error("Driver requires all ports in a broadcast nextObj "
+ "to be in the same vlan. Different vlans found "
+ "{} and {}. Aborting group creation", vlanid, thisvlanid);
return;
}
}
// assemble info for all l2 interface groups
indicator += GROUP1MASK;
int l2gk = nextObj.id() | indicator;
final GroupKey l2groupkey = new DefaultGroupKey(appKryo.serialize(l2gk));
Integer l2groupId = L2INTERFACEMASK | (vlanid.toShort() << 16) |
(int) portNum.toLong();
GroupBucket newbucket =
DefaultGroupBucket.createIndirectGroupBucket(newTreatment.build());
// store the info needed to create this group
groupInfoCollection.add(new GroupInfo(l2groupId, l2groupkey, newbucket));
}
// assemble info for l2 flood group
int l2floodgk = nextObj.id() | GROUP0MASK;
final GroupKey l2floodgroupkey = new DefaultGroupKey(appKryo.serialize(l2floodgk));
Integer l2floodgroupId = L2FLOODMASK | (vlanid.toShort() << 16) | nextObj.id();
// collection of treatment with groupids of l2 interface groups
List<TrafficTreatment> floodtt = new ArrayList<>();
for (GroupInfo gi : groupInfoCollection) {
TrafficTreatment.Builder ttb = DefaultTrafficTreatment.builder();
ttb.group(new DefaultGroupId(gi.groupId));
floodtt.add(ttb.build());
}
GroupChainElem gce = new GroupChainElem(l2floodgroupkey, l2floodgroupId,
GroupDescription.Type.ALL,
floodtt,
nextObj.appId(),
groupInfoCollection.size());
// create objects for local and distributed storage
List<GroupKey> gkeys = new ArrayList<GroupKey>();
gkeys.add(l2floodgroupkey); // group0 in chain
OfdpaGroupChain ofdpaGrp = new OfdpaGroupChain(gkeys, nextObj);
// store l2floodgroupkey with the ofdpaGroupChain for the nextObjective
// that depends on it
pendingNextObjectives.put(l2floodgroupkey, ofdpaGrp);
for (GroupInfo gi : groupInfoCollection) {
// store all l2groupkeys with the groupChainElem for the l2floodgroup
// that depends on it
pendingGroups.put(gi.groupKey, gce);
// create and send groups for all l2 interface groups
GroupDescription groupDescription =
new DefaultGroupDescription(
deviceId,
GroupDescription.Type.INDIRECT,
new GroupBuckets(Collections.singletonList(gi.groupBucket)),
gi.groupKey,
gi.groupId,
nextObj.appId());
groupService.addGroup(groupDescription);
}
}
private class GroupInfo {
private Integer groupId;
private GroupKey groupKey;
private GroupBucket groupBucket;
GroupInfo(Integer groupId, GroupKey groupKey, GroupBucket groupBucket) {
this.groupBucket = groupBucket;
this.groupId = groupId;
this.groupKey = groupKey;
}
}
private void processHashedNextObjective(NextObjective nextObj) {
// TODO Auto-generated method stub
}
private void addBucketToGroup(NextObjective nextObjective) {
// TODO Auto-generated method stub
}
private void removeBucketFromGroup(NextObjective nextObjective) {
// TODO Auto-generated method stub
}
private void removeGroup(NextObjective nextObjective) {
// TODO Auto-generated method stub
}
/**
* Processes next element of a group chain. Assumption is that if this
* group points to another group, the latter has already been created
* and this driver has received notification for it. A second assumption is
* that if there is another group waiting for this group then the appropriate
* stores already have the information to act upon the notification for the
* creating of this group.
* <p>
* The processing of the GroupChainElement depends on the number of groups
* this element is waiting on. For all group types other than SIMPLE, a
* GroupChainElement could be waiting on multiple groups.
*
* @param gce the group chain element to be processed next
*/
private void processGroupChain(GroupChainElem gce) {
GroupBucket bucket = DefaultGroupBucket
.createIndirectGroupBucket(gce.getBucketActions());
GroupDescription groupDesc = new DefaultGroupDescription(deviceId,
GroupDescription.Type.INDIRECT,
new GroupBuckets(Collections.singletonList(bucket)),
gce.getGkey(),
gce.getGivenGroupId(),
gce.getAppId());
groupService.addGroup(groupDesc);
}
int waitOnGroups = gce.decrementAndGetGroupsWaitedOn();
if (waitOnGroups != 0) {
log.debug("GCE: {} waiting on {} groups. Not processing yet",
gce, waitOnGroups);
return;
}
List<GroupBucket> buckets = new ArrayList<>();
switch (gce.groupType) {
case INDIRECT:
GroupBucket ibucket = DefaultGroupBucket
.createIndirectGroupBucket(gce.bucketActions.iterator().next());
buckets.add(ibucket);
break;
case ALL:
for (TrafficTreatment tt : gce.bucketActions) {
GroupBucket abucket = DefaultGroupBucket
.createAllGroupBucket(tt);
buckets.add(abucket);
}
break;
case SELECT:
for (TrafficTreatment tt : gce.bucketActions) {
GroupBucket sbucket = DefaultGroupBucket
.createSelectGroupBucket(tt);
buckets.add(sbucket);
}
break;
case FAILOVER:
default:
log.error("Unknown or unimplemented GroupChainElem {}", gce);
}
if (buckets.size() > 0) {
GroupDescription groupDesc = new DefaultGroupDescription(
deviceId, gce.groupType,
new GroupBuckets(buckets),
gce.gkey,
gce.givenGroupId,
gce.appId);
groupService.addGroup(groupDesc);
}
}
private class GroupChecker implements Runnable {
@Override
......@@ -837,7 +1068,7 @@ public class OFDPA2Pipeline extends AbstractHandlerBehaviour implements Pipeline
log.info("Group service processed group key {}. Processing next "
+ "group in group chain with group key {}",
appKryo.deserialize(key.key()),
appKryo.deserialize(gce.getGkey().key()));
appKryo.deserialize(gce.gkey.key()));
processGroupChain(gce);
} else {
OfdpaGroupChain obj = pendingNextObjectives.getIfPresent(key);
......@@ -866,7 +1097,7 @@ public class OFDPA2Pipeline extends AbstractHandlerBehaviour implements Pipeline
log.info("group ADDED with group key {} .. "
+ "Processing next group in group chain with group key {}",
appKryo.deserialize(key.key()),
appKryo.deserialize(gce.getGkey().key()));
appKryo.deserialize(gce.gkey.key()));
processGroupChain(gce);
} else {
OfdpaGroupChain obj = pendingNextObjectives.getIfPresent(key);
......@@ -890,6 +1121,11 @@ public class OFDPA2Pipeline extends AbstractHandlerBehaviour implements Pipeline
* look like group0 --> group 1 --> outPort. Information about the groups
* themselves can be fetched from the Group Service using the group keys from
* objects instantiating this class.
*
* XXX Revisit this - since the forwarding objective only ever needs the
* groupkey of the top-level group in the group chain, why store a series
* of groupkeys. Also the group-chain list only works for 1-to-1 chaining,
* not for 1-to-many chaining.
*/
private class OfdpaGroupChain implements NextGroup {
private final NextObjective nextObj;
......@@ -925,33 +1161,40 @@ public class OFDPA2Pipeline extends AbstractHandlerBehaviour implements Pipeline
* preceding groups in the group chain to be created.
*/
private class GroupChainElem {
private TrafficTreatment bucketActions;
private Collection<TrafficTreatment> bucketActions;
private Integer givenGroupId;
private GroupDescription.Type groupType;
private GroupKey gkey;
private ApplicationId appId;
private AtomicInteger waitOnGroups;
public GroupChainElem(GroupKey gkey, Integer givenGroupId,
TrafficTreatment tr, ApplicationId appId) {
GroupChainElem(GroupKey gkey, Integer givenGroupId,
GroupDescription.Type groupType,
Collection<TrafficTreatment> tr, ApplicationId appId,
int waitOnGroups) {
this.bucketActions = tr;
this.givenGroupId = givenGroupId;
this.groupType = groupType;
this.gkey = gkey;
this.appId = appId;
this.waitOnGroups = new AtomicInteger(waitOnGroups);
}
public TrafficTreatment getBucketActions() {
return bucketActions;
}
public Integer getGivenGroupId() {
return givenGroupId;
/**
* This methods atomically decrements the counter for the number of
* groups this GroupChainElement is waiting on, for notifications from
* the Group Service. When this method returns a value of 0, this
* GroupChainElement is ready to be processed.
*
* @return integer indication of the number of notifications being waited on
*/
int decrementAndGetGroupsWaitedOn() {
return waitOnGroups.decrementAndGet();
}
public GroupKey getGkey() {
return gkey;
}
public ApplicationId getAppId() {
return appId;
@Override
public String toString() {
return Integer.toHexString(givenGroupId);
}
}
......