Saurav Das
Committed by Gerrit Code Review

In this commit:

   Bug fix where filtering objectives are not installed due to available ports becoming enabled later.
   Bug fix where flow objective store had no listener for notifications from drivers across multiple instances of the controller.
   NPE fix in ofdpa driver for non-existing groups.
   Preventing ofdpa driver from sending spurious pass notification to app.
   Incrementing retry filter timer from 1 to 5 secs in default routing handler.
   Made several debug messages clearer.

Change-Id: I828671ee4c8bcfe03c946d051e1d1aac9d8f68dd
......@@ -47,7 +47,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
* routing rule population.
*/
public class DefaultRoutingHandler {
private static final int MAX_RETRY_ATTEMPTS = 5;
private static final int MAX_RETRY_ATTEMPTS = 25;
private static final String ECMPSPG_MISSING = "ECMP shortest path graph not found";
private static Logger log = LoggerFactory.getLogger(DefaultRoutingHandler.class);
......@@ -212,11 +212,11 @@ public class DefaultRoutingHandler {
log.trace("repopulateRoutingRulesForRoutes: running ECMP graph for device {}", link.get(0));
EcmpShortestPathGraph ecmpSpg = new EcmpShortestPathGraph(link.get(0), srManager);
if (populateEcmpRoutingRules(link.get(0), ecmpSpg, ImmutableSet.of())) {
log.debug("Populating flow rules from {} to all is successful",
log.debug("Populating flow rules from all to dest:{} is successful",
link.get(0));
currentEcmpSpgMap.put(link.get(0), ecmpSpg);
} else {
log.warn("Failed to populate the flow rules from {} to all", link.get(0));
log.warn("Failed to populate the flow rules from all to dest:{}", link.get(0));
return false;
}
} else {
......@@ -463,9 +463,9 @@ public class DefaultRoutingHandler {
/**
* Populate ECMP rules for subnets from target to destination via nexthops.
*
* @param targetSw Device ID of target switch
* @param destSw Device ID of destination switch
* @param nextHops List of next hops
* @param targetSw Device ID of target switch in which rules will be programmed
* @param destSw Device ID of final destination switch to which the rules will forward
* @param nextHops List of next hops via which destSw will be reached
* @param subnets Subnets to be populated. If empty, populate all configured subnets.
* @return true if succeed
*/
......@@ -647,6 +647,8 @@ public class DefaultRoutingHandler {
@Override
public void run() {
log.info("RETRY FILTER ATTEMPT# {} for dev:{}",
MAX_RETRY_ATTEMPTS - attempts, devId);
boolean success = rulePopulator.populateRouterMacVlanFilters(devId);
if (!success && --attempts > 0) {
executorService.schedule(this, 200, TimeUnit.MILLISECONDS);
......
......@@ -239,7 +239,8 @@ public class FlowObjectiveManager implements FlowObjectiveService {
private boolean queueObjective(DeviceId deviceId, ForwardingObjective fwd) {
if (fwd.nextId() != null &&
flowObjectiveStore.getNextGroup(fwd.nextId()) == null) {
log.trace("Queuing forwarding objective for nextId {}", fwd.nextId());
log.debug("Queuing forwarding objective {} for nextId {} meant for device {}",
fwd.id(), fwd.nextId(), deviceId);
// TODO: change to computeIfAbsent
Set<PendingNext> newset = Collections.newSetFromMap(
new ConcurrentHashMap<PendingNext, Boolean>());
......@@ -398,7 +399,7 @@ public class FlowObjectiveManager implements FlowObjectiveService {
Set<PendingNext> pending = pendingForwards.remove(event.subject());
if (pending == null) {
log.warn("Nothing pending for this obj event {}", event);
log.debug("Nothing pending for this obj event {}", event);
return;
}
......@@ -457,7 +458,7 @@ public class FlowObjectiveManager implements FlowObjectiveService {
public List<String> getNextMappings() {
List<String> mappings = new ArrayList<>();
Map<Integer, NextGroup> allnexts = flowObjectiveStore.getAllGroups();
// XXX if the NextGroup upon decoding stored info of the deviceId
// XXX if the NextGroup after de-serialization actually stored info of the deviceId
// then info on any nextObj could be retrieved from one controller instance.
// Right now the drivers on one instance can only fetch for next-ids that came
// to them.
......
......@@ -30,15 +30,22 @@ import org.onosproject.net.flowobjective.ObjectiveEvent;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.service.AtomicCounter;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.Versioned;
import org.slf4j.Logger;
import static org.onlab.util.Tools.groupedThreads;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
/**
* Manages the inventory of created next groups.
......@@ -57,9 +64,16 @@ public class DistributedFlowObjectiveStore
protected StorageService storageService;
private AtomicCounter nextIds;
private MapEventListener<Integer, byte[]> mapListener = new NextGroupListener();
// event queue to separate map-listener threads from event-handler threads (tpool)
private BlockingQueue<ObjectiveEvent> eventQ;
private ExecutorService tpool;
@Activate
public void activate() {
tpool = Executors.newFixedThreadPool(4, groupedThreads("onos/flobj-notifier", "%d", log));
eventQ = new LinkedBlockingQueue<ObjectiveEvent>();
tpool.execute(new FlowObjectiveNotifier());
nextGroups = storageService.<Integer, byte[]>consistentMapBuilder()
.withName("flowobjective-groups")
.withSerializer(Serializer.using(
......@@ -68,7 +82,7 @@ public class DistributedFlowObjectiveStore
.register(Versioned.class)
.build("DistributedFlowObjectiveStore")))
.build();
nextGroups.addListener(mapListener);
nextIds = storageService.getAtomicCounter("next-objective-counter");
log.info("Started");
}
......@@ -76,6 +90,7 @@ public class DistributedFlowObjectiveStore
@Deactivate
public void deactivate() {
tpool.shutdown();
log.info("Stopped");
}
......@@ -120,4 +135,37 @@ public class DistributedFlowObjectiveStore
public int allocateNextId() {
return (int) nextIds.incrementAndGet();
}
private class FlowObjectiveNotifier implements Runnable {
@Override
public void run() {
try {
while (!Thread.currentThread().isInterrupted()) {
notifyDelegate(eventQ.take());
}
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
}
private class NextGroupListener implements MapEventListener<Integer, byte[]> {
@Override
public void event(MapEvent<Integer, byte[]> event) {
switch (event.type()) {
case INSERT:
eventQ.add(new ObjectiveEvent(ObjectiveEvent.Type.ADD, event.key()));
break;
case REMOVE:
eventQ.add(new ObjectiveEvent(ObjectiveEvent.Type.REMOVE, event.key()));
break;
case UPDATE:
// TODO Introduce UPDATE ObjectiveEvent when the map is being updated
break;
default:
break;
}
}
}
}
......
......@@ -182,6 +182,12 @@ public class Ofdpa2Pipeline extends AbstractHandlerBehaviour implements Pipeline
FlowRuleOperations.Builder flowOpsBuilder = FlowRuleOperations.builder();
rules = processForward(fwd);
if (rules == null || rules.isEmpty()) {
// Assumes fail message has already been generated to the objective
// context. Returning here prevents spurious pass message to be
// generated by FlowRule service for empty flowOps.
return;
}
switch (fwd.op()) {
case ADD:
rules.stream()
......@@ -748,7 +754,7 @@ public class Ofdpa2Pipeline extends AbstractHandlerBehaviour implements Pipeline
* returned if there is an issue in processing the objective.
*/
protected Collection<FlowRule> processSpecific(ForwardingObjective fwd) {
log.trace("Processing specific fwd objective:{} in dev:{} with next:{}",
log.debug("Processing specific fwd objective:{} in dev:{} with next:{}",
fwd.id(), deviceId, fwd.nextId());
boolean isEthTypeObj = isSupportedEthTypeObjective(fwd);
boolean isEthDstObj = isSupportedEthDstObjective(fwd);
......@@ -885,8 +891,8 @@ public class Ofdpa2Pipeline extends AbstractHandlerBehaviour implements Pipeline
if (fwd.nextId() != null) {
if (forTableId == MPLS_TABLE_1 && !popMpls) {
log.warn("SR CONTINUE case cannot be handled as MPLS ECMP "
+ "is not implemented in OF-DPA yet. Aborting this flow "
+ "in this device {}", deviceId);
+ "is not implemented in OF-DPA yet. Aborting this flow {} -> next:{}"
+ "in this device {}", fwd.id(), fwd.nextId(), deviceId);
// XXX We could convert to forwarding to a single-port, via a
// MPLS interface, or a MPLS SWAP (with-same) but that would
// have to be handled in the next-objective. Also the pop-mpls
......@@ -907,6 +913,11 @@ public class Ofdpa2Pipeline extends AbstractHandlerBehaviour implements Pipeline
return Collections.emptySet();
}
tb.deferred().group(group.id());
} else {
log.warn("Cannot find group for nextId:{} in dev:{}. Aborting fwd:{}",
fwd.nextId(), deviceId, fwd.id());
fail(fwd, ObjectiveError.FLOWINSTALLATIONFAILED);
return Collections.emptySet();
}
}
tb.transition(ACL_TABLE);
......@@ -1063,7 +1074,7 @@ public class Ofdpa2Pipeline extends AbstractHandlerBehaviour implements Pipeline
for (GroupKey gk : gkd) {
Group g = groupService.getGroup(deviceId, gk);
if (g == null) {
gchain.append(" ERROR").append(" -->");
gchain.append(" NoGrp").append(" -->");
continue;
}
gchain.append(" 0x").append(Integer.toHexString(g.id().id()))
......@@ -1071,7 +1082,12 @@ public class Ofdpa2Pipeline extends AbstractHandlerBehaviour implements Pipeline
lastGroup = g;
}
// add port information for last group in group-chain
for (Instruction i: lastGroup.buckets().buckets().get(0).treatment().allInstructions()) {
List<Instruction> lastGroupIns = new ArrayList<Instruction>();
if (gchain != null) {
lastGroupIns = lastGroup.buckets().buckets().get(0)
.treatment().allInstructions();
}
for (Instruction i: lastGroupIns) {
if (i instanceof OutputInstruction) {
gchain.append(" port:").append(((OutputInstruction) i).port());
}
......