Sho SHIMIZU
Committed by Gerrit Code Review

Make the field lastStart thread safe

The field, lastStart, is read/written from different thread pools.
As a result, the field could be accessed from different threads.

Change-Id: Ia50c5bd3405bf2af98abb9d14f7e35d840f62483
...@@ -32,6 +32,7 @@ import java.util.concurrent.BlockingQueue; ...@@ -32,6 +32,7 @@ import java.util.concurrent.BlockingQueue;
32 import java.util.concurrent.ExecutorService; 32 import java.util.concurrent.ExecutorService;
33 import java.util.concurrent.Future; 33 import java.util.concurrent.Future;
34 import java.util.concurrent.LinkedBlockingQueue; 34 import java.util.concurrent.LinkedBlockingQueue;
35 +import java.util.concurrent.atomic.AtomicLong;
35 36
36 import static com.google.common.base.Preconditions.checkArgument; 37 import static com.google.common.base.Preconditions.checkArgument;
37 import static java.util.concurrent.Executors.newSingleThreadExecutor; 38 import static java.util.concurrent.Executors.newSingleThreadExecutor;
...@@ -69,7 +70,7 @@ public class CoreEventDispatcher extends DefaultEventSinkRegistry ...@@ -69,7 +70,7 @@ public class CoreEventDispatcher extends DefaultEventSinkRegistry
69 // Means to detect long-running sinks 70 // Means to detect long-running sinks
70 private TimerTask watchdog; 71 private TimerTask watchdog;
71 private EventSink lastSink; 72 private EventSink lastSink;
72 - private long lastStart = 0; 73 + private final AtomicLong lastStart = new AtomicLong(0);
73 private Future<?> dispatchFuture; 74 private Future<?> dispatchFuture;
74 75
75 @Override 76 @Override
...@@ -141,9 +142,9 @@ public class CoreEventDispatcher extends DefaultEventSinkRegistry ...@@ -141,9 +142,9 @@ public class CoreEventDispatcher extends DefaultEventSinkRegistry
141 EventSink sink = getSink(event.getClass()); 142 EventSink sink = getSink(event.getClass());
142 if (sink != null) { 143 if (sink != null) {
143 lastSink = sink; 144 lastSink = sink;
144 - lastStart = System.currentTimeMillis(); 145 + lastStart.set(System.currentTimeMillis());
145 sink.process(event); 146 sink.process(event);
146 - lastStart = 0; 147 + lastStart.set(0);
147 } else { 148 } else {
148 log.warn("No sink registered for event class {}", 149 log.warn("No sink registered for event class {}",
149 event.getClass().getName()); 150 event.getClass().getName());
...@@ -159,9 +160,10 @@ public class CoreEventDispatcher extends DefaultEventSinkRegistry ...@@ -159,9 +160,10 @@ public class CoreEventDispatcher extends DefaultEventSinkRegistry
159 private class Watchdog extends TimerTask { 160 private class Watchdog extends TimerTask {
160 @Override 161 @Override
161 public void run() { 162 public void run() {
162 - long delta = System.currentTimeMillis() - lastStart; 163 + long lastStartLocal = lastStart.get();
163 - if (lastStart > 0 && delta > maxProcessMillis) { 164 + long delta = System.currentTimeMillis() - lastStartLocal;
164 - lastStart = 0; 165 + if (lastStartLocal > 0 && delta > maxProcessMillis) {
166 + lastStart.set(0);
165 log.warn("Event sink {} exceeded execution time limit: {} ms; spawning new dispatch loop", 167 log.warn("Event sink {} exceeded execution time limit: {} ms; spawning new dispatch loop",
166 lastSink.getClass().getName(), delta); 168 lastSink.getClass().getName(), delta);
167 169
......