Madan Jampani
Committed by Gerrit Code Review

Make CoreEventDispatcher's listener processing time tracking thread-safe

Change-Id: Ib9f109e41fd1b78ce9771a2bb54e8bf3dda38d6c
......@@ -27,11 +27,14 @@ import org.onosproject.event.EventDeliveryService;
import org.onosproject.event.EventSink;
import org.slf4j.Logger;
import com.google.common.base.Stopwatch;
import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
......@@ -71,7 +74,7 @@ public class CoreEventDispatcher extends DefaultEventSinkRegistry
// Means to detect long-running sinks
private TimerTask watchdog;
private EventSink lastSink;
private long lastStart = 0;
private final Stopwatch stopwatch = Stopwatch.createUnstarted();
private Future<?> dispatchFuture;
@Override
......@@ -166,9 +169,9 @@ public class CoreEventDispatcher extends DefaultEventSinkRegistry
EventSink sink = getSink(event.getClass());
if (sink != null) {
lastSink = sink;
lastStart = System.currentTimeMillis();
stopwatch.start();
sink.process(event);
lastStart = 0;
stopwatch.reset();
} else {
log.warn("No sink registered for event class {}",
event.getClass().getName());
......@@ -184,11 +187,11 @@ public class CoreEventDispatcher extends DefaultEventSinkRegistry
private class Watchdog extends TimerTask {
@Override
public void run() {
long delta = System.currentTimeMillis() - lastStart;
if (lastStart > 0 && delta > maxProcessMillis) {
lastStart = 0;
long elapsedTimeMillis = stopwatch.elapsed(TimeUnit.MILLISECONDS);
if (elapsedTimeMillis > maxProcessMillis) {
stopwatch.reset();
log.warn("Event sink {} exceeded execution time limit: {} ms; spawning new dispatch loop",
lastSink.getClass().getName(), delta);
lastSink.getClass().getName(), elapsedTimeMillis);
// Notify the sink that it has exceeded its time limit.
lastSink.onProcessLimit();
......