Jonathan Hart
Committed by Gerrit Code Review

Upgrade packet requests to use flow objectives API.

Addressed a few issues found while using the flow objectives across a cluster:
 * Flow objectives should be installable from any node, not just the master.
   Therefore we need to ensure all nodes initialize a driver for each switch.
 * We no longer store a list of objectives that are waiting for the switch
   to arrive. If the we don't know about the switch yet we'll try a few times
   over a few seconds to find it, but after that we'll give up and report an
   error to the client.
 * Default drivers need to be available when the FlowObjectiveManager starts
   up, otherwise it is common to get flow objective requests before any
   drivers have been loaded.

Change-Id: I1c2ea6a223232402c31e8139729e4b6251ab8b0f
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.net.driver;
/**
* Service capable of providing a set of default drivers.
*/
public interface DefaultDriverProviderService extends DriverProvider {
}
......@@ -36,7 +36,7 @@ public final class DefaultForwardingObjective implements ForwardingObjective {
private final int timeout;
private final ApplicationId appId;
private final int priority;
private final int nextId;
private final Integer nextId;
private final TrafficTreatment treatment;
private final Operation op;
private final Optional<ObjectiveContext> context;
......@@ -46,7 +46,7 @@ public final class DefaultForwardingObjective implements ForwardingObjective {
private DefaultForwardingObjective(TrafficSelector selector,
Flag flag, boolean permanent,
int timeout, ApplicationId appId,
int priority, int nextId,
int priority, Integer nextId,
TrafficTreatment treatment, Operation op) {
this.selector = selector;
this.flag = flag;
......@@ -67,7 +67,7 @@ public final class DefaultForwardingObjective implements ForwardingObjective {
private DefaultForwardingObjective(TrafficSelector selector,
Flag flag, boolean permanent,
int timeout, ApplicationId appId,
int priority, int nextId,
int priority, Integer nextId,
TrafficTreatment treatment,
ObjectiveContext context, Operation op) {
this.selector = selector;
......
......@@ -41,6 +41,11 @@ public enum ObjectiveError {
GROUPMISSING,
/**
* The device was not available to install objectives to.
*/
DEVICEMISSING,
/**
* An unknown error occurred.
*/
UNKNOWN
......
......@@ -31,6 +31,7 @@ import org.onosproject.net.driver.Behaviour;
import org.onosproject.net.driver.DefaultDriverData;
import org.onosproject.net.driver.DefaultDriverHandler;
import org.onosproject.net.driver.DefaultDriverProvider;
import org.onosproject.net.driver.DefaultDriverProviderService;
import org.onosproject.net.driver.Driver;
import org.onosproject.net.driver.DriverAdminService;
import org.onosproject.net.driver.DriverHandler;
......@@ -62,16 +63,21 @@ public class DriverManager extends DefaultDriverProvider implements DriverAdminS
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DeviceService deviceService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DefaultDriverProviderService defaultDriverService;
private Set<DriverProvider> providers = Sets.newConcurrentHashSet();
private Map<String, Driver> driverByKey = Maps.newConcurrentMap();
@Activate
protected void activate() {
registerProvider(defaultDriverService);
log.info("Started");
}
@Deactivate
protected void deactivate() {
unregisterProvider(defaultDriverService);
log.info("Stopped");
}
......
......@@ -15,7 +15,6 @@
*/
package org.onosproject.net.flowobjective.impl;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.felix.scr.annotations.Activate;
......@@ -48,17 +47,18 @@ import org.onosproject.net.flowobjective.FlowObjectiveStoreDelegate;
import org.onosproject.net.flowobjective.ForwardingObjective;
import org.onosproject.net.flowobjective.NextObjective;
import org.onosproject.net.flowobjective.Objective;
import org.onosproject.net.flowobjective.ObjectiveError;
import org.onosproject.net.flowobjective.ObjectiveEvent;
import org.onosproject.net.group.GroupService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static com.google.common.base.Preconditions.checkState;
import static org.onlab.util.Tools.groupedThreads;
/**
* Provides implementation of the flow objective programming service.
......@@ -67,9 +67,10 @@ import static com.google.common.base.Preconditions.checkState;
@Service
public class FlowObjectiveManager implements FlowObjectiveService {
private final Logger log = LoggerFactory.getLogger(getClass());
public static final int INSTALL_RETRY_ATTEMPTS = 5;
public static final long INSTALL_RETRY_INTERVAL = 1000; // ms
public static final String NOT_INITIALIZED = "Driver not initialized";
private final Logger log = LoggerFactory.getLogger(getClass());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DriverService driverService;
......@@ -106,17 +107,18 @@ public class FlowObjectiveManager implements FlowObjectiveService {
protected ServiceDirectory serviceDirectory = new DefaultServiceDirectory();
private final Map<DeviceId, Collection<Objective>> pendingObjectives =
Maps.newConcurrentMap();
private NodeId localNode;
private Map<Integer, Set<PendingNext>> pendingForwards =
Maps.newConcurrentMap();
private ExecutorService executorService;
@Activate
protected void activate() {
executorService = Executors.newFixedThreadPool(
4, groupedThreads("onos/objective-installer", "%d"));
flowObjectiveStore.setDelegate(delegate);
localNode = clusterService.getLocalNode().id();
mastershipService.addListener(mastershipListener);
......@@ -133,15 +135,55 @@ public class FlowObjectiveManager implements FlowObjectiveService {
log.info("Stopped");
}
@Override
public void filter(DeviceId deviceId,
FilteringObjective filteringObjective) {
if (deviceService.isAvailable(deviceId)) {
getDevicePipeliner(deviceId).filter(filteringObjective);
} else {
updatePendingMap(deviceId, filteringObjective);
/**
* Task that passes the flow objective down to the driver. The task will
* make a few attempts to find the appropriate driver, then eventually give
* up and report an error if no suitable driver could be found.
*/
private class ObjectiveInstaller implements Runnable {
private final DeviceId deviceId;
private final Objective objective;
private int numAttempts = 0;
public ObjectiveInstaller(DeviceId deviceId, Objective objective) {
this.deviceId = deviceId;
this.objective = objective;
}
@Override
public void run() {
try {
numAttempts++;
Pipeliner pipeliner = getDevicePipeliner(deviceId);
if (pipeliner != null) {
if (objective instanceof NextObjective) {
pipeliner.next((NextObjective) objective);
} else if (objective instanceof ForwardingObjective) {
pipeliner.forward((ForwardingObjective) objective);
} else {
pipeliner.filter((FilteringObjective) objective);
}
} else if (numAttempts < INSTALL_RETRY_ATTEMPTS) {
Thread.currentThread().sleep(INSTALL_RETRY_INTERVAL);
executorService.submit(this);
} else {
// Otherwise we've tried a few times and failed, report an
// error back to the user.
objective.context().ifPresent(
c -> c.onError(objective, ObjectiveError.DEVICEMISSING));
}
} catch (Exception e) {
log.warn("Exception while installing flow objective", e);
}
}
}
@Override
public void filter(DeviceId deviceId, FilteringObjective filteringObjective) {
executorService.submit(new ObjectiveInstaller(deviceId, filteringObjective));
}
@Override
......@@ -152,22 +194,12 @@ public class FlowObjectiveManager implements FlowObjectiveService {
return;
}
if (deviceService.isAvailable(deviceId)) {
getDevicePipeliner(deviceId).forward(forwardingObjective);
} else {
updatePendingMap(deviceId, forwardingObjective);
}
executorService.submit(new ObjectiveInstaller(deviceId, forwardingObjective));
}
@Override
public void next(DeviceId deviceId,
NextObjective nextObjective) {
if (deviceService.isAvailable(deviceId)) {
getDevicePipeliner(deviceId).next(nextObjective);
} else {
updatePendingMap(deviceId, nextObjective);
}
public void next(DeviceId deviceId, NextObjective nextObjective) {
executorService.submit(new ObjectiveInstaller(deviceId, nextObjective));
}
@Override
......@@ -189,49 +221,38 @@ public class FlowObjectiveManager implements FlowObjectiveService {
return false;
}
private void updatePendingMap(DeviceId deviceId, Objective pending) {
if (pendingObjectives.putIfAbsent(deviceId, Lists.newArrayList(pending)) != null) {
Collection<Objective> objectives = pendingObjectives.get(deviceId);
objectives.add(pending);
}
}
// Retrieves the device pipeline behaviour from the cache.
private Pipeliner getDevicePipeliner(DeviceId deviceId) {
Pipeliner pipeliner = pipeliners.get(deviceId);
checkState(pipeliner != null, NOT_INITIALIZED);
return pipeliner;
}
private void setupPipelineHandler(DeviceId deviceId) {
if (localNode.equals(mastershipService.getMasterFor(deviceId))) {
// Attempt to lookup the handler in the cache
DriverHandler handler = driverHandlers.get(deviceId);
if (handler == null) {
try {
// Otherwise create it and if it has pipeline behaviour, cache it
handler = driverService.createHandler(deviceId);
if (!handler.driver().hasBehaviour(Pipeliner.class)) {
log.warn("Pipeline behaviour not supported for device {}",
deviceId);
return;
}
} catch (ItemNotFoundException e) {
log.warn("No applicable driver for device {}", deviceId);
// Attempt to lookup the handler in the cache
DriverHandler handler = driverHandlers.get(deviceId);
if (handler == null) {
try {
// Otherwise create it and if it has pipeline behaviour, cache it
handler = driverService.createHandler(deviceId);
if (!handler.driver().hasBehaviour(Pipeliner.class)) {
log.warn("Pipeline behaviour not supported for device {}",
deviceId);
return;
}
driverHandlers.put(deviceId, handler);
} catch (ItemNotFoundException e) {
log.warn("No applicable driver for device {}", deviceId);
return;
}
// Always (re)initialize the pipeline behaviour
log.info("Driver {} bound to device {} ... initializing driver",
handler.driver().name(), deviceId);
Pipeliner pipeliner = handler.behaviour(Pipeliner.class);
pipeliner.init(deviceId, context);
pipeliners.putIfAbsent(deviceId, pipeliner);
driverHandlers.put(deviceId, handler);
}
// Always (re)initialize the pipeline behaviour
log.info("Driver {} bound to device {} ... initializing driver",
handler.driver().name(), deviceId);
Pipeliner pipeliner = handler.behaviour(Pipeliner.class);
pipeliner.init(deviceId, context);
pipeliners.putIfAbsent(deviceId, pipeliner);
}
// Triggers driver setup when the local node becomes a device master.
......@@ -240,10 +261,8 @@ public class FlowObjectiveManager implements FlowObjectiveService {
public void event(MastershipEvent event) {
switch (event.type()) {
case MASTER_CHANGED:
if (event.roleInfo().master() != null) {
setupPipelineHandler(event.subject());
log.info("mastership changed on device {}", event.subject());
}
log.info("mastership changed on device {}", event.subject());
setupPipelineHandler(event.subject());
break;
case BACKUPS_CHANGED:
break;
......@@ -259,13 +278,14 @@ public class FlowObjectiveManager implements FlowObjectiveService {
public void event(DeviceEvent event) {
switch (event.type()) {
case DEVICE_ADDED:
setupPipelineHandler(event.subject().id());
break;
case DEVICE_AVAILABILITY_CHANGED:
log.info("Device either added or availability changed {}",
event.subject().id());
if (deviceService.isAvailable(event.subject().id())) {
log.info("Device is now available {}", event.subject().id());
setupPipelineHandler(event.subject().id());
processPendingObjectives(event.subject().id());
}
break;
case DEVICE_UPDATED:
......@@ -284,22 +304,6 @@ public class FlowObjectiveManager implements FlowObjectiveService {
break;
}
}
private void processPendingObjectives(DeviceId deviceId) {
log.debug("Processing pending objectives for device {}", deviceId);
pendingObjectives.getOrDefault(deviceId,
Collections.emptySet()).forEach(obj -> {
if (obj instanceof NextObjective) {
next(deviceId, (NextObjective) obj);
} else if (obj instanceof ForwardingObjective) {
forward(deviceId, (ForwardingObjective) obj);
} else {
getDevicePipeliner(deviceId)
.filter((FilteringObjective) obj);
}
});
}
}
// Processing context for initializing pipeline driver behaviours.
......@@ -313,8 +317,6 @@ public class FlowObjectiveManager implements FlowObjectiveService {
public FlowObjectiveStore store() {
return flowObjectiveStore;
}
}
private class InternalStoreDelegate implements FlowObjectiveStoreDelegate {
......@@ -356,7 +358,5 @@ public class FlowObjectiveManager implements FlowObjectiveService {
public ForwardingObjective forwardingObjective() {
return fwd;
}
}
}
......
......@@ -27,12 +27,17 @@ import org.onosproject.net.Device;
import org.onosproject.net.device.DeviceEvent;
import org.onosproject.net.device.DeviceListener;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.flow.DefaultFlowRule;
import org.onosproject.net.flow.DefaultTrafficTreatment;
import org.onosproject.net.flow.FlowRule;
import org.onosproject.net.flow.FlowRuleService;
import org.onosproject.net.flow.TrafficSelector;
import org.onosproject.net.flow.TrafficTreatment;
import org.onosproject.net.flowobjective.DefaultForwardingObjective;
import org.onosproject.net.flowobjective.FlowObjectiveService;
import org.onosproject.net.flowobjective.ForwardingObjective;
import org.onosproject.net.flowobjective.Objective;
import org.onosproject.net.flowobjective.ObjectiveContext;
import org.onosproject.net.flowobjective.ObjectiveError;
import org.onosproject.net.packet.DefaultPacketRequest;
import org.onosproject.net.packet.OutboundPacket;
import org.onosproject.net.packet.PacketContext;
......@@ -73,6 +78,9 @@ public class PacketManager
private CoreService coreService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private FlowObjectiveService objectiveService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private DeviceService deviceService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
......@@ -169,12 +177,28 @@ public class PacketManager
return;
}
TrafficTreatment treatment = DefaultTrafficTreatment.builder().punt().build();
FlowRule flow = new DefaultFlowRule(device.id(), request.selector(), treatment,
request.priority().priorityValue(),
appId, 0, true, request.tableType());
flowService.applyFlowRules(flow);
TrafficTreatment treatment = DefaultTrafficTreatment.builder()
.punt()
.build();
ForwardingObjective forwarding = DefaultForwardingObjective.builder()
.withPriority(request.priority().priorityValue())
.withSelector(request.selector())
.fromApp(appId)
.withFlag(ForwardingObjective.Flag.VERSATILE)
.withTreatment(treatment)
.makePermanent()
.add(new ObjectiveContext() {
@Override
public void onSuccess(Objective objective) { }
@Override
public void onError(Objective objective, ObjectiveError error) {
log.warn("Failed to install packet request flow: {}", error);
}
});
objectiveService.forward(device.id(), forwarding);
}
@Override
......
......@@ -16,32 +16,31 @@
package org.onosproject.driver.pipeline;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onosproject.net.driver.DriverAdminService;
import org.apache.felix.scr.annotations.Service;
import org.onosproject.net.driver.DefaultDriverProviderService;
import org.onosproject.net.driver.Driver;
import org.onosproject.net.driver.DriverProvider;
import org.onosproject.net.driver.XmlDriverLoader;
import org.apache.felix.scr.annotations.Component;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
import java.util.Set;
/**
* Bootstrap for built in drivers.
*/
@Component(immediate = true)
public class DefaultDrivers {
@Service
@Component(immediate = false)
public class DefaultDrivers implements DefaultDriverProviderService {
private final Logger log = LoggerFactory.getLogger(getClass());
private static final String DRIVERS_XML = "/onos-drivers.xml";
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DriverAdminService driverService;
private DriverProvider provider;
@Activate
......@@ -50,7 +49,6 @@ public class DefaultDrivers {
try {
InputStream stream = classLoader.getResourceAsStream(DRIVERS_XML);
provider = new XmlDriverLoader(classLoader).loadDrivers(stream);
driverService.registerProvider(provider);
} catch (IOException e) {
log.error("Unable to load default drivers", e);
}
......@@ -59,10 +57,11 @@ public class DefaultDrivers {
@Deactivate
protected void deactivate() {
if (provider != null) {
driverService.unregisterProvider(provider);
}
log.info("Stopped");
}
@Override
public Set<Driver> getDrivers() {
return provider.getDrivers();
}
}
......