Thomas Vachuska
Committed by Gerrit Code Review

ONOS-1315 Adding configurability to intent performance test app.

Change-Id: I2782bdc0f78cc49aa60af1fa03eae91a2fd6bce0
......@@ -42,6 +42,10 @@
<artifactId>onos-cli</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.osgi</groupId>
<artifactId>org.osgi.compendium</artifactId>
</dependency>
</dependencies>
<build>
......
......@@ -50,7 +50,7 @@ import static org.slf4j.LoggerFactory.getLogger;
@Service(value = IntentPerfCollector.class)
public class IntentPerfCollector {
private static final long SAMPLE_WINDOW = 5_000;
private static final long SAMPLE_TIME_WINDOW_MS = 5_000;
private final Logger log = getLogger(getClass());
private static final int MAX_SAMPLES = 1_000;
......@@ -82,9 +82,9 @@ public class IntentPerfCollector {
@Activate
public void activate() {
this.nodeId = clusterService.getLocalNode().id();
this.newestTime = 0;
nodeId = clusterService.getLocalNode().id();
// TODO: replace with shared executor
messageHandlingExecutor = Executors.newSingleThreadExecutor(
groupedThreads("onos/perf", "message-handler"));
......@@ -98,9 +98,8 @@ public class IntentPerfCollector {
for (int i = 0; i < nodes.length; i++) {
nodeToIndex.put(nodes[i].id(), i);
}
overall = new Sample(0, nodes.length);
current = new Sample(0, nodes.length);
clearSamples();
log.info("Started");
}
......@@ -112,19 +111,26 @@ public class IntentPerfCollector {
}
/**
* Clears all previously accumulated data.
*/
public void clearSamples() {
newestTime = 0;
overall = new Sample(0, nodes.length);
current = new Sample(0, nodes.length);
samples.clear();
}
/**
* Records a sample point of data about intent operation rate.
*
* @param overallRate overall rate
* @param currentRate current rate
*/
public void recordSample(double overallRate, double currentRate) {
try {
long now = System.currentTimeMillis();
addSample(now, nodeId, overallRate, currentRate);
broadcastSample(now, nodeId, overallRate, currentRate);
} catch (Exception e) {
log.error("Boom!", e);
}
long now = System.currentTimeMillis();
addSample(now, nodeId, overallRate, currentRate);
broadcastSample(now, nodeId, overallRate, currentRate);
}
/**
......@@ -173,7 +179,7 @@ public class IntentPerfCollector {
}
private Sample createCurrentSampleIfNeeded(long time) {
Sample oldSample = time - newestTime > SAMPLE_WINDOW || current.isComplete() ? current : null;
Sample oldSample = time - newestTime > SAMPLE_TIME_WINDOW_MS || current.isComplete() ? current : null;
if (oldSample != null) {
newestTime = time;
current = new Sample(time, nodes.length);
......@@ -227,9 +233,9 @@ public class IntentPerfCollector {
@Override
public void handle(ClusterMessage message) {
String[] fields = new String(message.payload()).split("\\|");
log.info("Received sample from {}: {}", message.sender(), fields);
log.debug("Received sample from {}: {}", message.sender(), fields);
addSample(Long.parseLong(fields[0]), message.sender(),
Double.parseDouble(fields[1]), Double.parseDouble(fields[1]));
Double.parseDouble(fields[1]), Double.parseDouble(fields[2]));
}
}
}
......
......@@ -24,8 +24,11 @@ import org.apache.commons.lang.math.RandomUtils;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Modified;
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.packet.MacAddress;
import org.onlab.util.Counter;
import org.onosproject.cfg.ComponentConfigService;
......@@ -50,10 +53,16 @@ import org.onosproject.net.intent.IntentService;
import org.onosproject.net.intent.Key;
import org.onosproject.net.intent.PartitionService;
import org.onosproject.net.intent.PointToPointIntent;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Dictionary;
import java.util.List;
import java.util.Map;
import java.util.Set;
......@@ -65,11 +74,11 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Strings.isNullOrEmpty;
import static java.lang.String.format;
import static java.lang.System.currentTimeMillis;
import static org.apache.felix.scr.annotations.ReferenceCardinality.MANDATORY_UNARY;
import static org.onlab.util.Tools.delay;
import static org.onlab.util.Tools.groupedThreads;
import static org.onlab.util.Tools.*;
import static org.onosproject.net.intent.IntentEvent.Type.*;
import static org.slf4j.LoggerFactory.getLogger;
......@@ -77,20 +86,25 @@ import static org.slf4j.LoggerFactory.getLogger;
* Application to test sustained intent throughput.
*/
@Component(immediate = true)
@Service(value = IntentPerfInstaller.class)
public class IntentPerfInstaller {
private final Logger log = getLogger(getClass());
private static final int DEFAULT_NUM_WORKERS = 1;
private static final int DEFAULT_NUM_KEYS = 40_000;
private static final int DEFAULT_GOAL_CYCLE_PERIOD = 1_000; //ms
private static final int DEFAULT_NUM_KEYS = 40000;
private static final int DEFAULT_GOAL_CYCLE_PERIOD = 1000; //ms
private static final int DEFAULT_NUM_NEIGHBORS = 0;
private static final int START_DELAY = 5_000; // ms
private static final int REPORT_PERIOD = 5_000; //ms
private static final String START = "start";
private static final String STOP = "stop";
private static final MessageSubject CONTROL = new MessageSubject("intent-perf-ctl");
//FIXME add path length
@Property(name = "numKeys", intValue = DEFAULT_NUM_KEYS,
......@@ -134,6 +148,11 @@ public class IntentPerfInstaller {
@Reference(cardinality = MANDATORY_UNARY)
protected IntentPerfCollector sampleCollector;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterCommunicationService communicationService;
private ExecutorService messageHandlingExecutor;
private ExecutorService workers;
private ApplicationId appId;
private Listener listener;
......@@ -145,86 +164,138 @@ public class IntentPerfInstaller {
private int lastKey = 0;
private IntentPerfUi perfUi;
private NodeId nodeId;
private TimerTask reporterTask;
@Activate
public void activate() {
// configService.registerProperties(getClass());
public void activate(ComponentContext context) {
configService.registerProperties(getClass());
String nodeId = clusterService.getLocalNode().ip().toString();
appId = coreService.registerApplication("org.onosproject.intentperf." + nodeId);
nodeId = clusterService.getLocalNode().id();
appId = coreService.registerApplication("org.onosproject.intentperf." + nodeId.toString());
// TODO: replace with shared timer
reportTimer = new Timer("onos-intent-perf-reporter");
workers = Executors.newFixedThreadPool(DEFAULT_NUM_WORKERS, groupedThreads("onos/intent-perf", "worker-%d"));
// disable flow backups for testing
log.info("flow props: {}",
configService.getProperties("org.onosproject.store.flow.impl.DistributedFlowRuleStore"));
configService.setProperty("org.onosproject.store.flow.impl.DistributedFlowRuleStore",
"backupEnabled", "false");
// Schedule delayed start
reportTimer.schedule(new TimerTask() {
@Override
public void run() {
start();
}
}, START_DELAY);
// TODO: replace with shared executor
messageHandlingExecutor = Executors.newSingleThreadExecutor(
groupedThreads("onos/perf", "command-handler"));
communicationService.addSubscriber(CONTROL, new InternalControl(),
messageHandlingExecutor);
listener = new Listener();
intentService.addListener(listener);
// TODO: investigate why this seems to be necessary for configs to get picked up on initial activation
modify(context);
}
@Deactivate
public void deactivate() {
// configService.unregisterProperties(getClass(), false);
stop();
stopTestRun();
configService.unregisterProperties(getClass(), false);
messageHandlingExecutor.shutdown();
communicationService.removeSubscriber(CONTROL);
if (listener != null) {
reportTimer.cancel();
intentService.removeListener(listener);
listener = null;
reportTimer = null;
}
}
//FIXME add modified
@Modified
public void modify(ComponentContext context) {
if (context == null) {
logConfig("Reconfigured");
return;
}
Dictionary<?, ?> properties = context.getProperties();
int newNumKeys, newCyclePeriod, newNumNeighbors;
try {
String s = get(properties, "numKeys");
newNumKeys = isNullOrEmpty(s) ? numKeys : Integer.parseInt(s.trim());
s = get(properties, "cyclePeriod");
newCyclePeriod = isNullOrEmpty(s) ? cyclePeriod : Integer.parseInt(s.trim());
s = get(properties, "numNeighbors");
newNumNeighbors = isNullOrEmpty(s) ? numNeighbors : Integer.parseInt(s.trim());
} catch (NumberFormatException | ClassCastException e) {
log.warn("Malformed configuration detected; using defaults", e);
newNumKeys = DEFAULT_NUM_KEYS;
newCyclePeriod = DEFAULT_GOAL_CYCLE_PERIOD;
newNumNeighbors = DEFAULT_NUM_NEIGHBORS;
}
if (newNumKeys != numKeys || newCyclePeriod != cyclePeriod || newNumNeighbors != numNeighbors) {
numKeys = newNumKeys;
cyclePeriod = newCyclePeriod;
numNeighbors = newNumNeighbors;
logConfig("Reconfigured");
}
}
public void start() {
communicationService.broadcast(new ClusterMessage(nodeId, CONTROL, START.getBytes()));
startTestRun();
}
public void stop() {
communicationService.broadcast(new ClusterMessage(nodeId, CONTROL, STOP.getBytes()));
stopTestRun();
}
private void logConfig(String prefix) {
log.info("{} with appId {}; numKeys = {}; cyclePeriod = {} ms; numNeighbors={}",
prefix, appId.id(), numKeys, cyclePeriod, numNeighbors);
}
public void start() {
private void startTestRun() {
sampleCollector.clearSamples();
// adjust numNeighbors and generate list of neighbors
numNeighbors = Math.min(clusterService.getNodes().size() - 1, numNeighbors);
// perhaps we want to prime before listening...
// we will need to discard the first few results for priming and warmup
listener = new Listener();
intentService.addListener(listener);
// Schedule reporter task on report period boundary
reportTimer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
//adjustRates(); // FIXME we currently adjust rates in the cycle thread
listener.report();
}
}, REPORT_PERIOD - currentTimeMillis() % REPORT_PERIOD, REPORT_PERIOD);
reporterTask = new ReporterTask();
reportTimer.scheduleAtFixedRate(reporterTask,
REPORT_PERIOD - currentTimeMillis() % REPORT_PERIOD,
REPORT_PERIOD);
// Submit workers
stopped = false;
for (int i = 0; i < DEFAULT_NUM_WORKERS; i++) {
workers.submit(new Submitter(createIntents(numKeys, /*FIXME*/ 2, lastKey)));
}
logConfig("Started");
log.info("Started test run");
}
public void stop() {
private void stopTestRun() {
stopped = true;
if (listener != null) {
reportTimer.cancel();
intentService.removeListener(listener);
listener = null;
reportTimer = null;
if (reporterTask != null) {
reporterTask.cancel();
reporterTask = null;
}
try {
workers.awaitTermination(5, TimeUnit.SECONDS);
workers.awaitTermination(5 * cyclePeriod, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
log.warn("Failed to stop worker", e);
}
log.info("Stopped");
log.info("Stopped test run");
}
private List<NodeId> getNeighbors() {
......@@ -491,4 +562,25 @@ public class IntentPerfInstaller {
}
}
private class InternalControl implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
String cmd = new String(message.payload());
log.info("Received command {}", cmd);
if (cmd.equals(START)) {
startTestRun();
} else {
stopTestRun();
}
}
}
private class ReporterTask extends TimerTask {
@Override
public void run() {
//adjustRates(); // FIXME we currently adjust rates in the cycle thread
listener.report();
}
}
}
......
......@@ -29,7 +29,7 @@ import java.util.List;
*/
@Command(scope = "onos", name = "intent-perf",
description = "Displays accumulated performance metrics")
public class IntentPerfCommand extends AbstractShellCommand {
public class IntentPerfListCommand extends AbstractShellCommand {
@Option(name = "-s", aliases = "--summary", description = "Output just summary",
required = false, multiValued = false)
......@@ -49,11 +49,16 @@ public class IntentPerfCommand extends AbstractShellCommand {
List<String> headers = collector.getSampleHeaders();
Sample overall = collector.getOverall();
double total = 0;
print("%12s: %14s", "Node ID", "Overall Rate");
for (int i = 0; i < overall.data.length; i++) {
print("%12s: %12.2f", headers.get(i), overall.data[i]);
total += overall.data[i];
if (overall.data[i] >= 0) {
print("%12s: %14.2f", headers.get(i), overall.data[i]);
total += overall.data[i];
} else {
print("%12s: %14s", headers.get(i), " ");
}
}
print("%12s: %12.2f", "total", total);
print("%12s: %14.2f", "total", total);
}
private void printSamples() {
......
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.intentperf;
import org.apache.karaf.shell.commands.Command;
import org.onosproject.cli.AbstractShellCommand;
/**
* Starts intent performance test run.
*/
@Command(scope = "onos", name = "intent-perf-start",
description = "Starts intent performance test run")
public class IntentPerfStartCommand extends AbstractShellCommand {
@Override
protected void execute() {
get(IntentPerfInstaller.class).start();
}
}
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.intentperf;
import org.apache.karaf.shell.commands.Command;
import org.onosproject.cli.AbstractShellCommand;
/**
* Stops intent performance test run.
*/
@Command(scope = "onos", name = "intent-perf-stop",
description = "Stops intent performance test run")
public class IntentPerfStopCommand extends AbstractShellCommand {
@Override
protected void execute() {
get(IntentPerfInstaller.class).stop();
}
}
......@@ -16,7 +16,13 @@
<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0">
<command-bundle xmlns="http://karaf.apache.org/xmlns/shell/v1.1.0">
<command>
<action class="org.onosproject.intentperf.IntentPerfCommand"/>
<action class="org.onosproject.intentperf.IntentPerfListCommand"/>
</command>
<command>
<action class="org.onosproject.intentperf.IntentPerfStartCommand"/>
</command>
<command>
<action class="org.onosproject.intentperf.IntentPerfStopCommand"/>
</command>
</command-bundle>
</blueprint>
......