Thomas Vachuska
Committed by Gerrit Code Review

Added a detection mechanism for long-running or dead-locked event sinks and listeners.

Change-Id: I21308b058902a94c31c34c2ec2878cd13213874e
...@@ -51,6 +51,7 @@ public class DefaultEventSinkRegistry implements EventSinkRegistry { ...@@ -51,6 +51,7 @@ public class DefaultEventSinkRegistry implements EventSinkRegistry {
51 @Override 51 @Override
52 @SuppressWarnings("unchecked") 52 @SuppressWarnings("unchecked")
53 public <E extends Event> EventSink<E> getSink(Class<E> eventClass) { 53 public <E extends Event> EventSink<E> getSink(Class<E> eventClass) {
54 + checkNotNull(eventClass, "Event class cannot be null");
54 return (EventSink<E>) sinks.get(eventClass); 55 return (EventSink<E>) sinks.get(eventClass);
55 } 56 }
56 57
......
...@@ -27,4 +27,10 @@ public interface EventSink<E extends Event> { ...@@ -27,4 +27,10 @@ public interface EventSink<E extends Event> {
27 */ 27 */
28 void process(E event); 28 void process(E event);
29 29
30 + /**
31 + * Handles notification that event processing time limit has been exceeded.
32 + */
33 + default void onProcessLimit() {
34 + }
35 +
30 } 36 }
......
...@@ -35,6 +35,9 @@ public class ListenerRegistry<E extends Event, L extends EventListener<E>> ...@@ -35,6 +35,9 @@ public class ListenerRegistry<E extends Event, L extends EventListener<E>>
35 35
36 private volatile boolean shutdown = false; 36 private volatile boolean shutdown = false;
37 37
38 + private long lastStart;
39 + private L lastListener;
40 +
38 /** 41 /**
39 * Set of listeners that have registered. 42 * Set of listeners that have registered.
40 */ 43 */
...@@ -67,13 +70,26 @@ public class ListenerRegistry<E extends Event, L extends EventListener<E>> ...@@ -67,13 +70,26 @@ public class ListenerRegistry<E extends Event, L extends EventListener<E>>
67 public void process(E event) { 70 public void process(E event) {
68 for (L listener : listeners) { 71 for (L listener : listeners) {
69 try { 72 try {
73 + lastListener = listener;
74 + lastStart = System.currentTimeMillis();
70 listener.event(event); 75 listener.event(event);
76 + lastStart = 0;
71 } catch (Exception error) { 77 } catch (Exception error) {
72 reportProblem(event, error); 78 reportProblem(event, error);
73 } 79 }
74 } 80 }
75 } 81 }
76 82
83 + @Override
84 + public void onProcessLimit() {
85 + if (lastStart > 0) {
86 + log.error("Listener {} exceeded execution time limit: {} ms; ejected",
87 + lastListener.getClass().getName(),
88 + System.currentTimeMillis() - lastStart);
89 + removeListener(lastListener);
90 + }
91 + }
92 +
77 /** 93 /**
78 * Predicate indicating whether we should throw an exception if the 94 * Predicate indicating whether we should throw an exception if the
79 * argument to {@link #removeListener} is not in the current set of 95 * argument to {@link #removeListener} is not in the current set of
......
...@@ -19,6 +19,7 @@ import org.apache.felix.scr.annotations.Activate; ...@@ -19,6 +19,7 @@ import org.apache.felix.scr.annotations.Activate;
19 import org.apache.felix.scr.annotations.Component; 19 import org.apache.felix.scr.annotations.Component;
20 import org.apache.felix.scr.annotations.Deactivate; 20 import org.apache.felix.scr.annotations.Deactivate;
21 import org.apache.felix.scr.annotations.Service; 21 import org.apache.felix.scr.annotations.Service;
22 +import org.onlab.util.SharedExecutors;
22 import org.onosproject.event.AbstractEvent; 23 import org.onosproject.event.AbstractEvent;
23 import org.onosproject.event.DefaultEventSinkRegistry; 24 import org.onosproject.event.DefaultEventSinkRegistry;
24 import org.onosproject.event.Event; 25 import org.onosproject.event.Event;
...@@ -26,8 +27,10 @@ import org.onosproject.event.EventDeliveryService; ...@@ -26,8 +27,10 @@ import org.onosproject.event.EventDeliveryService;
26 import org.onosproject.event.EventSink; 27 import org.onosproject.event.EventSink;
27 import org.slf4j.Logger; 28 import org.slf4j.Logger;
28 29
30 +import java.util.TimerTask;
29 import java.util.concurrent.BlockingQueue; 31 import java.util.concurrent.BlockingQueue;
30 import java.util.concurrent.ExecutorService; 32 import java.util.concurrent.ExecutorService;
33 +import java.util.concurrent.Future;
31 import java.util.concurrent.LinkedBlockingQueue; 34 import java.util.concurrent.LinkedBlockingQueue;
32 35
33 import static java.util.concurrent.Executors.newSingleThreadExecutor; 36 import static java.util.concurrent.Executors.newSingleThreadExecutor;
...@@ -42,8 +45,14 @@ import static org.slf4j.LoggerFactory.getLogger; ...@@ -42,8 +45,14 @@ import static org.slf4j.LoggerFactory.getLogger;
42 public class CoreEventDispatcher extends DefaultEventSinkRegistry 45 public class CoreEventDispatcher extends DefaultEventSinkRegistry
43 implements EventDeliveryService { 46 implements EventDeliveryService {
44 47
48 + // Maximum number of millis a sink can take to process an event.
49 + private static final long MAX_EXECUTE_MS = 1_000;
50 + private static final long WATCHDOG_MS = MAX_EXECUTE_MS / 4;
51 +
45 private final Logger log = getLogger(getClass()); 52 private final Logger log = getLogger(getClass());
46 53
54 + private final BlockingQueue<Event> events = new LinkedBlockingQueue<>();
55 +
47 private final ExecutorService executor = 56 private final ExecutorService executor =
48 newSingleThreadExecutor(groupedThreads("onos/event", "dispatch-%d")); 57 newSingleThreadExecutor(groupedThreads("onos/event", "dispatch-%d"));
49 58
...@@ -51,34 +60,45 @@ public class CoreEventDispatcher extends DefaultEventSinkRegistry ...@@ -51,34 +60,45 @@ public class CoreEventDispatcher extends DefaultEventSinkRegistry
51 private static final Event KILL_PILL = new AbstractEvent(null, 0) { 60 private static final Event KILL_PILL = new AbstractEvent(null, 0) {
52 }; 61 };
53 62
54 - private final BlockingQueue<Event> events = new LinkedBlockingQueue<>(); 63 + private DispatchLoop dispatchLoop;
55 64
56 - private volatile boolean stopped = false; 65 + // Means to detect long-running sinks
66 + private TimerTask watchdog;
67 + private EventSink lastSink;
68 + private long lastStart = 0;
69 + private Future<?> dispatchFuture;
57 70
58 @Override 71 @Override
59 public void post(Event event) { 72 public void post(Event event) {
60 - events.add(event); 73 + if (!events.add(event)) {
74 + log.error("Unable to post event {}", event);
75 + }
61 } 76 }
62 77
63 @Activate 78 @Activate
64 public void activate() { 79 public void activate() {
65 - stopped = false; 80 + dispatchLoop = new DispatchLoop();
66 - executor.execute(new DispatchLoop()); 81 + dispatchFuture = executor.submit(dispatchLoop);
82 + watchdog = new Watchdog();
83 + SharedExecutors.getTimer().schedule(watchdog, WATCHDOG_MS, WATCHDOG_MS);
67 log.info("Started"); 84 log.info("Started");
68 } 85 }
69 86
70 @Deactivate 87 @Deactivate
71 public void deactivate() { 88 public void deactivate() {
72 - stopped = true; 89 + dispatchLoop.stop();
90 + watchdog.cancel();
73 post(KILL_PILL); 91 post(KILL_PILL);
74 log.info("Stopped"); 92 log.info("Stopped");
75 } 93 }
76 94
77 // Auxiliary event dispatching loop that feeds off the events queue. 95 // Auxiliary event dispatching loop that feeds off the events queue.
78 private class DispatchLoop implements Runnable { 96 private class DispatchLoop implements Runnable {
97 + private volatile boolean stopped;
98 +
79 @Override 99 @Override
80 - @SuppressWarnings("unchecked")
81 public void run() { 100 public void run() {
101 + stopped = false;
82 log.info("Dispatch loop initiated"); 102 log.info("Dispatch loop initiated");
83 while (!stopped) { 103 while (!stopped) {
84 try { 104 try {
...@@ -87,22 +107,52 @@ public class CoreEventDispatcher extends DefaultEventSinkRegistry ...@@ -87,22 +107,52 @@ public class CoreEventDispatcher extends DefaultEventSinkRegistry
87 if (event == KILL_PILL) { 107 if (event == KILL_PILL) {
88 break; 108 break;
89 } 109 }
90 - 110 + process(event);
91 - // Locate the sink for the event class and use it to
92 - // process the event
93 - EventSink sink = getSink(event.getClass());
94 - if (sink != null) {
95 - sink.process(event);
96 - } else {
97 - log.warn("No sink registered for event class {}",
98 - event.getClass());
99 - }
100 } catch (Exception e) { 111 } catch (Exception e) {
101 log.warn("Error encountered while dispatching event:", e); 112 log.warn("Error encountered while dispatching event:", e);
102 } 113 }
103 } 114 }
104 log.info("Dispatch loop terminated"); 115 log.info("Dispatch loop terminated");
105 } 116 }
117 +
118 + // Locate the sink for the event class and use it to process the event
119 + @SuppressWarnings("unchecked")
120 + private void process(Event event) {
121 + EventSink sink = getSink(event.getClass());
122 + if (sink != null) {
123 + lastSink = sink;
124 + lastStart = System.currentTimeMillis();
125 + sink.process(event);
126 + lastStart = 0;
127 + } else {
128 + log.warn("No sink registered for event class {}",
129 + event.getClass());
130 + }
131 + }
132 +
133 + void stop() {
134 + stopped = true;
135 + }
106 } 136 }
107 137
138 + // Monitors event sinks to make sure none take too long to execute.
139 + private class Watchdog extends TimerTask {
140 + @Override
141 + public void run() {
142 + long delta = System.currentTimeMillis() - lastStart;
143 + if (lastStart > 0 && delta > MAX_EXECUTE_MS) {
144 + log.error("Event sink {} exceeded execution time limit: {} ms",
145 + lastSink.getClass().getName(), delta);
146 +
147 + // Notify the sink that it has exceeded its time limit.
148 + lastSink.onProcessLimit();
149 +
150 + // Cancel the old dispatch loop and submit a new one.
151 + dispatchLoop.stop();
152 + dispatchLoop = new DispatchLoop();
153 + dispatchFuture.cancel(true);
154 + dispatchFuture = executor.submit(dispatchLoop);
155 + }
156 + }
157 + }
108 } 158 }
......
...@@ -58,6 +58,7 @@ import org.slf4j.Logger; ...@@ -58,6 +58,7 @@ import org.slf4j.Logger;
58 58
59 import java.util.Collection; 59 import java.util.Collection;
60 import java.util.List; 60 import java.util.List;
61 +import java.util.Objects;
61 import java.util.concurrent.CompletableFuture; 62 import java.util.concurrent.CompletableFuture;
62 import java.util.concurrent.ScheduledExecutorService; 63 import java.util.concurrent.ScheduledExecutorService;
63 import java.util.concurrent.TimeUnit; 64 import java.util.concurrent.TimeUnit;
...@@ -423,8 +424,8 @@ public class DeviceManager ...@@ -423,8 +424,8 @@ public class DeviceManager
423 } 424 }
424 425
425 @Override 426 @Override
426 - public void receivedRoleReply( 427 + public void receivedRoleReply(DeviceId deviceId, MastershipRole requested,
427 - DeviceId deviceId, MastershipRole requested, MastershipRole response) { 428 + MastershipRole response) {
428 // Several things can happen here: 429 // Several things can happen here:
429 // 1. request and response match 430 // 1. request and response match
430 // 2. request and response don't match 431 // 2. request and response don't match
...@@ -436,7 +437,7 @@ public class DeviceManager ...@@ -436,7 +437,7 @@ public class DeviceManager
436 // FIXME: implement response to this notification 437 // FIXME: implement response to this notification
437 438
438 log.info("got reply to a role request for {}: asked for {}, and got {}", 439 log.info("got reply to a role request for {}: asked for {}, and got {}",
439 - deviceId, requested, response); 440 + deviceId, requested, response);
440 441
441 if (requested == null && response == null) { 442 if (requested == null && response == null) {
442 // something was off with DeviceProvider, maybe check channel too? 443 // something was off with DeviceProvider, maybe check channel too?
...@@ -445,9 +446,8 @@ public class DeviceManager ...@@ -445,9 +446,8 @@ public class DeviceManager
445 return; 446 return;
446 } 447 }
447 448
448 - if (requested.equals(response)) { 449 + if (Objects.equals(requested, response)) {
449 - if (requested.equals(mastershipService.getLocalRole(deviceId))) { 450 + if (Objects.equals(requested, mastershipService.getLocalRole(deviceId))) {
450 -
451 return; 451 return;
452 } else { 452 } else {
453 return; 453 return;
...@@ -464,19 +464,16 @@ public class DeviceManager ...@@ -464,19 +464,16 @@ public class DeviceManager
464 //post(new DeviceEvent(DEVICE_MASTERSHIP_CHANGED, device)); 464 //post(new DeviceEvent(DEVICE_MASTERSHIP_CHANGED, device));
465 } 465 }
466 } 466 }
467 -
468 } 467 }
469 468
470 @Override 469 @Override
471 public void updatePortStatistics(DeviceId deviceId, Collection<PortStatistics> portStatistics) { 470 public void updatePortStatistics(DeviceId deviceId, Collection<PortStatistics> portStatistics) {
472 checkNotNull(deviceId, DEVICE_ID_NULL); 471 checkNotNull(deviceId, DEVICE_ID_NULL);
473 - checkNotNull(portStatistics, 472 + checkNotNull(portStatistics, "Port statistics list cannot be null");
474 - "Port statistics list cannot be null");
475 checkValidity(); 473 checkValidity();
476 474
477 DeviceEvent event = store.updatePortStatistics(this.provider().id(), 475 DeviceEvent event = store.updatePortStatistics(this.provider().id(),
478 - deviceId, portStatistics); 476 + deviceId, portStatistics);
479 -
480 post(event); 477 post(event);
481 } 478 }
482 } 479 }
...@@ -634,8 +631,7 @@ public class DeviceManager ...@@ -634,8 +631,7 @@ public class DeviceManager
634 } 631 }
635 632
636 // Store delegate to re-post events emitted from the store. 633 // Store delegate to re-post events emitted from the store.
637 - private class InternalStoreDelegate 634 + private class InternalStoreDelegate implements DeviceStoreDelegate {
638 - implements DeviceStoreDelegate {
639 @Override 635 @Override
640 public void notify(DeviceEvent event) { 636 public void notify(DeviceEvent event) {
641 post(event); 637 post(event);
......