Thomas Vachuska
Committed by Gerrit Code Review

ONOS-2572 Fix to abstract accumulator to proactively finalize the batches when f…

…ull and to avoid repeat misfires.

Change-Id: Ibc9904b36f9cf8c9aed36e828152600a2d7a6192
...@@ -49,13 +49,13 @@ public abstract class AbstractAccumulator<T> implements Accumulator<T> { ...@@ -49,13 +49,13 @@ public abstract class AbstractAccumulator<T> implements Accumulator<T> {
49 * Creates an item accumulator capable of triggering on the specified 49 * Creates an item accumulator capable of triggering on the specified
50 * thresholds. 50 * thresholds.
51 * 51 *
52 - * @param timer timer to use for scheduling check-points 52 + * @param timer timer to use for scheduling check-points
53 - * @param maxItems maximum number of items to accumulate before 53 + * @param maxItems maximum number of items to accumulate before
54 - * processing is triggered 54 + * processing is triggered
55 - * @param maxBatchMillis maximum number of millis allowed since the first 55 + * @param maxBatchMillis maximum number of millis allowed since the first
56 - * item before processing is triggered 56 + * item before processing is triggered
57 - * @param maxIdleMillis maximum number millis between items before 57 + * @param maxIdleMillis maximum number millis between items before
58 - * processing is triggered 58 + * processing is triggered
59 */ 59 */
60 protected AbstractAccumulator(Timer timer, int maxItems, 60 protected AbstractAccumulator(Timer timer, int maxItems,
61 int maxBatchMillis, int maxIdleMillis) { 61 int maxBatchMillis, int maxIdleMillis) {
...@@ -78,7 +78,7 @@ public abstract class AbstractAccumulator<T> implements Accumulator<T> { ...@@ -78,7 +78,7 @@ public abstract class AbstractAccumulator<T> implements Accumulator<T> {
78 // Did we hit the max item threshold? 78 // Did we hit the max item threshold?
79 if (items.size() >= maxItems) { 79 if (items.size() >= maxItems) {
80 maxTask = cancelIfActive(maxTask); 80 maxTask = cancelIfActive(maxTask);
81 - schedule(1); 81 + scheduleNow();
82 } else { 82 } else {
83 // Otherwise, schedule idle task and if this is a first item 83 // Otherwise, schedule idle task and if this is a first item
84 // also schedule the max batch age task. 84 // also schedule the max batch age task.
...@@ -89,14 +89,30 @@ public abstract class AbstractAccumulator<T> implements Accumulator<T> { ...@@ -89,14 +89,30 @@ public abstract class AbstractAccumulator<T> implements Accumulator<T> {
89 } 89 }
90 } 90 }
91 91
92 - // Schedules a new processor task given number of millis in the future. 92 + /**
93 + * Finalizes the current batch, if ready, and schedules a new processor
94 + * in the immediate future.
95 + */
96 + private void scheduleNow() {
97 + if (isReady()) {
98 + TimerTask task = new ProcessorTask(finalizeCurrentBatch());
99 + timer.schedule(task, 1);
100 + }
101 + }
102 +
103 + /**
104 + * Schedules a new processor task given number of millis in the future.
105 + * Batch finalization is deferred to time of execution.
106 + */
93 private TimerTask schedule(int millis) { 107 private TimerTask schedule(int millis) {
94 TimerTask task = new ProcessorTask(); 108 TimerTask task = new ProcessorTask();
95 timer.schedule(task, millis); 109 timer.schedule(task, millis);
96 return task; 110 return task;
97 } 111 }
98 112
99 - // Cancels the specified task if it is active. 113 + /**
114 + * Cancels the specified task if it is active.
115 + */
100 private TimerTask cancelIfActive(TimerTask task) { 116 private TimerTask cancelIfActive(TimerTask task) {
101 if (task != null) { 117 if (task != null) {
102 task.cancel(); 118 task.cancel();
...@@ -106,6 +122,19 @@ public abstract class AbstractAccumulator<T> implements Accumulator<T> { ...@@ -106,6 +122,19 @@ public abstract class AbstractAccumulator<T> implements Accumulator<T> {
106 122
107 // Task for triggering processing of accumulated items 123 // Task for triggering processing of accumulated items
108 private class ProcessorTask extends TimerTask { 124 private class ProcessorTask extends TimerTask {
125 +
126 + private final List<T> items;
127 +
128 + // Creates a new processor task with deferred batch finalization.
129 + ProcessorTask() {
130 + this.items = null;
131 + }
132 +
133 + // Creates a new processor task with pre-emptive batch finalization.
134 + ProcessorTask(List<T> items) {
135 + this.items = items;
136 + }
137 +
109 @Override 138 @Override
110 public void run() { 139 public void run() {
111 synchronized (AbstractAccumulator.this) { 140 synchronized (AbstractAccumulator.this) {
...@@ -116,9 +145,9 @@ public abstract class AbstractAccumulator<T> implements Accumulator<T> { ...@@ -116,9 +145,9 @@ public abstract class AbstractAccumulator<T> implements Accumulator<T> {
116 synchronized (AbstractAccumulator.this) { 145 synchronized (AbstractAccumulator.this) {
117 maxTask = cancelIfActive(maxTask); 146 maxTask = cancelIfActive(maxTask);
118 } 147 }
119 - List<T> items = finalizeCurrentBatch(); 148 + List<T> batch = items != null ? items : finalizeCurrentBatch();
120 - if (!items.isEmpty()) { 149 + if (!batch.isEmpty()) {
121 - processItems(items); 150 + processItems(batch);
122 } 151 }
123 } catch (Exception e) { 152 } catch (Exception e) {
124 log.warn("Unable to process batch due to {}", e); 153 log.warn("Unable to process batch due to {}", e);
......
...@@ -20,8 +20,10 @@ import org.junit.Test; ...@@ -20,8 +20,10 @@ import org.junit.Test;
20 20
21 import java.util.List; 21 import java.util.List;
22 import java.util.Timer; 22 import java.util.Timer;
23 +import java.util.stream.IntStream;
23 24
24 import static org.junit.Assert.*; 25 import static org.junit.Assert.*;
26 +import static org.onlab.junit.TestTools.assertAfter;
25 import static org.onlab.junit.TestTools.delay; 27 import static org.onlab.junit.TestTools.delay;
26 28
27 /** 29 /**
...@@ -136,6 +138,14 @@ public class AbstractAccumulatorTest { ...@@ -136,6 +138,14 @@ public class AbstractAccumulatorTest {
136 assertEquals("incorrect batch", "abcdefg", accumulator.batch); 138 assertEquals("incorrect batch", "abcdefg", accumulator.batch);
137 } 139 }
138 140
141 + @Ignore("FIXME: timing sensitive test failing randomly.")
142 + @Test
143 + public void stormTest() {
144 + TestAccumulator accumulator = new TestAccumulator();
145 + IntStream.range(0, 1000).forEach(i -> accumulator.add(new TestItem("#" + i)));
146 + assertAfter(100, () -> assertEquals("wrong item count", 1000, accumulator.itemCount));
147 + assertEquals("wrong batch count", 200, accumulator.batchCount);
148 + }
139 149
140 private class TestItem { 150 private class TestItem {
141 private final String s; 151 private final String s;
...@@ -149,6 +159,8 @@ public class AbstractAccumulatorTest { ...@@ -149,6 +159,8 @@ public class AbstractAccumulatorTest {
149 159
150 String batch = ""; 160 String batch = "";
151 boolean ready = true; 161 boolean ready = true;
162 + int batchCount = 0;
163 + int itemCount = 0;
152 164
153 protected TestAccumulator() { 165 protected TestAccumulator() {
154 super(timer, 5, 100, 70); 166 super(timer, 5, 100, 70);
...@@ -156,6 +168,8 @@ public class AbstractAccumulatorTest { ...@@ -156,6 +168,8 @@ public class AbstractAccumulatorTest {
156 168
157 @Override 169 @Override
158 public void processItems(List<TestItem> items) { 170 public void processItems(List<TestItem> items) {
171 + batchCount++;
172 + itemCount += items.size();
159 for (TestItem item : items) { 173 for (TestItem item : items) {
160 batch += item.s; 174 batch += item.s;
161 } 175 }
......