Thomas Vachuska
Committed by Gerrit Code Review

Enhancing accumulator to allow subclasses to indicate whether they are ready for…

… the batch to be processed. Default behaviour returns true.

Change-Id: I53a3ffc3ecd75ed2607f155a61971e05a6009a66
......@@ -76,7 +76,7 @@ public abstract class AbstractAccumulator<T> implements Accumulator<T> {
items.add(checkNotNull(item, "Item cannot be null"));
// Did we hit the max item threshold?
if (items.size() == maxItems) {
if (items.size() >= maxItems) {
maxTask = cancelIfActive(maxTask);
schedule(1);
} else {
......@@ -108,12 +108,16 @@ public abstract class AbstractAccumulator<T> implements Accumulator<T> {
private class ProcessorTask extends TimerTask {
@Override
public void run() {
try {
idleTask = cancelIfActive(idleTask);
maxTask = cancelIfActive(maxTask);
processItems(finalizeCurrentBatch());
} catch (Exception e) {
log.warn("Unable to process batch due to {}", e);
idleTask = cancelIfActive(idleTask);
if (isReady()) {
try {
maxTask = cancelIfActive(maxTask);
processItems(finalizeCurrentBatch());
} catch (Exception e) {
log.warn("Unable to process batch due to {}", e);
}
} else {
idleTask = schedule(maxIdleMillis);
}
}
}
......@@ -125,6 +129,11 @@ public abstract class AbstractAccumulator<T> implements Accumulator<T> {
return toBeProcessed;
}
@Override
public boolean isReady() {
return true;
}
/**
* Returns the backing timer.
*
......@@ -163,4 +172,5 @@ public abstract class AbstractAccumulator<T> implements Accumulator<T> {
public int maxIdleMillis() {
return maxIdleMillis;
}
}
......
......@@ -40,6 +40,10 @@ public interface Accumulator<T> {
*/
void processItems(List<T> items);
//TODO consider a blocking version that required consumer participation
/**
* Indicates whether the accumulator is ready to process items.
*
* @return true if ready to process
*/
boolean isReady();
}
......
......@@ -37,7 +37,7 @@ public class AbstractAccumulatorTest {
assertEquals("incorrect timer", timer, accumulator.timer());
assertEquals("incorrect max events", 5, accumulator.maxItems());
assertEquals("incorrect max ms", 100, accumulator.maxBatchMillis());
assertEquals("incorrect idle ms", 50, accumulator.maxIdleMillis());
assertEquals("incorrect idle ms", 70, accumulator.maxIdleMillis());
}
@Test
......@@ -68,7 +68,7 @@ public class AbstractAccumulatorTest {
delay(30);
assertTrue("should not have fired yet", accumulator.batch.isEmpty());
accumulator.add(new TestItem("d"));
delay(30);
delay(60);
assertFalse("should have fired", accumulator.batch.isEmpty());
assertEquals("incorrect batch", "abcd", accumulator.batch);
}
......@@ -84,6 +84,54 @@ public class AbstractAccumulatorTest {
assertEquals("incorrect batch", "ab", accumulator.batch);
}
@Test
public void readyIdleTrigger() {
TestAccumulator accumulator = new TestAccumulator();
accumulator.ready = false;
accumulator.add(new TestItem("a"));
assertTrue("should not have fired yet", accumulator.batch.isEmpty());
accumulator.add(new TestItem("b"));
delay(80);
assertTrue("should not have fired yet", accumulator.batch.isEmpty());
accumulator.ready = true;
delay(80);
assertFalse("should have fired", accumulator.batch.isEmpty());
assertEquals("incorrect batch", "ab", accumulator.batch);
}
@Test
public void readyLongTrigger() {
TestAccumulator accumulator = new TestAccumulator();
accumulator.ready = false;
delay(120);
assertTrue("should not have fired yet", accumulator.batch.isEmpty());
accumulator.add(new TestItem("a"));
assertTrue("should not have fired yet", accumulator.batch.isEmpty());
accumulator.ready = true;
delay(80);
assertFalse("should have fired", accumulator.batch.isEmpty());
assertEquals("incorrect batch", "a", accumulator.batch);
}
@Test
public void readyMaxTrigger() {
TestAccumulator accumulator = new TestAccumulator();
accumulator.ready = false;
accumulator.add(new TestItem("a"));
accumulator.add(new TestItem("b"));
accumulator.add(new TestItem("c"));
accumulator.add(new TestItem("d"));
accumulator.add(new TestItem("e"));
accumulator.add(new TestItem("f"));
assertTrue("should not have fired yet", accumulator.batch.isEmpty());
accumulator.ready = true;
accumulator.add(new TestItem("g"));
delay(5);
assertFalse("should have fired", accumulator.batch.isEmpty());
assertEquals("incorrect batch", "abcdefg", accumulator.batch);
}
private class TestItem {
private final String s;
......@@ -95,9 +143,10 @@ public class AbstractAccumulatorTest {
private class TestAccumulator extends AbstractAccumulator<TestItem> {
String batch = "";
boolean ready = true;
protected TestAccumulator() {
super(timer, 5, 100, 50);
super(timer, 5, 100, 70);
}
@Override
......@@ -106,6 +155,11 @@ public class AbstractAccumulatorTest {
batch += item.s;
}
}
@Override
public boolean isReady() {
return ready;
}
}
}
......