Brian O'Connor
Committed by Gerrit Code Review

Updates to GossipIntentStore and IntentPerfInstaller

Change-Id: If350a6276d758222f9b6ea25ab78d055321eecac
......@@ -181,7 +181,6 @@ public class IntentPerfInstaller {
}
public void start() {
// TODO perhaps move to start(), but need to call before logConfig
// adjust numNeighbors and generate list of neighbors
numNeighbors = Math.min(clusterService.getNodes().size() - 1, numNeighbors);
......@@ -232,14 +231,13 @@ public class IntentPerfInstaller {
node1.toString().compareTo(node2.toString()));
// rotate the local node to index 0
Collections.rotate(nodes, -1 * nodes.indexOf(clusterService.getLocalNode().id()));
log.info("neighbors (raw): {}", nodes); //TODO remove
log.debug("neighbors (raw): {}", nodes); //TODO remove
// generate the sub-list that will contain local node and selected neighbors
nodes = nodes.subList(0, numNeighbors + 1);
log.info("neighbors: {}", nodes); //TODO remove
log.debug("neighbors: {}", nodes); //TODO remove
return nodes;
}
private Intent createIntent(Key key, long mac, NodeId node, Multimap<NodeId, Device> devices) {
// choose a random device for which this node is master
List<Device> deviceList = devices.get(node).stream().collect(Collectors.toList());
......@@ -309,8 +307,6 @@ public class IntentPerfInstaller {
checkState(intents.values().size() == numberOfKeys,
"Generated wrong number of intents");
log.info("Created {} intents", numberOfKeys);
//FIXME remove this
intents.keySet().forEach(node -> log.info("\t{}\t{}", node, intents.get(node).size()));
return Sets.newHashSet(intents.values());
......@@ -336,7 +332,11 @@ public class IntentPerfInstaller {
public void run() {
prime();
while (!stopped) {
cycle();
try {
cycle();
} catch (Exception e) {
log.warn("Exception during cycle", e);
}
}
clear();
}
......@@ -402,14 +402,19 @@ public class IntentPerfInstaller {
int cycleCount = 0;
private void adjustRates() {
int addDelta = Math.max(1000 - cycleCount, 10);
double multRatio = Math.min(0.8 + cycleCount * 0.0002, 0.995);
//FIXME need to iron out the rate adjustment
//FIXME we should taper the adjustments over time
//FIXME don't just use the lastDuration, take an average
if (++cycleCount % 5 == 0) { //TODO: maybe use a timer (we should do this every 5-10 sec)
if (listener.requestThroughput() - listener.processedThroughput() <= 2000 && //was 500
lastDuration <= cyclePeriod) {
lastCount = Math.min(lastCount + 1000, intents.size() / 2);
lastCount = Math.min(lastCount + addDelta, intents.size() / 2);
} else {
lastCount *= 0.8;
lastCount *= multRatio;
}
log.info("last count: {}, last duration: {} ms (sub: {} vs inst: {})",
lastCount, lastDuration, listener.requestThroughput(), listener.processedThroughput());
......
......@@ -653,7 +653,6 @@ public class DistributedFlowRuleStore
switch (event.type()) {
case MASTER_CHANGED:
if (local.equals(rInfo.master().orNull())) {
log.info("{} is now the master for {}. Will load flow rules from backup", local, did);
// This node is the new master, populate local structure
// from backup
flowTable.loadFromBackup(did);
......@@ -725,6 +724,7 @@ public class DistributedFlowRuleStore
if (!backupsEnabled) {
return;
}
log.info("We are now the master for {}. Will load flow rules from backup", deviceId);
ConcurrentMap<FlowId, Set<StoredFlowEntry>> flowTable = new ConcurrentHashMap<>();
......
......@@ -16,6 +16,7 @@
package org.onosproject.store.intent.impl;
import com.google.common.collect.ImmutableList;
import org.apache.commons.lang.math.RandomUtils;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
......@@ -45,9 +46,7 @@ import org.onosproject.store.impl.WallClockTimestamp;
import org.onosproject.store.serializers.KryoNamespaces;
import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
......@@ -102,7 +101,7 @@ public class GossipIntentStore
pendingMap = new EventuallyConsistentMapImpl<>("intent-pending",
clusterService,
clusterCommunicator,
intentSerializer, // TODO
intentSerializer,
new IntentDataClockManager<>(),
(key, intentData) -> getPeerNodes(key, intentData));
......@@ -254,29 +253,36 @@ public class GossipIntentStore
private Collection<NodeId> getPeerNodes(Key key, IntentData data) {
NodeId master = partitionService.getLeader(key);
NodeId origin = (data != null) ? data.origin() : null;
if (master == null || origin == null) {
log.warn("Intent {} has no home; master = {}, origin = {}",
data.key(), master, origin);
}
NodeId me = clusterService.getLocalNode().id();
boolean isMaster = Objects.equals(master, me);
boolean isOrigin = Objects.equals(origin, me);
if (isMaster && isOrigin) {
return ImmutableList.of(getRandomNode());
return getRandomNode();
} else if (isMaster) {
return origin != null ? ImmutableList.of(origin) : ImmutableList.of(getRandomNode());
return origin != null ? ImmutableList.of(origin) : getRandomNode();
} else if (isOrigin) {
return ImmutableList.of(master);
return master != null ? ImmutableList.of(master) : getRandomNode();
} else {
// FIXME: why are we here? log error?
log.warn("Not master or origin for intent {}", data.key());
return ImmutableList.of(master);
}
}
private NodeId getRandomNode() {
private List<NodeId> getRandomNode() {
NodeId me = clusterService.getLocalNode().id();
List<NodeId> nodes = clusterService.getNodes().stream()
.map(ControllerNode::id)
.collect(Collectors.toCollection(ArrayList::new));
Collections.shuffle(nodes);
// FIXME check if self
// FIXME verify nodes.size() > 0
return nodes.get(0);
.map(ControllerNode::id)
.filter(node -> !Objects.equals(node, me))
.collect(Collectors.toList());
if (nodes.size() == 0) {
return null;
}
return ImmutableList.of(nodes.get(RandomUtils.nextInt(nodes.size())));
}
@Override
......