Thomas Vachuska
Committed by Gerrit Code Review

Adding configurability to the even accumulator for the topology provider.

Change-Id: I35ede9a62782dc6a2e55b8895aeec6ece8836960
......@@ -58,6 +58,11 @@
</dependency>
<dependency>
<groupId>org.osgi</groupId>
<artifactId>org.osgi.compendium</artifactId>
</dependency>
<dependency>
<groupId>org.apache.felix</groupId>
<artifactId>org.apache.felix.scr.annotations</artifactId>
</dependency>
......
......@@ -15,9 +15,12 @@
*/
package org.onlab.onos.net.topology.impl;
import com.google.common.collect.ImmutableList;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Modified;
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
......@@ -36,13 +39,16 @@ import org.onlab.onos.net.topology.GraphDescription;
import org.onlab.onos.net.topology.TopologyProvider;
import org.onlab.onos.net.topology.TopologyProviderRegistry;
import org.onlab.onos.net.topology.TopologyProviderService;
import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
import java.util.Collections;
import java.util.Dictionary;
import java.util.List;
import java.util.Timer;
import java.util.concurrent.ExecutorService;
import static com.google.common.base.Strings.isNullOrEmpty;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static org.onlab.onos.core.CoreService.CORE_PROVIDER_ID;
import static org.onlab.onos.net.device.DeviceEvent.Type.*;
......@@ -59,16 +65,27 @@ import static org.slf4j.LoggerFactory.getLogger;
public class DefaultTopologyProvider extends AbstractProvider
implements TopologyProvider {
// TODO: make these configurable
private static final int MAX_EVENTS = 100;
private static final int MAX_IDLE_MS = 5;
private static final int MAX_BATCH_MS = 50;
private static final int MAX_THREADS = 8;
private static final int DEFAULT_MAX_EVENTS = 100;
private static final int DEFAULT_MAX_BATCH_MS = 50;
private static final int DEFAULT_MAX_IDLE_MS = 5;
// FIXME: Replace with a system-wide timer instance;
// TODO: Convert to use HashedWheelTimer or produce a variant of that; then decide which we want to adopt
private static final Timer TIMER = new Timer();
@Property(name = "maxEvents", intValue = DEFAULT_MAX_EVENTS,
label = "Maximum number of events to accumulate")
private int maxEvents = DEFAULT_MAX_EVENTS;
@Property(name = "maxIdleMs", intValue = DEFAULT_MAX_IDLE_MS,
label = "Maximum number of millis between events")
private int maxIdleMs = DEFAULT_MAX_IDLE_MS;
@Property(name = "maxBatchMs", intValue = DEFAULT_MAX_BATCH_MS,
label = "Maximum number of millis for whole batch")
private int maxBatchMs = DEFAULT_MAX_BATCH_MS;
private final Logger log = getLogger(getClass());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
......@@ -97,9 +114,9 @@ public class DefaultTopologyProvider extends AbstractProvider
}
@Activate
public synchronized void activate() {
public synchronized void activate(ComponentContext context) {
executor = newFixedThreadPool(MAX_THREADS, namedThreads("topo-build-%d"));
accumulator = new TopologyChangeAccumulator();
modified(context);
providerService = providerRegistry.register(this);
deviceService.addListener(deviceListener);
......@@ -111,7 +128,7 @@ public class DefaultTopologyProvider extends AbstractProvider
}
@Deactivate
public synchronized void deactivate() {
public synchronized void deactivate(ComponentContext context) {
isStarted = false;
deviceService.removeListener(deviceListener);
......@@ -125,6 +142,41 @@ public class DefaultTopologyProvider extends AbstractProvider
log.info("Stopped");
}
@Modified
public void modified(ComponentContext context) {
if (context == null) {
accumulator = new TopologyChangeAccumulator();
return;
}
Dictionary properties = context.getProperties();
int newMaxEvents, newMaxBatchMs, newMaxIdleMs;
try {
String s = (String) properties.get("maxEvents");
newMaxEvents = isNullOrEmpty(s) ? maxEvents : Integer.parseInt(s);
s = (String) properties.get("maxBatchMs");
newMaxBatchMs = isNullOrEmpty(s) ? maxBatchMs : Integer.parseInt(s);
s = (String) properties.get("maxIdleMs");
newMaxIdleMs = isNullOrEmpty(s) ? maxIdleMs : Integer.parseInt(s);
} catch (Exception e) {
newMaxEvents = DEFAULT_MAX_EVENTS;
newMaxBatchMs = DEFAULT_MAX_BATCH_MS;
newMaxIdleMs = DEFAULT_MAX_IDLE_MS;
}
if (newMaxEvents != maxEvents || newMaxBatchMs != maxBatchMs || newMaxIdleMs != maxIdleMs) {
maxEvents = newMaxEvents;
maxBatchMs = newMaxBatchMs;
maxIdleMs = newMaxIdleMs;
accumulator = maxEvents > 1 ? new TopologyChangeAccumulator() : null;
log.info("Reconfigured with maxEvents = {}; maxBatchMs = {}; maxIdleMs = {}",
maxEvents, maxBatchMs, maxIdleMs);
}
}
@Override
public void triggerRecompute() {
triggerTopologyBuild(Collections.<Event>emptyList());
......@@ -154,6 +206,14 @@ public class DefaultTopologyProvider extends AbstractProvider
}
}
private void processEvent(Event event) {
if (accumulator != null) {
accumulator.add(event);
} else {
triggerTopologyBuild(ImmutableList.of(event));
}
}
// Callback for device events
private class InternalDeviceListener implements DeviceListener {
@Override
......@@ -161,7 +221,7 @@ public class DefaultTopologyProvider extends AbstractProvider
DeviceEvent.Type type = event.type();
if (type == DEVICE_ADDED || type == DEVICE_REMOVED ||
type == DEVICE_AVAILABILITY_CHANGED) {
accumulator.add(event);
processEvent(event);
}
}
}
......@@ -170,7 +230,7 @@ public class DefaultTopologyProvider extends AbstractProvider
private class InternalLinkListener implements LinkListener {
@Override
public void event(LinkEvent event) {
accumulator.add(event);
processEvent(event);
}
}
......@@ -179,7 +239,7 @@ public class DefaultTopologyProvider extends AbstractProvider
extends AbstractEventAccumulator implements EventAccumulator {
TopologyChangeAccumulator() {
super(TIMER, MAX_EVENTS, MAX_BATCH_MS, MAX_IDLE_MS);
super(TIMER, maxEvents, maxBatchMs, maxIdleMs);
}
@Override
......
......@@ -67,12 +67,12 @@ public class DefaultTopologyProviderTest {
provider.deviceService = deviceService;
provider.linkService = linkService;
provider.providerRegistry = topologyService;
provider.activate();
provider.activate(null);
}
@After
public void tearDown() {
provider.deactivate();
provider.deactivate(null);
provider.providerRegistry = null;
provider.deviceService = null;
provider.linkService = null;
......