Brian O'Connor

AbstractAccumulator: synchronized add and null check

Change-Id: I2999311d19ab36c17413ebc93398483a7d012714
...@@ -72,9 +72,9 @@ public abstract class AbstractAccumulator<T> implements Accumulator<T> { ...@@ -72,9 +72,9 @@ public abstract class AbstractAccumulator<T> implements Accumulator<T> {
72 } 72 }
73 73
74 @Override 74 @Override
75 - public void add(T event) { 75 + public synchronized void add(T event) {
76 idleTask = cancelIfActive(idleTask); 76 idleTask = cancelIfActive(idleTask);
77 - events.add(event); 77 + events.add(checkNotNull(event, "Event cannot be null"));
78 78
79 // Did we hit the max event threshold? 79 // Did we hit the max event threshold?
80 if (events.size() == maxEvents) { 80 if (events.size() == maxEvents) {
...@@ -114,7 +114,7 @@ public abstract class AbstractAccumulator<T> implements Accumulator<T> { ...@@ -114,7 +114,7 @@ public abstract class AbstractAccumulator<T> implements Accumulator<T> {
114 maxTask = cancelIfActive(maxTask); 114 maxTask = cancelIfActive(maxTask);
115 processEvents(finalizeCurrentBatch()); 115 processEvents(finalizeCurrentBatch());
116 } catch (Exception e) { 116 } catch (Exception e) {
117 - log.warn("Unable to process batch due to {}", e.getMessage()); 117 + log.warn("Unable to process batch due to {}", e);
118 } 118 }
119 } 119 }
120 } 120 }
......