alshabib
Committed by Ali "The Bomb" Al-Shabibi

Added test code to test the flow subsystem performance

Change-Id: I1a7c68492760a63b7d092c3ca71e4964123c8aa7
......@@ -106,6 +106,7 @@
com.sun.jersey.server.impl.container.servlet,
com.fasterxml.jackson.databind,
com.fasterxml.jackson.databind.node,
org.apache.commons.lang.math.*,
com.google.common.*,
org.onlab.packet.*,
org.onlab.rest.*,
......
......@@ -27,6 +27,12 @@ public interface DemoAPI {
enum InstallType { MESH, RANDOM };
/**
* Tests flow subsystem based on the parameters supplied.
* @param params the test parameters
*/
JsonNode flowTest(Optional<JsonNode> params);
/**
* Installs intents based on the installation type.
* @param type the installation type.
* @param runParams run params
......
......@@ -16,27 +16,40 @@
package org.onosproject.demo;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Predicate;
import com.google.common.base.Stopwatch;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.commons.lang.math.RandomUtils;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.packet.MacAddress;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.mastership.MastershipService;
import org.onosproject.net.Device;
import org.onosproject.net.Host;
import org.onosproject.net.HostId;
import org.onosproject.net.MastershipRole;
import org.onosproject.net.PortNumber;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.flow.DefaultFlowRule;
import org.onosproject.net.flow.DefaultTrafficSelector;
import org.onosproject.net.flow.DefaultTrafficTreatment;
import org.onosproject.net.flow.FlowRuleOperations;
import org.onosproject.net.flow.FlowRuleOperationsContext;
import org.onosproject.net.flow.FlowRuleService;
import org.onosproject.net.flow.TrafficSelector;
import org.onosproject.net.flow.TrafficTreatment;
import org.onosproject.net.host.HostService;
......@@ -48,7 +61,6 @@ import org.onosproject.net.intent.IntentOperations;
import org.onosproject.net.intent.IntentService;
import org.slf4j.Logger;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
......@@ -59,11 +71,14 @@ import java.util.Objects;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static org.slf4j.LoggerFactory.getLogger;
......@@ -94,15 +109,23 @@ public class DemoInstaller implements DemoAPI {
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DeviceService deviceService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected FlowRuleService flowService;
private ExecutorService worker;
private ExecutorService randomWorker;
private ExecutorService installWorker;
private ApplicationId appId;
private final Set<Intent> existingIntents = new HashSet<>();
private RandomInstaller randomInstaller;
private ObjectMapper mapper = new ObjectMapper();
@Activate
......@@ -120,13 +143,33 @@ public class DemoInstaller implements DemoAPI {
@Deactivate
public void deactivate() {
shutdownAndAwaitTermination(worker);
if (!randomWorker.isShutdown()) {
shutdownAndAwaitTermination(randomWorker);
if (installWorker != null && !installWorker.isShutdown()) {
shutdownAndAwaitTermination(installWorker);
}
log.info("Stopped");
}
@Override
public JsonNode flowTest(Optional<JsonNode> params) {
int flowsPerDevice = 1000;
int neighbours = 0;
if (params.isPresent()) {
flowsPerDevice = params.get().get("flowsPerDevice").asInt();
neighbours = params.get().get("neighbours").asInt();
}
Future<JsonNode> future = worker.submit(new FlowTest(flowsPerDevice, neighbours));
try {
return future.get(10, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
ObjectNode node = mapper.createObjectNode();
node.put("Error", e.getMessage());
return node;
}
}
@Override
public void setup(InstallType type, Optional<JsonNode> runParams) {
switch (type) {
case MESH:
......@@ -135,14 +178,14 @@ public class DemoInstaller implements DemoAPI {
break;
case RANDOM:
//check that we do not have a random installer running
if (randomWorker == null || randomWorker.isShutdown()) {
randomWorker = Executors.newFixedThreadPool(1,
if (installWorker == null || installWorker.isShutdown()) {
installWorker = Executors.newFixedThreadPool(1,
new ThreadFactoryBuilder()
.setNameFormat("random-worker")
.build());
log.debug("Installing random sequence of intents");
randomInstaller = new RandomInstaller(runParams);
randomWorker.execute(randomInstaller);
installWorker.execute(randomInstaller);
} else {
log.warn("Random installer is already running");
}
......@@ -239,7 +282,7 @@ public class DemoInstaller implements DemoAPI {
@Override
public void run() {
if (!randomWorker.isShutdown()) {
if (!installWorker.isShutdown()) {
randomize();
latch = new CountDownLatch(1);
try {
......@@ -269,12 +312,12 @@ public class DemoInstaller implements DemoAPI {
log.warn("A batch is stuck processing. " +
"pending : {}",
intentBatchService.getPendingOperations());
shutdownAndAwaitTermination(randomWorker);
shutdownAndAwaitTermination(installWorker);
}
}
//if everyting is good proceed.
if (!randomWorker.isShutdown()) {
randomWorker.execute(this);
if (!installWorker.isShutdown()) {
installWorker.execute(this);
}
}
......@@ -432,8 +475,8 @@ public class DemoInstaller implements DemoAPI {
clearExistingIntents();
}
if (randomWorker != null && !randomWorker.isShutdown()) {
shutdownAndAwaitTermination(randomWorker);
if (installWorker != null && !installWorker.isShutdown()) {
shutdownAndAwaitTermination(installWorker);
randomInstaller.shutdown();
}
}
......@@ -470,6 +513,91 @@ public class DemoInstaller implements DemoAPI {
}
}
private class FlowTest implements Callable<JsonNode> {
private final int flowPerDevice;
private final int neighbours;
private FlowRuleOperations.Builder adds;
private FlowRuleOperations.Builder removes;
public FlowTest(int flowsPerDevice, int neighbours) {
this.flowPerDevice = flowsPerDevice;
this.neighbours = neighbours;
prepareInstallation();
}
private void prepareInstallation() {
Set<ControllerNode> instances = Sets.newHashSet(clusterService.getNodes());
instances.remove(clusterService.getLocalNode());
Set<NodeId> acceptableNodes = Sets.newHashSet();
if (neighbours >= instances.size()) {
instances.forEach(instance -> acceptableNodes.add(instance.id()));
} else {
Iterator<ControllerNode> nodes = instances.iterator();
for (int i = neighbours; i > 0; i--) {
acceptableNodes.add(nodes.next().id());
}
}
acceptableNodes.add(clusterService.getLocalNode().id());
Set<Device> devices = Sets.newHashSet();
for (Device dev : deviceService.getDevices()) {
if (acceptableNodes.contains(
mastershipService.getMasterFor(dev.id()))) {
devices.add(dev);
}
}
TrafficTreatment treatment = DefaultTrafficTreatment.builder()
.setOutput(PortNumber.portNumber(RandomUtils.nextInt())).build();
TrafficSelector.Builder sbuilder;
FlowRuleOperations.Builder rules = FlowRuleOperations.builder();
FlowRuleOperations.Builder remove = FlowRuleOperations.builder();
for (Device d : devices) {
for (int i = 0; i < this.flowPerDevice; i++) {
sbuilder = DefaultTrafficSelector.builder();
sbuilder.matchEthSrc(MacAddress.valueOf(RandomUtils.nextInt() * i))
.matchEthDst(MacAddress.valueOf((Integer.MAX_VALUE - i) * RandomUtils.nextInt()));
int randomPriority = RandomUtils.nextInt();
DefaultFlowRule f = new DefaultFlowRule(d.id(), sbuilder.build(), treatment,
randomPriority, appId, 10, false);
rules.add(f);
remove.remove(f);
}
}
this.adds = rules;
this.removes = remove;
}
@Override
public JsonNode call() throws Exception {
ObjectNode node = mapper.createObjectNode();
CountDownLatch latch = new CountDownLatch(1);
flowService.apply(adds.build(new FlowRuleOperationsContext() {
private final Stopwatch timer = Stopwatch.createStarted();
@Override
public void onSuccess(FlowRuleOperations ops) {
long elapsed = timer.elapsed(TimeUnit.MILLISECONDS);
node.put("elapsed", elapsed);
latch.countDown();
}
}));
latch.await(10, TimeUnit.SECONDS);
flowService.apply(removes.build());
return node;
}
}
}
......
......@@ -37,6 +37,18 @@ import java.util.Optional;
public class DemoResource extends BaseResource {
@POST
@Path("flowTest")
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public Response flowTest(InputStream input) throws IOException {
ObjectMapper mapper = new ObjectMapper();
JsonNode cfg = mapper.readTree(input);
DemoAPI demo = get(DemoAPI.class);
return Response.ok(demo.flowTest(Optional.ofNullable(cfg)).toString()).build();
}
@POST
@Path("setup")
@Consumes(MediaType.APPLICATION_JSON)
......