Thomas Vachuska
Committed by Gerrit Code Review

Made intent perf app multi-threaded; doesn't seem to help.

Made Jono's changes to ECM per Madan's suggesion.
Added cell beast.
Re-enabled anti-entropy.
Added ability to push bits through test proxy for faster upload.

Change-Id: I1455d6d443a697d7a3973c88cb81bfdac0e1dd7f
......@@ -44,6 +44,7 @@ import org.onosproject.net.intent.PointToPointIntent;
import org.slf4j.Logger;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
......@@ -56,6 +57,7 @@ import java.util.concurrent.TimeUnit;
import static com.google.common.base.Preconditions.checkState;
import static java.lang.String.format;
import static java.lang.System.currentTimeMillis;
import static org.apache.felix.scr.annotations.ReferenceCardinality.MANDATORY_UNARY;
import static org.onlab.util.Tools.delay;
import static org.onlab.util.Tools.groupedThreads;
......@@ -69,6 +71,13 @@ import static org.slf4j.LoggerFactory.getLogger;
@Component(immediate = true)
public class IntentPerfInstaller {
//FIXME make this configurable
private static final int NUM_WORKERS = 1;
private static final int NUM_KEYS = 10_000;
public static final int START_DELAY = 5_000; // ms
private static final int REPORT_PERIOD = 5_000; //ms
private final Logger log = getLogger(getClass());
@Reference(cardinality = MANDATORY_UNARY)
......@@ -83,32 +92,31 @@ public class IntentPerfInstaller {
@Reference(cardinality = MANDATORY_UNARY)
protected DeviceService deviceService;
private ExecutorService worker;
private ExecutorService workers;
private ApplicationId appId;
private Listener listener;
private Set<Intent> intents;
private Set<Intent> submitted;
private Set<Intent> withdrawn;
private boolean stopped;
private static final long REPORT_PERIOD = 5000L; //ms
private Timer reportTimer;
//FIXME make this configurable
private static final int NUM_KEYS = 10_000;
private int lastKey = 0;
@Activate
public void activate() {
String nodeId = clusterService.getLocalNode().ip().toString();
appId = coreService.registerApplication("org.onosproject.intentperf."
+ nodeId);
intents = Sets.newHashSet();
submitted = Sets.newHashSet();
withdrawn = Sets.newHashSet();
appId = coreService.registerApplication("org.onosproject.intentperf." + nodeId);
worker = Executors.newFixedThreadPool(1, groupedThreads("onos/intent-perf", "worker"));
reportTimer = new Timer("onos-intent-perf-reporter");
workers = Executors.newFixedThreadPool(NUM_WORKERS, groupedThreads("onos/intent-perf", "worker-%d"));
log.info("Started with Application ID {}", appId.id());
start(); //FIXME
// Schedule delayed start
reportTimer.schedule(new TimerTask() {
@Override
public void run() {
start();
}
}, START_DELAY);
}
@Deactivate
......@@ -123,26 +131,20 @@ public class IntentPerfInstaller {
listener = new Listener();
intentService.addListener(listener);
long delay = System.currentTimeMillis() % REPORT_PERIOD;
reportTimer = new Timer("onos-intent-perf-reporter");
// Schedule reporter task on report period boundary
reportTimer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
listener.report();
}
}, delay, REPORT_PERIOD);
}, REPORT_PERIOD - currentTimeMillis() % REPORT_PERIOD, REPORT_PERIOD);
// Submit workers
stopped = false;
worker.submit(() -> {
delay(2000); // take a breath to start
createIntents(NUM_KEYS, 2); //FIXME
prime();
while (!stopped) {
cycle();
delay(800); // take a breath
}
});
Set<Device> devices = new HashSet<>();
for (int i = 0; i < NUM_WORKERS; i++) {
workers.submit(new Submitter(createIntents(NUM_KEYS, 2, lastKey, devices)));
}
}
public void stop() {
......@@ -154,63 +156,53 @@ public class IntentPerfInstaller {
}
stopped = true;
try {
worker.awaitTermination(5, TimeUnit.SECONDS);
workers.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
log.warn("Failed to stop worker.");
}
}
private void cycle() {
long start = System.currentTimeMillis();
subset(submitted).forEach(this::withdraw);
subset(withdrawn).forEach(this::submit);
long delta = System.currentTimeMillis() - start;
if (delta > 1000 || delta < 0) {
log.warn("Cycle took {} ms", delta);
}
}
private Iterable<Intent> subset(Set<Intent> intents) {
List<Intent> subset = Lists.newArrayList(intents);
Collections.shuffle(subset);
return subset.subList(0, subset.size() / 2);
}
private void submit(Intent intent) {
intentService.submit(intent);
submitted.add(intent);
withdrawn.remove(intent); //TODO could check result here...
}
private void withdraw(Intent intent) {
intentService.withdraw(intent);
withdrawn.add(intent);
submitted.remove(intent); //TODO could check result here...
}
private void createIntents(int numberOfKeys, int pathLength) {
/**
* Creates a specified number of intents for testing purposes.
*
* @param numberOfKeys number of intents
* @param pathLength path depth
* @param firstKey first key to attempt
* @param devices set of previously utilized devices @return set of intents
*/
private Set<Intent> createIntents(int numberOfKeys, int pathLength,
int firstKey, Set<Device> devices) {
Iterator<Device> deviceItr = deviceService.getAvailableDevices().iterator();
Set<Intent> result = new HashSet<>();
Device ingressDevice = null;
while (deviceItr.hasNext()) {
Device device = deviceItr.next();
if (deviceService.getRole(device.id()) == MastershipRole.MASTER) {
if (deviceService.getRole(device.id()) == MastershipRole.MASTER &&
!devices.contains(device)) {
ingressDevice = device;
devices.add(device);
break;
}
}
checkState(ingressDevice != null, "There are no local devices");
for (int local = 0, i = 0; local < numberOfKeys; i++) {
Key key = Key.of(i, appId);
for (int count = 0, k = firstKey; count < numberOfKeys; k++) {
Key key = Key.of(k, appId);
if (!intentService.isLocal(key)) {
// Bail if the key is not local
continue;
}
TrafficSelector selector = DefaultTrafficSelector.builder().build();
TrafficTreatment treatment = DefaultTrafficTreatment.builder().build();
//FIXME
TrafficSelector selector = DefaultTrafficSelector.builder().build();
TrafficTreatment treatment = DefaultTrafficTreatment.builder().build();
ConnectPoint ingress = new ConnectPoint(ingressDevice.id(), PortNumber.portNumber(1));
ConnectPoint egress = new ConnectPoint(ingressDevice.id(), PortNumber.portNumber(2));
......@@ -218,28 +210,82 @@ public class IntentPerfInstaller {
selector, treatment,
ingress, egress,
Collections.emptyList());
intents.add(intent);
local++;
if (i % 1000 == 0) {
log.info("Building intents... {} ({})", local, i);
result.add(intent);
// Bump up the counter and remember this as the last key used.
count++;
lastKey = k;
if (lastKey % 1000 == 0) {
log.info("Building intents... {} ({})", count, lastKey);
}
}
log.info("Created {} intents", numberOfKeys);
return result;
}
private void prime() {
int i = 0;
withdrawn.addAll(intents);
for (Intent intent : intents) {
submit(intent);
// only submit half of the intents to start
if (i++ >= intents.size() / 2) {
break;
// Submits intent operations.
final class Submitter implements Runnable {
private Set<Intent> intents = Sets.newHashSet();
private Set<Intent> submitted = Sets.newHashSet();
private Set<Intent> withdrawn = Sets.newHashSet();
private Submitter(Set<Intent> intents) {
this.intents = intents;
}
@Override
public void run() {
delay(2000); // take a breath to start
prime();
while (!stopped) {
cycle();
delay(800); // take a breath
}
}
// Submits the specified intent.
private void submit(Intent intent) {
intentService.submit(intent);
submitted.add(intent);
withdrawn.remove(intent); //TODO could check result here...
}
// Withdraws the specified intent.
private void withdraw(Intent intent) {
intentService.withdraw(intent);
withdrawn.add(intent);
submitted.remove(intent); //TODO could check result here...
}
// Primes the cycle.
private void prime() {
int i = 0;
withdrawn.addAll(intents);
for (Intent intent : intents) {
submit(intent);
// only submit half of the intents to start
if (i++ >= intents.size() / 2) {
break;
}
}
}
// Runs a single operation cycle.
private void cycle() {
long start = currentTimeMillis();
subset(submitted).forEach(this::withdraw);
subset(withdrawn).forEach(this::submit);
long delta = currentTimeMillis() - start;
if (delta > 5000 || delta < 0) {
log.warn("Cycle took {} ms", delta);
}
}
}
class Listener implements IntentListener {
// Event listener to monitor throughput.
final class Listener implements IntentListener {
private final Map<IntentEvent.Type, Counter> counters;
private final Counter runningTotal = new Counter();
......@@ -284,4 +330,5 @@ public class IntentPerfInstaller {
return result;
}
}
}
......
......@@ -38,7 +38,7 @@ public class IntentAccumulator extends AbstractAccumulator<IntentData> {
// FIXME: Replace with a system-wide timer instance;
// TODO: Convert to use HashedWheelTimer or produce a variant of that; then decide which we want to adopt
private static final Timer TIMER = new Timer("intent-op-batching");
private static final Timer TIMER = new Timer("onos-intent-op-batching");
private final IntentBatchDelegate delegate;
......
......@@ -65,14 +65,14 @@ import static org.slf4j.LoggerFactory.getLogger;
public class DefaultTopologyProvider extends AbstractProvider
implements TopologyProvider {
private static final int MAX_THREADS = 32;
private static final int MAX_THREADS = 8;
private static final int DEFAULT_MAX_EVENTS = 1000;
private static final int DEFAULT_MAX_IDLE_MS = 10;
private static final int DEFAULT_MAX_BATCH_MS = 50;
// FIXME: Replace with a system-wide timer instance;
// TODO: Convert to use HashedWheelTimer or produce a variant of that; then decide which we want to adopt
private static final Timer TIMER = new Timer("topo-event-batching");
private static final Timer TIMER = new Timer("onos-topo-event-batching");
@Property(name = "maxEvents", intValue = DEFAULT_MAX_EVENTS,
label = "Maximum number of events to accumulate")
......
......@@ -46,6 +46,7 @@ import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkNotNull;
......@@ -66,7 +67,6 @@ public class EventuallyConsistentMapImpl<K, V>
private final Map<K, Timestamped<V>> items;
private final Map<K, Timestamp> removedItems;
private final String mapName;
private final ClusterService clusterService;
private final ClusterCommunicationService clusterCommunicator;
private final KryoSerializer serializer;
......@@ -88,6 +88,7 @@ public class EventuallyConsistentMapImpl<K, V>
private volatile boolean destroyed = false;
private static final String ERROR_DESTROYED = " map is already destroyed";
private final String destroyedMessage;
private static final String ERROR_NULL_KEY = "Key cannot be null";
private static final String ERROR_NULL_VALUE = "Null values are not allowed";
......@@ -98,18 +99,20 @@ public class EventuallyConsistentMapImpl<K, V>
/**
* Creates a new eventually consistent map shared amongst multiple instances.
*
* <p>
* Each map is identified by a string map name. EventuallyConsistentMapImpl
* objects in different JVMs that use the same map name will form a
* distributed map across JVMs (provided the cluster service is aware of
* both nodes).
*
* </p>
* <p>
* The client is expected to provide an
* {@link org.onlab.util.KryoNamespace.Builder} with which all classes that
* will be stored in this map have been registered (including referenced
* classes). This serializer will be used to serialize both K and V for
* inter-node notifications.
*
* </p>
* <p>
* The client must provide an {@link org.onosproject.store.impl.ClockService}
* which can generate timestamps for a given key. The clock service is free
* to generate timestamps however it wishes, however these timestamps will
......@@ -117,6 +120,7 @@ public class EventuallyConsistentMapImpl<K, V>
* to ensure updates are properly ordered for the use case (i.e. in some
* cases wallclock time will suffice, whereas in other cases logical time
* will be necessary).
* </p>
*
* @param mapName a String identifier for the map.
* @param clusterService the cluster service
......@@ -131,12 +135,11 @@ public class EventuallyConsistentMapImpl<K, V>
ClusterCommunicationService clusterCommunicator,
KryoNamespace.Builder serializerBuilder,
ClockService<K, V> clockService) {
this.mapName = checkNotNull(mapName);
this.clusterService = checkNotNull(clusterService);
this.clusterCommunicator = checkNotNull(clusterCommunicator);
serializer = createSerializer(checkNotNull(serializerBuilder));
destroyedMessage = mapName + ERROR_DESTROYED;
this.clockService = checkNotNull(clockService);
......@@ -153,10 +156,9 @@ public class EventuallyConsistentMapImpl<K, V>
groupedThreads("onos/ecm", mapName + "-bg-%d")));
// start anti-entropy thread
//FIXME need to re-enable
// backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
// initialDelaySec, periodSec,
// TimeUnit.SECONDS);
backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
initialDelaySec, periodSec,
TimeUnit.SECONDS);
updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update");
clusterCommunicator.addSubscriber(updateMessageSubject,
......@@ -191,6 +193,7 @@ public class EventuallyConsistentMapImpl<K, V>
/**
* Sets the executor to use for broadcasting messages and returns this
* instance for method chaining.
*
* @param executor executor service
* @return this instance
*/
......@@ -202,26 +205,26 @@ public class EventuallyConsistentMapImpl<K, V>
@Override
public int size() {
checkState(!destroyed, mapName + ERROR_DESTROYED);
checkState(!destroyed, destroyedMessage);
return items.size();
}
@Override
public boolean isEmpty() {
checkState(!destroyed, mapName + ERROR_DESTROYED);
checkState(!destroyed, destroyedMessage);
return items.isEmpty();
}
@Override
public boolean containsKey(K key) {
checkState(!destroyed, mapName + ERROR_DESTROYED);
checkState(!destroyed, destroyedMessage);
checkNotNull(key, ERROR_NULL_KEY);
return items.containsKey(key);
}
@Override
public boolean containsValue(V value) {
checkState(!destroyed, mapName + ERROR_DESTROYED);
checkState(!destroyed, destroyedMessage);
checkNotNull(value, ERROR_NULL_VALUE);
return items.values().stream()
......@@ -230,7 +233,7 @@ public class EventuallyConsistentMapImpl<K, V>
@Override
public V get(K key) {
checkState(!destroyed, mapName + ERROR_DESTROYED);
checkState(!destroyed, destroyedMessage);
checkNotNull(key, ERROR_NULL_KEY);
Timestamped<V> value = items.get(key);
......@@ -242,7 +245,7 @@ public class EventuallyConsistentMapImpl<K, V>
@Override
public void put(K key, V value) {
checkState(!destroyed, mapName + ERROR_DESTROYED);
checkState(!destroyed, destroyedMessage);
checkNotNull(key, ERROR_NULL_KEY);
checkNotNull(value, ERROR_NULL_VALUE);
......@@ -284,7 +287,7 @@ public class EventuallyConsistentMapImpl<K, V>
@Override
public void remove(K key) {
checkState(!destroyed, mapName + ERROR_DESTROYED);
checkState(!destroyed, destroyedMessage);
checkNotNull(key, ERROR_NULL_KEY);
// TODO prevent calls here if value is important for timestamp
......@@ -321,7 +324,7 @@ public class EventuallyConsistentMapImpl<K, V>
@Override
public void remove(K key, V value) {
checkState(!destroyed, mapName + ERROR_DESTROYED);
checkState(!destroyed, destroyedMessage);
checkNotNull(key, ERROR_NULL_KEY);
checkNotNull(value, ERROR_NULL_VALUE);
......@@ -338,7 +341,7 @@ public class EventuallyConsistentMapImpl<K, V>
@Override
public void putAll(Map<? extends K, ? extends V> m) {
checkState(!destroyed, mapName + ERROR_DESTROYED);
checkState(!destroyed, destroyedMessage);
List<PutEntry<K, V>> updates = new ArrayList<>(m.size());
......@@ -362,8 +365,8 @@ public class EventuallyConsistentMapImpl<K, V>
for (PutEntry<K, V> entry : updates) {
EventuallyConsistentMapEvent<K, V> externalEvent =
new EventuallyConsistentMapEvent<>(
EventuallyConsistentMapEvent.Type.PUT, entry.key(),
entry.value());
EventuallyConsistentMapEvent.Type.PUT, entry.key(),
entry.value());
notifyListeners(externalEvent);
}
}
......@@ -371,7 +374,7 @@ public class EventuallyConsistentMapImpl<K, V>
@Override
public void clear() {
checkState(!destroyed, mapName + ERROR_DESTROYED);
checkState(!destroyed, destroyedMessage);
List<RemoveEntry<K>> removed = new ArrayList<>(items.size());
......@@ -399,14 +402,14 @@ public class EventuallyConsistentMapImpl<K, V>
@Override
public Set<K> keySet() {
checkState(!destroyed, mapName + ERROR_DESTROYED);
checkState(!destroyed, destroyedMessage);
return items.keySet();
}
@Override
public Collection<V> values() {
checkState(!destroyed, mapName + ERROR_DESTROYED);
checkState(!destroyed, destroyedMessage);
return items.values().stream()
.map(Timestamped::value)
......@@ -415,7 +418,7 @@ public class EventuallyConsistentMapImpl<K, V>
@Override
public Set<Map.Entry<K, V>> entrySet() {
checkState(!destroyed, mapName + ERROR_DESTROYED);
checkState(!destroyed, destroyedMessage);
return items.entrySet().stream()
.map(e -> Pair.of(e.getKey(), e.getValue().value()))
......@@ -424,14 +427,14 @@ public class EventuallyConsistentMapImpl<K, V>
@Override
public void addListener(EventuallyConsistentMapListener<K, V> listener) {
checkState(!destroyed, mapName + ERROR_DESTROYED);
checkState(!destroyed, destroyedMessage);
listeners.add(checkNotNull(listener));
}
@Override
public void removeListener(EventuallyConsistentMapListener<K, V> listener) {
checkState(!destroyed, mapName + ERROR_DESTROYED);
checkState(!destroyed, destroyedMessage);
listeners.remove(checkNotNull(listener));
}
......@@ -502,7 +505,7 @@ public class EventuallyConsistentMapImpl<K, V>
Set<ControllerNode> nodes = clusterService.getNodes();
List<NodeId> nodeIds = nodes.stream()
.map(node -> node.id())
.map(ControllerNode::id)
.collect(Collectors.toList());
if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
......@@ -689,7 +692,7 @@ public class EventuallyConsistentMapImpl<K, V>
*/
// Guarded by synchronized (this)
private List<EventuallyConsistentMapEvent<K, V>>
antiEntropyCheckRemoteRemoved(AntiEntropyAdvertisement<K> ad) {
antiEntropyCheckRemoteRemoved(AntiEntropyAdvertisement<K> ad) {
final List<EventuallyConsistentMapEvent<K, V>> externalEvents
= new LinkedList<>();
......@@ -752,8 +755,8 @@ public class EventuallyConsistentMapImpl<K, V>
if (putInternal(key, value, timestamp)) {
EventuallyConsistentMapEvent<K, V> externalEvent =
new EventuallyConsistentMapEvent<>(
EventuallyConsistentMapEvent.Type.PUT, key,
value);
EventuallyConsistentMapEvent.Type.PUT, key,
value);
notifyListeners(externalEvent);
}
}
......
......@@ -4,7 +4,8 @@
# -----------------------------------------------------------------------------
#export JAVA_HOME=${JAVA_HOME:-/usr/lib/jvm/java-7-openjdk-amd64/}
export JAVA_OPTS="${JAVA_OPTS:--Xms256m -Xmx2g}"
export JAVA_OPTS="${JAVA_OPTS:--Xms512m -Xmx2G}"
# export JAVA_OPTS="${JAVA_OPTS:--Xms2G -Xmx8G}" # -XX:+UseConcMarkSweepGC -XX:+CMSIncrementalMode -XX:+PrintGCDetails -XX:+PrintGCTimeStamps}"
ONOS_HOME=/opt/onos
......
#!/bin/bash
# -----------------------------------------------------------------------------
# Remotely pushes bits to all remote nodes in preparation for install.
# -----------------------------------------------------------------------------
[ ! -d "$ONOS_ROOT" ] && echo "ONOS_ROOT is not defined" >&2 && exit 1
. $ONOS_ROOT/tools/build/envDefaults
node=${1:-$OCT}
remote=$ONOS_USER@$node
shift
echo "Pushing to proxy $node..."
onos-push-bits $node
others=$(env | sort | egrep "OC[0-9]+" | cut -d= -f2)
for other in $others; do
echo "Pushing to $other..."
ssh $remote "scp $ONOS_TAR $ONOS_USER@$other:$ONOS_TAR"
done
# Bare metal cluster
# Use the 10G NIC for cluster communications
export ONOS_NIC="192.168.200.*"
# ONOS Test proxy
export OCT=10.254.1.200
# Use the 1G NICs for external access
export OC1=10.254.1.201
export OC2=10.254.1.202
export OC3=10.254.1.203
export OC4=10.254.1.204
export OC5=10.254.1.205
export OC6=10.254.1.206
export OC7=10.254.1.207
export OCI=${OC1}
export ONOS_FEATURES=webconsole,onos-api,onos-core,onos-cli,onos-rest,onos-null
# Bare metal cluster
# Use the 10G NIC for cluster communications
export ONOS_NIC="192.168.200.*"
# ONOS Test proxy
export OCT=10.254.1.200
# Use the 1G NICs for external access
export OC1=10.254.1.201
export OC2=10.254.1.202
export OC3=10.254.1.203
export OCI=${OC1}
export ONOS_FEATURES=webconsole,onos-api,onos-core,onos-cli,onos-rest,onos-null
......@@ -7,5 +7,6 @@ export OC3="10.128.11.3"
export OCN="10.128.11.4"
export OCI="${OC1}"
export OCT="${OC1}"
export ONOS_FEATURES="webconsole,onos-api,onos-core,onos-cli,onos-openflow,onos-gui,onos-rest,onos-app-fwd,onos-app-proxyarp"
export ONOS_FEATURES="webconsole,onos-api,onos-core,onos-cli,onos-rest,onos-null"
......