Ray Milkey

Fix for ONOS-2572 - Excessive events delivered by AbstractAccumulator

- add synchronization to prevent prematurely scheduling a batch that
  isn't full.

Change-Id: I07d53ef4d81211909a6fcdd98bc937b49c7c4cca
...@@ -40,8 +40,8 @@ public abstract class AbstractAccumulator<T> implements Accumulator<T> { ...@@ -40,8 +40,8 @@ public abstract class AbstractAccumulator<T> implements Accumulator<T> {
40 private final int maxBatchMillis; 40 private final int maxBatchMillis;
41 private final int maxIdleMillis; 41 private final int maxIdleMillis;
42 42
43 - private TimerTask idleTask = new ProcessorTask(); 43 + private volatile TimerTask idleTask = new ProcessorTask();
44 - private TimerTask maxTask = new ProcessorTask(); 44 + private volatile TimerTask maxTask = new ProcessorTask();
45 45
46 private List<T> items = Lists.newArrayList(); 46 private List<T> items = Lists.newArrayList();
47 47
...@@ -108,10 +108,14 @@ public abstract class AbstractAccumulator<T> implements Accumulator<T> { ...@@ -108,10 +108,14 @@ public abstract class AbstractAccumulator<T> implements Accumulator<T> {
108 private class ProcessorTask extends TimerTask { 108 private class ProcessorTask extends TimerTask {
109 @Override 109 @Override
110 public void run() { 110 public void run() {
111 + synchronized (AbstractAccumulator.this) {
111 idleTask = cancelIfActive(idleTask); 112 idleTask = cancelIfActive(idleTask);
113 + }
112 if (isReady()) { 114 if (isReady()) {
113 try { 115 try {
116 + synchronized (AbstractAccumulator.this) {
114 maxTask = cancelIfActive(maxTask); 117 maxTask = cancelIfActive(maxTask);
118 + }
115 List<T> items = finalizeCurrentBatch(); 119 List<T> items = finalizeCurrentBatch();
116 if (!items.isEmpty()) { 120 if (!items.isEmpty()) {
117 processItems(items); 121 processItems(items);
...@@ -120,10 +124,12 @@ public abstract class AbstractAccumulator<T> implements Accumulator<T> { ...@@ -120,10 +124,12 @@ public abstract class AbstractAccumulator<T> implements Accumulator<T> {
120 log.warn("Unable to process batch due to {}", e); 124 log.warn("Unable to process batch due to {}", e);
121 } 125 }
122 } else { 126 } else {
127 + synchronized (AbstractAccumulator.this) {
123 idleTask = schedule(maxIdleMillis); 128 idleTask = schedule(maxIdleMillis);
124 } 129 }
125 } 130 }
126 } 131 }
132 + }
127 133
128 // Demotes and returns the current batch of items and promotes a new one. 134 // Demotes and returns the current batch of items and promotes a new one.
129 private synchronized List<T> finalizeCurrentBatch() { 135 private synchronized List<T> finalizeCurrentBatch() {
......