Brian O'Connor

Pulling PartitionService into API and making IntentPerfInstaller configurable

Change-Id: I9fde28986b6714c0ca4d635d5a3699094e2f0081
......@@ -15,21 +15,28 @@
*/
package org.onosproject.intentperf;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import org.apache.commons.lang.math.RandomUtils;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.onlab.packet.MacAddress;
import org.onlab.util.Counter;
import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.mastership.MastershipService;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.Device;
import org.onosproject.net.MastershipRole;
import org.onosproject.net.PortNumber;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.flow.DefaultTrafficSelector;
......@@ -41,12 +48,12 @@ import org.onosproject.net.intent.IntentEvent;
import org.onosproject.net.intent.IntentListener;
import org.onosproject.net.intent.IntentService;
import org.onosproject.net.intent.Key;
import org.onosproject.net.intent.PartitionService;
import org.onosproject.net.intent.PointToPointIntent;
import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
......@@ -55,6 +62,7 @@ import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkState;
import static java.lang.String.format;
......@@ -71,15 +79,36 @@ import static org.slf4j.LoggerFactory.getLogger;
@Component(immediate = true)
public class IntentPerfInstaller {
//FIXME make this configurable
private static final int NUM_WORKERS = 1;
private static final int NUM_KEYS = 40_000;
private final Logger log = getLogger(getClass());
private static final int DEFAULT_NUM_WORKERS = 1;
private static final int DEFAULT_NUM_KEYS = 40_000;
private static final int DEFAULT_GOAL_CYCLE_PERIOD = 1_000; //ms
private static final int DEFAULT_NUM_NEIGHBORS = 0;
public static final int START_DELAY = 5_000; // ms
private static final int START_DELAY = 5_000; // ms
private static final int REPORT_PERIOD = 5_000; //ms
private static final int GOAL_CYCLE_PERIOD = 1_000; //ms
private final Logger log = getLogger(getClass());
//FIXME add path length
@Property(name = "numKeys", intValue = DEFAULT_NUM_KEYS,
label = "Number of keys (i.e. unique intents) to generate per instance")
private int numKeys = DEFAULT_NUM_KEYS;
//TODO implement numWorkers property
// @Property(name = "numThreads", intValue = DEFAULT_NUM_WORKERS,
// label = "Number of installer threads per instance")
// private int numWokers = DEFAULT_NUM_WORKERS;
@Property(name = "cyclePeriod", intValue = DEFAULT_GOAL_CYCLE_PERIOD,
label = "Goal for cycle period (in ms)")
private int cyclePeriod = DEFAULT_GOAL_CYCLE_PERIOD;
@Property(name = "numNeighbors", intValue = DEFAULT_NUM_NEIGHBORS,
label = "Number of neighbors to generate intents for")
private int numNeighbors = DEFAULT_NUM_NEIGHBORS;
@Reference(cardinality = MANDATORY_UNARY)
protected CoreService coreService;
......@@ -93,6 +122,15 @@ public class IntentPerfInstaller {
@Reference(cardinality = MANDATORY_UNARY)
protected DeviceService deviceService;
@Reference(cardinality = MANDATORY_UNARY)
protected MastershipService mastershipService;
@Reference(cardinality = MANDATORY_UNARY)
protected PartitionService partitionService;
@Reference(cardinality = MANDATORY_UNARY)
protected ComponentConfigService configService;
private ExecutorService workers;
private ApplicationId appId;
private Listener listener;
......@@ -105,12 +143,20 @@ public class IntentPerfInstaller {
@Activate
public void activate() {
configService.registerProperties(getClass());
String nodeId = clusterService.getLocalNode().ip().toString();
appId = coreService.registerApplication("org.onosproject.intentperf." + nodeId);
reportTimer = new Timer("onos-intent-perf-reporter");
workers = Executors.newFixedThreadPool(NUM_WORKERS, groupedThreads("onos/intent-perf", "worker-%d"));
log.info("Started with Application ID {}", appId.id());
workers = Executors.newFixedThreadPool(DEFAULT_NUM_WORKERS, groupedThreads("onos/intent-perf", "worker-%d"));
// disable flow backups for testing
log.info("flow props: {}",
configService.getProperties("org.onosproject.store.flow.impl.DistributedFlowRuleStore"));
configService.setProperty("org.onosproject.store.flow.impl.DistributedFlowRuleStore",
"backupEnabled", "false");
// Schedule delayed start
reportTimer.schedule(new TimerTask() {
......@@ -123,11 +169,22 @@ public class IntentPerfInstaller {
@Deactivate
public void deactivate() {
configService.unregisterProperties(getClass(), false);
stop();
log.info("Stopped");
}
//FIXME add modified
private void logConfig(String prefix) {
log.info("{} with appId {}; numKeys = {}; cyclePeriod = {} ms; numNeighbors={}",
prefix, appId.id(), numKeys, cyclePeriod, numNeighbors);
}
public void start() {
// TODO perhaps move to start(), but need to call before logConfig
// adjust numNeighbors and generate list of neighbors
numNeighbors = Math.min(clusterService.getNodes().size() - 1, numNeighbors);
// perhaps we want to prime before listening...
// we will need to discard the first few results for priming and warmup
listener = new Listener();
......@@ -144,85 +201,119 @@ public class IntentPerfInstaller {
// Submit workers
stopped = false;
Set<Device> devices = new HashSet<>();
for (int i = 0; i < NUM_WORKERS; i++) {
workers.submit(new Submitter(createIntents(NUM_KEYS, 2, lastKey, devices)));
for (int i = 0; i < DEFAULT_NUM_WORKERS; i++) {
workers.submit(new Submitter(createIntents(numKeys, /*FIXME*/ 2, lastKey)));
}
logConfig("Started");
}
public void stop() {
stopped = true;
if (listener != null) {
reportTimer.cancel();
intentService.removeListener(listener);
listener = null;
reportTimer = null;
}
stopped = true;
try {
workers.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
log.warn("Failed to stop worker", e);
}
log.info("Stopped");
}
/**
* Creates a specified number of intents for testing purposes.
*
* @param numberOfKeys number of intents
* @param pathLength path depth
* @param firstKey first key to attempt
* @param devices set of previously utilized devices @return set of intents
*/
private Set<Intent> createIntents(int numberOfKeys, int pathLength,
int firstKey, Set<Device> devices) {
Iterator<Device> deviceItr = deviceService.getAvailableDevices().iterator();
Set<Intent> result = new HashSet<>();
Device ingressDevice = null;
while (deviceItr.hasNext()) {
Device device = deviceItr.next();
if (deviceService.getRole(device.id()) == MastershipRole.MASTER &&
!devices.contains(device)) {
ingressDevice = device;
devices.add(device);
break;
private List<NodeId> getNeighbors() {
List<NodeId> nodes = clusterService.getNodes().stream()
.map(ControllerNode::id)
.collect(Collectors.toCollection(ArrayList::new));
// sort neighbors by id
Collections.sort(nodes, (node1, node2) ->
node1.toString().compareTo(node2.toString()));
// rotate the local node to index 0
Collections.rotate(nodes, -1 * nodes.indexOf(clusterService.getLocalNode().id()));
log.info("neighbors (raw): {}", nodes); //TODO remove
// generate the sub-list that will contain local node and selected neighbors
nodes = nodes.subList(0, numNeighbors + 1);
log.info("neighbors: {}", nodes); //TODO remove
return nodes;
}
}
checkState(ingressDevice != null, "There are no local devices");
// prefix based on node id
long prefix = ((long) clusterService.getLocalNode().ip().getIp4Address().toInt()) << 32;
for (int count = 0, k = firstKey; count < numberOfKeys; k++) {
Key key = Key.of(prefix + k, appId);
//FIXME comment this out for spread of keys
if (!intentService.isLocal(key)) {
// Bail if the key is not local
continue;
}
private Intent createIntent(Key key, long mac, NodeId node, Multimap<NodeId, Device> devices) {
// choose a random device for which this node is master
List<Device> deviceList = devices.get(node).stream().collect(Collectors.toList());
Device device = deviceList.get(RandomUtils.nextInt(deviceList.size()));
//FIXME we currently ignore the path length and always use the same device
TrafficSelector selector = DefaultTrafficSelector.builder()
.matchEthDst(MacAddress.valueOf(count)).build();
.matchEthDst(MacAddress.valueOf(mac)).build();
TrafficTreatment treatment = DefaultTrafficTreatment.emptyTreatment();
ConnectPoint ingress = new ConnectPoint(ingressDevice.id(), PortNumber.portNumber(1));
ConnectPoint egress = new ConnectPoint(ingressDevice.id(), PortNumber.portNumber(2));
ConnectPoint ingress = new ConnectPoint(device.id(), PortNumber.portNumber(1));
ConnectPoint egress = new ConnectPoint(device.id(), PortNumber.portNumber(2));
Intent intent = new PointToPointIntent(appId, key,
return new PointToPointIntent(appId, key,
selector, treatment,
ingress, egress,
Collections.emptyList(),
Intent.DEFAULT_INTENT_PRIORITY);
result.add(intent);
}
/**
* Creates a specified number of intents for testing purposes.
*
* @param numberOfKeys number of intents
* @param pathLength path depth
* @param firstKey first key to attempt
* @return set of intents
*/
private Set<Intent> createIntents(int numberOfKeys, int pathLength, int firstKey) {
//Set<Intent> result = new HashSet<>();
List<NodeId> neighbors = getNeighbors();
Multimap<NodeId, Device> devices = ArrayListMultimap.create();
deviceService.getAvailableDevices().forEach(device ->
devices.put(mastershipService.getMasterFor(device.id()), device));
// ensure that we have at least one device per neighbor
neighbors.forEach(node ->
checkState(devices.get(node).size() > 0,
"There are no devices for {}", node));
// TODO pull this outside so that createIntent can use it
// prefix based on node id for keys generated on this instance
long keyPrefix = ((long) clusterService.getLocalNode().ip().getIp4Address().toInt()) << 32;
int maxKeysPerNode = (int) Math.ceil((double) numberOfKeys / neighbors.size());
Multimap<NodeId, Intent> intents = ArrayListMultimap.create();
for (int count = 0, k = firstKey; count < numberOfKeys; k++) {
Key key = Key.of(keyPrefix + k, appId);
NodeId leader = partitionService.getLeader(key);
if (!neighbors.contains(leader) || intents.get(leader).size() >= maxKeysPerNode) {
// Bail if we are not sending to this node or we have enough for this node
continue;
}
intents.put(leader, createIntent(key, keyPrefix + k, leader, devices));
// Bump up the counter and remember this as the last key used.
count++;
lastKey = k;
if (lastKey % 1000 == 0) {
log.info("Building intents... {} (attempt: {})", lastKey, count);
if (count % 1000 == 0) {
log.info("Building intents... {} (attempt: {})", count, lastKey);
}
}
checkState(intents.values().size() == numberOfKeys,
"Generated wrong number of intents");
log.info("Created {} intents", numberOfKeys);
return result;
//FIXME remove this
intents.keySet().forEach(node -> log.info("\t{}\t{}", node, intents.get(node).size()));
return Sets.newHashSet(intents.values());
}
// Submits intent operations.
......@@ -237,8 +328,8 @@ public class IntentPerfInstaller {
private Submitter(Set<Intent> intents) {
this.intents = intents;
lastCount = NUM_KEYS / 4;
lastDuration = 1000; // 1 second
lastCount = numKeys / 4;
lastDuration = 1_000; // 1 second
}
@Override
......@@ -247,6 +338,7 @@ public class IntentPerfInstaller {
while (!stopped) {
cycle();
}
clear();
}
private Iterable<Intent> subset(Set<Intent> intents) {
......@@ -282,6 +374,10 @@ public class IntentPerfInstaller {
}
}
private void clear() {
submitted.forEach(this::withdraw);
}
// Runs a single operation cycle.
private void cycle() {
//TODO consider running without rate adjustment
......@@ -292,11 +388,11 @@ public class IntentPerfInstaller {
subset(withdrawn).forEach(this::submit);
long delta = currentTimeMillis() - start;
if (delta > GOAL_CYCLE_PERIOD * 3 || delta < 0) {
if (delta > cyclePeriod * 3 || delta < 0) {
log.warn("Cycle took {} ms", delta);
}
int difference = GOAL_CYCLE_PERIOD - (int) delta;
int difference = cyclePeriod - (int) delta;
if (difference > 0) {
delay(difference);
}
......@@ -307,9 +403,10 @@ public class IntentPerfInstaller {
int cycleCount = 0;
private void adjustRates() {
//FIXME need to iron out the rate adjustment
//FIXME we should taper the adjustments over time
if (++cycleCount % 5 == 0) { //TODO: maybe use a timer (we should do this every 5-10 sec)
if (listener.requestThroughput() - listener.processedThroughput() <= 2000 && //was 500
lastDuration <= GOAL_CYCLE_PERIOD) {
lastDuration <= cyclePeriod) {
lastCount = Math.min(lastCount + 1000, intents.size() / 2);
} else {
lastCount *= 0.8;
......@@ -324,8 +421,8 @@ public class IntentPerfInstaller {
// Event listener to monitor throughput.
final class Listener implements IntentListener {
private Map<IntentEvent.Type, Counter> counters;
private final Counter runningTotal = new Counter();
private volatile Map<IntentEvent.Type, Counter> counters;
private volatile double processedThroughput = 0;
private volatile double requestThroughput = 0;
......
......@@ -13,10 +13,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.intent.impl;
package org.onosproject.net.intent;
import org.onosproject.cluster.NodeId;
import org.onosproject.net.intent.Key;
/**
* Service for interacting with the partition-to-instance assignments.
......
......@@ -33,6 +33,7 @@ import org.onosproject.net.intent.IntentState;
import org.onosproject.net.intent.IntentStore;
import org.onosproject.net.intent.IntentStoreDelegate;
import org.onosproject.net.intent.Key;
import org.onosproject.net.intent.PartitionService;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.ecmap.EventuallyConsistentMap;
......
......@@ -31,6 +31,7 @@ import org.onosproject.cluster.LeadershipEventListener;
import org.onosproject.cluster.LeadershipService;
import org.onosproject.cluster.NodeId;
import org.onosproject.net.intent.Key;
import org.onosproject.net.intent.PartitionService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......