Brian O'Connor
Committed by Gerrit Code Review

IntentPerfInstaller: using feedback to determine submit size

Change-Id: Iaa4eb657ee0e22d008597c40561ea89105a09a15
......@@ -61,22 +61,22 @@ import static java.lang.System.currentTimeMillis;
import static org.apache.felix.scr.annotations.ReferenceCardinality.MANDATORY_UNARY;
import static org.onlab.util.Tools.delay;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.net.intent.IntentEvent.Type.INSTALLED;
import static org.onosproject.net.intent.IntentEvent.Type.WITHDRAWN;
import static org.onosproject.net.intent.IntentEvent.Type.*;
import static org.slf4j.LoggerFactory.getLogger;
/**
* Application to set up demos.
* Application to test sustained intent throughput.
*/
@Component(immediate = true)
public class IntentPerfInstaller {
//FIXME make this configurable
private static final int NUM_WORKERS = 1;
private static final int NUM_KEYS = 10_000;
private static final int NUM_KEYS = 20_000;
public static final int START_DELAY = 5_000; // ms
private static final int REPORT_PERIOD = 5_000; //ms
private static final int GOAL_CYCLE_PERIOD = 1_000; //ms
private final Logger log = getLogger(getClass());
......@@ -99,6 +99,7 @@ public class IntentPerfInstaller {
private Timer reportTimer;
// FIXME this variable isn't shared properly between multiple worker threads
private int lastKey = 0;
@Activate
......@@ -135,6 +136,7 @@ public class IntentPerfInstaller {
reportTimer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
//adjustRates(); // FIXME we currently adjust rates in the cycle thread
listener.report();
}
}, REPORT_PERIOD - currentTimeMillis() % REPORT_PERIOD, REPORT_PERIOD);
......@@ -158,16 +160,10 @@ public class IntentPerfInstaller {
try {
workers.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
log.warn("Failed to stop worker.");
log.warn("Failed to stop worker", e);
}
}
private Iterable<Intent> subset(Set<Intent> intents) {
List<Intent> subset = Lists.newArrayList(intents);
Collections.shuffle(subset);
return subset.subList(0, subset.size() / 2);
}
/**
* Creates a specified number of intents for testing purposes.
*
......@@ -200,7 +196,7 @@ public class IntentPerfInstaller {
continue;
}
//FIXME
//FIXME we currently ignore the path length and always use the same device
TrafficSelector selector = DefaultTrafficSelector.builder().build();
TrafficTreatment treatment = DefaultTrafficTreatment.builder().build();
ConnectPoint ingress = new ConnectPoint(ingressDevice.id(), PortNumber.portNumber(1));
......@@ -216,7 +212,7 @@ public class IntentPerfInstaller {
count++;
lastKey = k;
if (lastKey % 1000 == 0) {
log.info("Building intents... {} ({})", count, lastKey);
log.info("Building intents... {} (attempt: {})", lastKey, count);
}
}
log.info("Created {} intents", numberOfKeys);
......@@ -226,24 +222,33 @@ public class IntentPerfInstaller {
// Submits intent operations.
final class Submitter implements Runnable {
private long lastDuration;
private int lastCount;
private Set<Intent> intents = Sets.newHashSet();
private Set<Intent> submitted = Sets.newHashSet();
private Set<Intent> withdrawn = Sets.newHashSet();
private Submitter(Set<Intent> intents) {
this.intents = intents;
lastCount = NUM_KEYS / 4;
lastDuration = 1000; // 1 second
}
@Override
public void run() {
delay(2000); // take a breath to start
prime();
while (!stopped) {
cycle();
delay(800); // take a breath
}
}
private Iterable<Intent> subset(Set<Intent> intents) {
List<Intent> subset = Lists.newArrayList(intents);
Collections.shuffle(subset);
return subset.subList(0, lastCount);
}
// Submits the specified intent.
private void submit(Intent intent) {
intentService.submit(intent);
......@@ -273,23 +278,52 @@ public class IntentPerfInstaller {
// Runs a single operation cycle.
private void cycle() {
//TODO consider running without rate adjustment
adjustRates();
long start = currentTimeMillis();
subset(submitted).forEach(this::withdraw);
subset(withdrawn).forEach(this::submit);
long delta = currentTimeMillis() - start;
if (delta > 5000 || delta < 0) {
if (delta > GOAL_CYCLE_PERIOD * 3 || delta < 0) {
log.warn("Cycle took {} ms", delta);
}
int difference = GOAL_CYCLE_PERIOD - (int) delta;
if (difference > 0) {
delay(difference);
}
lastDuration = delta;
}
}
int cycleCount = 0;
private void adjustRates() {
//FIXME need to iron out the rate adjustment
if (++cycleCount % 5 == 0) { //TODO: maybe use a timer (we should do this every 5-10 sec)
if (listener.requestThroughput() - listener.processedThroughput() <= 500 &&
lastDuration <= GOAL_CYCLE_PERIOD) {
lastCount = Math.min(lastCount + 100, intents.size() / 2);
} else {
lastCount *= 0.8;
}
log.info("last count: {}, last duration: {} ms (sub: {} vs inst: {})",
lastCount, lastDuration, listener.requestThroughput(), listener.processedThroughput());
}
}
}
// Event listener to monitor throughput.
final class Listener implements IntentListener {
private final Map<IntentEvent.Type, Counter> counters;
private Map<IntentEvent.Type, Counter> counters;
private final Counter runningTotal = new Counter();
private volatile double processedThroughput = 0;
private volatile double requestThroughput = 0;
public Listener() {
counters = initCounters();
}
......@@ -302,6 +336,14 @@ public class IntentPerfInstaller {
return map;
}
public double processedThroughput() {
return processedThroughput;
}
public double requestThroughput() {
return requestThroughput;
}
@Override
public void event(IntentEvent event) {
if (event.subject().appId().equals(appId)) {
......@@ -310,25 +352,29 @@ public class IntentPerfInstaller {
}
public void report() {
StringBuilder stringBuilder = new StringBuilder();
Counter installed = counters.get(INSTALLED);
Counter withdrawn = counters.get(WITHDRAWN);
double current = installed.throughput() + withdrawn.throughput();
Map<IntentEvent.Type, Counter> reportCounters = counters;
counters = initCounters();
// update running total and latest throughput
Counter installed = reportCounters.get(INSTALLED);
Counter withdrawn = reportCounters.get(WITHDRAWN);
processedThroughput = installed.throughput() + withdrawn.throughput();
runningTotal.add(installed.total() + withdrawn.total());
Counter installReq = reportCounters.get(INSTALL_REQ);
Counter withdrawReq = reportCounters.get(WITHDRAW_REQ);
requestThroughput = installReq.throughput() + withdrawReq.throughput();
// build the string to report
StringBuilder stringBuilder = new StringBuilder();
for (IntentEvent.Type type : IntentEvent.Type.values()) {
stringBuilder.append(printCounter(type)).append("; ");
Counter counter = reportCounters.get(type);
stringBuilder.append(format("%s=%.2f;", type, counter.throughput()));
}
log.info("Throughput: OVERALL={}; CURRENT={}; {}",
format("%.2f", runningTotal.throughput()),
format("%.2f", current), stringBuilder);
}
private String printCounter(IntentEvent.Type event) {
Counter counter = counters.get(event);
String result = format("%s=%.2f", event, counter.throughput());
counter.reset();
return result;
format("%.2f", processedThroughput),
stringBuilder);
}
}
}
......