Carmelo Cascone
Committed by Gerrit Code Review

Simplified Bmv2 device context service and context handling in demo apps

Change-Id: I2a13ed673902d0616732d43c841f50b1ad38cd4c
......@@ -56,9 +56,11 @@ import org.onosproject.net.topology.TopologyVertex;
import org.slf4j.Logger;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
......@@ -135,10 +137,13 @@ public abstract class AbstractUpgradableFabricApp {
private Set<DeviceId> spineSwitches;
private Map<DeviceId, List<FlowRule>> deviceFlowRules;
private Map<DeviceId, Boolean> rulesInstalled;
private Map<DeviceId, Boolean> contextFlags;
private Map<DeviceId, Boolean> ruleFlags;
private ConcurrentMap<DeviceId, Boolean> deployLocks = Maps.newConcurrentMap();
/**
* Creates a new Bmv2 Fabric Component.
* Creates a new BMv2 fabric app.
*
* @param appName app name
* @param configurationName a common name for the P4 program / BMv2 configuration used by this app
......@@ -212,7 +217,8 @@ public abstract class AbstractUpgradableFabricApp {
leafSwitches = Sets.newHashSet();
spineSwitches = Sets.newHashSet();
deviceFlowRules = Maps.newConcurrentMap();
rulesInstalled = Maps.newConcurrentMap();
ruleFlags = Maps.newConcurrentMap();
contextFlags = Maps.newConcurrentMap();
}
// Start flow rules generator...
......@@ -266,41 +272,19 @@ public abstract class AbstractUpgradableFabricApp {
private void deployRoutine() {
if (otherAppFound && otherApp.appActive) {
log.info("Starting update routine...");
updateRoutine();
log.info("Deactivating other app...");
appService.deactivate(otherApp.appId);
} else {
Stream.concat(leafSwitches.stream(), spineSwitches.stream())
.map(deviceService::getDevice)
.forEach(device -> spawnTask(() -> deployDevice(device)));
try {
Thread.sleep(CLEANUP_SLEEP);
} catch (InterruptedException e) {
log.warn("Cleanup sleep interrupted!");
Thread.interrupted();
}
}
}
private void updateRoutine() {
Stream.concat(leafSwitches.stream(), spineSwitches.stream())
.forEach(did -> spawnTask(() -> {
cleanUpDevice(did);
try {
Thread.sleep(CLEANUP_SLEEP);
} catch (InterruptedException e) {
log.warn("Cleanup sleep interrupted!");
Thread.interrupted();
}
deployDevice(deviceService.getDevice(did));
}));
}
private void cleanUpDevice(DeviceId deviceId) {
List<FlowRule> flowRulesToRemove = Lists.newArrayList();
flowRuleService.getFlowEntries(deviceId).forEach(fe -> {
if (fe.appId() == otherApp.appId.id()) {
flowRulesToRemove.add(fe);
}
});
if (flowRulesToRemove.size() > 0) {
log.info("Cleaning {} old flow rules from {}...", flowRulesToRemove.size(), deviceId);
removeFlowRules(flowRulesToRemove);
}
.map(deviceService::getDevice)
.forEach(device -> spawnTask(() -> deployDevice(device)));
}
/**
......@@ -309,36 +293,35 @@ public abstract class AbstractUpgradableFabricApp {
* @param device a device
*/
public void deployDevice(Device device) {
// Serialize executions per device ID using a concurrent map.
rulesInstalled.compute(device.id(), (did, deployed) -> {
Bmv2DeviceContext deviceContext = bmv2ContextService.getContext(device.id());
if (deviceContext == null) {
log.error("Unable to get context for device {}", device.id());
return deployed;
} else if (!deviceContext.equals(bmv2Context)) {
log.info("Swapping configuration to {} on device {}...", configurationName, device.id());
bmv2ContextService.triggerConfigurationSwap(device.id(), bmv2Context);
return deployed;
DeviceId deviceId = device.id();
// Synchronize executions over the same device.
deployLocks.putIfAbsent(deviceId, new Boolean(true));
synchronized (deployLocks.get(deviceId)) {
// Set context if not already done.
if (!contextFlags.getOrDefault(deviceId, false)) {
log.info("Setting context to {} for {}...", configurationName, deviceId);
bmv2ContextService.setContext(deviceId, bmv2Context);
contextFlags.put(device.id(), true);
}
// Initialize device.
if (!initDevice(deviceId)) {
log.warn("Failed to initialize device {}", deviceId);
}
List<FlowRule> rules = deviceFlowRules.get(device.id());
if (initDevice(device.id())) {
if (deployed == null && rules != null && rules.size() > 0) {
log.info("Installing rules for {}...", did);
// Install rules.
if (!ruleFlags.getOrDefault(deviceId, false)) {
List<FlowRule> rules = deviceFlowRules.getOrDefault(deviceId, Collections.emptyList());
if (rules.size() > 0) {
log.info("Installing rules for {}...", deviceId);
installFlowRules(rules);
return true;
}
} else {
log.warn("Filed to initialize device {}", device.id());
if (deployed != null && rules != null && rules.size() > 0) {
log.info("Removing rules for {}...", did);
removeFlowRules(rules);
return null;
ruleFlags.put(deviceId, true);
}
}
return deployed;
});
}
}
private void spawnTask(Runnable task) {
......
......@@ -164,7 +164,7 @@ public interface Bmv2DeviceAgent {
* @param jsonString a string value
* @throws Bmv2RuntimeException if any error occurs
*/
void loadNewJsonConfig(String jsonString) throws Bmv2RuntimeException;
void uploadNewJsonConfig(String jsonString) throws Bmv2RuntimeException;
/**
* Triggers a configuration swap on the device.
......
......@@ -25,12 +25,8 @@ import org.onosproject.net.DeviceId;
*/
public interface Bmv2DeviceContextService {
// TODO: handle the potential configuration states (e.g. RUNNING, SWAP_REQUESTED, etc.)
/**
* Returns the context of a given device. The context returned is the last one for which a configuration swap was
* triggered, hence there's no guarantees that the device is enforcing the returned context's configuration at the
* time of the call.
* Returns the context of the given device, null if no context has been previously set.
*
* @param deviceId a device ID
* @return a BMv2 device context
......@@ -38,12 +34,12 @@ public interface Bmv2DeviceContextService {
Bmv2DeviceContext getContext(DeviceId deviceId);
/**
* Triggers a configuration swap on a given device.
* Sets the context for the given device.
*
* @param deviceId a device ID
* @param context a BMv2 device context
*/
void triggerConfigurationSwap(DeviceId deviceId, Bmv2DeviceContext context);
void setContext(DeviceId deviceId, Bmv2DeviceContext context);
/**
* Binds the given interpreter with the given class loader so that other ONOS instances in the cluster can properly
......@@ -55,14 +51,9 @@ public interface Bmv2DeviceContextService {
void registerInterpreterClassLoader(Class<? extends Bmv2Interpreter> interpreterClass, ClassLoader loader);
/**
* Notifies this service that a given device has been updated, meaning a potential context change.
* It returns true if the device configuration is the same as the last for which a swap was triggered, false
* otherwise. In the last case, the service will asynchronously trigger a swap to the last
* configuration stored by this service. If no swap has already been triggered then a default configuration will be
* applied.
* Returns a default context.
*
* @param deviceId a device ID
* @return a boolean value
* @return a BMv2 device context
*/
boolean notifyDeviceChange(DeviceId deviceId);
Bmv2DeviceContext defaultContext();
}
......
......@@ -29,7 +29,7 @@ import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.KryoNamespace;
import org.onlab.util.SharedExecutors;
import org.onlab.util.SharedScheduledExecutors;
import org.onosproject.bmv2.api.context.Bmv2Configuration;
import org.onosproject.bmv2.api.context.Bmv2DefaultConfiguration;
import org.onosproject.bmv2.api.context.Bmv2DeviceContext;
......@@ -38,11 +38,17 @@ import org.onosproject.bmv2.api.runtime.Bmv2DeviceAgent;
import org.onosproject.bmv2.api.runtime.Bmv2RuntimeException;
import org.onosproject.bmv2.api.service.Bmv2Controller;
import org.onosproject.bmv2.api.service.Bmv2DeviceContextService;
import org.onosproject.mastership.MastershipService;
import org.onosproject.net.DeviceId;
import org.onosproject.net.device.DeviceService;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.ConsistentMapException;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.Versioned;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -50,30 +56,43 @@ import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.onosproject.bmv2.api.context.Bmv2DefaultConfiguration.parse;
import static org.onosproject.store.service.MapEvent.Type.INSERT;
import static org.onosproject.store.service.MapEvent.Type.UPDATE;
@Component(immediate = true)
@Service
public class Bmv2DeviceContextServiceImpl implements Bmv2DeviceContextService {
private static final String JSON_DEFAULT_CONFIG_PATH = "/default.json";
private static final long CHECK_INTERVAL = 5_000; // milliseconds
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private StorageService storageService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private DeviceService deviceService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private MastershipService mastershipService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private Bmv2Controller controller;
private final ExecutorService executorService = SharedExecutors.getPoolThreadExecutor();
private final ScheduledExecutorService scheduledExecutor = SharedScheduledExecutors.getPoolThreadExecutor();
private final MapEventListener<DeviceId, Bmv2DeviceContext> contextListener = new ContextMapEventListener();
private final ConcurrentMap<DeviceId, Boolean> deviceLocks = Maps.newConcurrentMap();
private ConsistentMap<DeviceId, Bmv2DeviceContext> contexts;
private Map<DeviceId, Bmv2DeviceContext> contextsMap;
private Map<String, ClassLoader> interpreterClassLoaders;
private Bmv2DeviceContext defaultContext;
private ScheduledFuture<?> configChecker = null;
private final Logger log = LoggerFactory.getLogger(getClass());
......@@ -88,39 +107,55 @@ public class Bmv2DeviceContextServiceImpl implements Bmv2DeviceContextService {
.withSerializer(Serializer.using(kryo))
.withName("onos-bmv2-contexts")
.build();
contextsMap = contexts.asJavaMap();
interpreterClassLoaders = Maps.newConcurrentMap();
Bmv2Configuration defaultConfiguration = loadDefaultConfiguration();
Bmv2Interpreter defaultInterpreter = new Bmv2DefaultInterpreterImpl();
defaultContext = new Bmv2DeviceContext(defaultConfiguration, defaultInterpreter);
interpreterClassLoaders.put(defaultInterpreter.getClass().getName(), this.getClass().getClassLoader());
interpreterClassLoaders = Maps.newConcurrentMap();
registerInterpreterClassLoader(defaultInterpreter.getClass(), this.getClass().getClassLoader());
contexts.addListener(contextListener);
if (configChecker != null && configChecker.isCancelled()) {
configChecker.cancel(false);
}
configChecker = scheduledExecutor.scheduleAtFixedRate(this::checkDevices, 0, CHECK_INTERVAL,
TimeUnit.MILLISECONDS);
log.info("Started");
}
@Deactivate
public void deactivate() {
contexts.removeListener(contextListener);
if (configChecker != null) {
configChecker.cancel(false);
}
log.info("Stopped");
}
@Override
public Bmv2DeviceContext getContext(DeviceId deviceId) {
checkNotNull(deviceId, "device id cannot be null");
return contextsMap.get(deviceId);
Versioned<Bmv2DeviceContext> versionedContext = contexts.get(deviceId);
return (versionedContext == null) ? null : versionedContext.value();
}
@Override
public void triggerConfigurationSwap(DeviceId deviceId, Bmv2DeviceContext context) {
public void setContext(DeviceId deviceId, Bmv2DeviceContext context) {
checkNotNull(deviceId, "device id cannot be null");
checkNotNull(context, "context cannot be null");
if (!interpreterClassLoaders.containsKey(context.interpreter().getClass().getName())) {
log.error("Unable to trigger configuration swap, missing class loader for context interpreter. " +
"Please register it with registerInterpreterClassLoader()");
log.error("Unable to set context, missing class loader for interpreter '{}'. " +
"Please register it with registerInterpreterClassLoader()",
context.interpreter().getClass().getName());
} else {
executorService.execute(() -> executeConfigurationSwap(deviceId, context));
try {
contexts.put(deviceId, context);
} catch (ConsistentMapException.ConcurrentModification e) {
log.error("Detected concurrent modification on context map");
}
}
}
......@@ -129,86 +164,83 @@ public class Bmv2DeviceContextServiceImpl implements Bmv2DeviceContextService {
interpreterClassLoaders.put(interpreterClass.getName(), loader);
}
private void executeConfigurationSwap(DeviceId deviceId, Bmv2DeviceContext context) {
contexts.compute(deviceId, (key, existingValue) -> {
if (context.equals(existingValue)) {
log.info("Dropping swap request as one has already been triggered for the given context.");
return existingValue;
}
try {
Bmv2DeviceAgent agent = controller.getAgent(deviceId);
String jsonString = context.configuration().json().toString();
agent.loadNewJsonConfig(jsonString);
agent.swapJsonConfig();
return context;
} catch (Bmv2RuntimeException e) {
log.error("Unable to swap configuration on {}: {}", deviceId, e.explain());
return existingValue;
}
});
}
@Override
public boolean notifyDeviceChange(DeviceId deviceId) {
checkNotNull(deviceId, "device id cannot be null");
public Bmv2DeviceContext defaultContext() {
return defaultContext;
}
Bmv2DeviceContext storedContext = getContext(deviceId);
private void configCheck(DeviceId deviceId) {
// Synchronize executions over the same deviceId.
deviceLocks.putIfAbsent(deviceId, new Boolean(true));
synchronized (deviceLocks.get(deviceId)) {
if (storedContext == null) {
log.info("No context previously stored for {}, swapping to DEFAULT_CONTEXT.", deviceId);
triggerConfigurationSwap(deviceId, defaultContext);
// Device can be accepted.
return false;
} else {
Bmv2Configuration deviceConfiguration = loadDeviceConfiguration(deviceId);
if (deviceConfiguration == null) {
log.warn("Unable to load configuration from device {}", deviceId);
return false;
Bmv2DeviceContext storedContext = getContext(deviceId);
if (storedContext == null) {
return;
}
if (storedContext.configuration().equals(deviceConfiguration)) {
return true;
} else {
log.info("Device context is different from the stored one, triggering configuration swap for {}...",
deviceId);
triggerConfigurationSwap(deviceId, storedContext);
return false;
log.trace("Executing configuration check on {}...", deviceId);
try {
// FIXME: JSON dump is heavy, can we use the JSON MD5 to check the running configuration?
String jsonString = controller.getAgent(deviceId).dumpJsonConfig();
Bmv2Configuration deviceConfiguration = parse(Json.parse(jsonString).asObject());
if (!storedContext.configuration().equals(deviceConfiguration)) {
log.info("Triggering configuration swap on {}...", deviceId);
try {
Bmv2DeviceAgent agent = controller.getAgent(deviceId);
String newJsonString = storedContext.configuration().json().toString();
agent.uploadNewJsonConfig(newJsonString);
agent.swapJsonConfig();
} catch (Bmv2RuntimeException e) {
log.error("Unable to swap configuration on {}: {}", deviceId, e.explain());
}
}
} catch (Bmv2RuntimeException e) {
log.warn("Unable to dump JSON configuration from {}: {}", deviceId, e.explain());
}
}
}
/**
* Load and parse a BMv2 JSON configuration from the given device.
*
* @param deviceId a device id
* @return a BMv2 configuration
*/
private Bmv2Configuration loadDeviceConfiguration(DeviceId deviceId) {
try {
String jsonString = controller.getAgent(deviceId).dumpJsonConfig();
return Bmv2DefaultConfiguration.parse(Json.parse(jsonString).asObject());
} catch (Bmv2RuntimeException e) {
log.warn("Unable to load JSON configuration from {}: {}", deviceId, e.explain());
return null;
private void triggerConfigCheck(DeviceId deviceId) {
if (mastershipService.isLocalMaster(deviceId)) {
scheduledExecutor.schedule(() -> configCheck(deviceId), 0, TimeUnit.SECONDS);
}
}
/**
* Loads default configuration from file.
*
* @return a BMv2 configuration
*/
private void checkDevices() {
deviceService.getAvailableDevices().forEach(device -> {
triggerConfigCheck(device.id());
});
}
protected static Bmv2DefaultConfiguration loadDefaultConfiguration() {
try {
JsonObject json = Json.parse(new BufferedReader(new InputStreamReader(
Bmv2DeviceContextServiceImpl.class.getResourceAsStream(JSON_DEFAULT_CONFIG_PATH)))).asObject();
return Bmv2DefaultConfiguration.parse(json);
return parse(json);
} catch (IOException e) {
throw new RuntimeException("Unable to load default configuration", e);
}
}
/**
* Internal BMv2 context serializer.
* Listener of context changes that immediately triggers config checks (to swap the config if necessary).
*/
private class ContextMapEventListener implements MapEventListener<DeviceId, Bmv2DeviceContext> {
@Override
public void event(MapEvent<DeviceId, Bmv2DeviceContext> event) {
DeviceId deviceId = event.key();
if (event.type().equals(INSERT) || event.type().equals(UPDATE)) {
log.trace("Context {} for {}", event.type().name(), deviceId);
triggerConfigCheck(deviceId);
}
}
}
/**
* Context serializer.
*/
private class BmvDeviceContextSerializer extends com.esotericsoftware.kryo.Serializer<Bmv2DeviceContext> {
......@@ -222,7 +254,7 @@ public class Bmv2DeviceContextServiceImpl implements Bmv2DeviceContextService {
public Bmv2DeviceContext read(Kryo kryo, Input input, Class<Bmv2DeviceContext> type) {
String jsonStr = kryo.readObject(input, String.class);
String interpreterClassName = kryo.readObject(input, String.class);
Bmv2Configuration configuration = Bmv2DefaultConfiguration.parse(Json.parse(jsonStr).asObject());
Bmv2Configuration configuration = parse(Json.parse(jsonStr).asObject());
ClassLoader loader = interpreterClassLoaders.get(interpreterClassName);
if (loader == null) {
throw new IllegalStateException("No class loader registered for interpreter: " + interpreterClassName);
......
......@@ -397,7 +397,7 @@ public final class Bmv2DeviceThriftClient implements Bmv2DeviceAgent {
}
@Override
public void loadNewJsonConfig(String jsonString) throws Bmv2RuntimeException {
public void uploadNewJsonConfig(String jsonString) throws Bmv2RuntimeException {
log.debug("Loading new JSON config on device... > deviceId={}, jsonStringLength={}",
deviceId, jsonString.length());
......
......@@ -185,9 +185,11 @@ public class Bmv2DeviceProvider extends AbstractDeviceProvider {
(!Objects.equals(thisDescription, lastDescription) ||
!Objects.equals(thisDescription.annotations(), lastDescription.annotations()));
if (descriptionChanged || !deviceService.isAvailable(did)) {
// Device description changed or device not available in the core.
if (contextService.notifyDeviceChange(did)) {
// Configuration is the expected one, we can proceed notifying the core.
if (contextService.getContext(did) == null) {
// Device is a first timer.
log.info("Setting DEFAULT context for {}", did);
contextService.setContext(did, contextService.defaultContext());
} else {
resetDeviceState(did);
initPortCounters(did);
providerService.deviceConnected(did, thisDescription);
......@@ -295,7 +297,7 @@ public class Bmv2DeviceProvider extends AbstractDeviceProvider {
if (cfg != null) {
try {
cfg.getDevicesInfo().stream().forEach(info -> {
// TODO: require also bmv2 internal device id from net-cfg (now is default 0)
// FIXME: require also bmv2 internal device id from net-cfg (now is default 0)
Bmv2Device bmv2Device = new Bmv2Device(info.ip().toString(), info.port(), 0);
triggerProbe(bmv2Device.asDeviceId());
});
......