Thomas Vachuska
Committed by Ray Milkey

Updated accumulator documentation and refactored names to remove the event heritage.

Change-Id: I2238ab1215281702e670a406fb901ba8a4ef85ce
/*
* Copyright 2014 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.event;
import org.onlab.util.AbstractAccumulator;
import java.util.Timer;
/**
* Base implementation of an event accumulator. It allows triggering based on
* event inter-arrival time threshold, maximum batch life threshold and maximum
* batch size.
*/
public abstract class AbstractEventAccumulator
extends AbstractAccumulator<Event>
implements EventAccumulator {
/**
* Creates an event accumulator capable of triggering on the specified
* thresholds.
*
* @param timer timer to use for scheduling check-points
* @param maxEvents maximum number of events to accumulate before
* processing is triggered
* @param maxBatchMillis maximum number of millis allowed since the first
* event before processing is triggered
* @param maxIdleMillis maximum number millis between events before
* processing is triggered
*/
protected AbstractEventAccumulator(Timer timer, int maxEvents,
int maxBatchMillis, int maxIdleMillis) {
super(timer, maxEvents, maxBatchMillis, maxIdleMillis);
}
}
/*
* Copyright 2014 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.event;
import org.onlab.util.Accumulator;
/**
* Abstraction of an accumulator capable of collecting events and at some
* point in time triggers processing of all previously accumulated events.
*/
public interface EventAccumulator extends Accumulator<Event> {
}
......@@ -53,8 +53,8 @@ public class IntentAccumulator extends AbstractAccumulator<IntentData> {
}
@Override
public void processEvents(List<IntentData> ops) {
delegate.execute(reduce(ops));
public void processItems(List<IntentData> items) {
delegate.execute(reduce(items));
// FIXME kick off the work
//for (IntentData data : opMap.values()) {}
}
......
......@@ -16,7 +16,6 @@
package org.onosproject.net.topology.impl;
import com.google.common.collect.ImmutableList;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
......@@ -25,9 +24,9 @@ import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onosproject.event.AbstractEventAccumulator;
import org.onlab.util.AbstractAccumulator;
import org.onlab.util.Accumulator;
import org.onosproject.event.Event;
import org.onosproject.event.EventAccumulator;
import org.onosproject.net.device.DeviceEvent;
import org.onosproject.net.device.DeviceListener;
import org.onosproject.net.device.DeviceService;
......@@ -51,9 +50,9 @@ import java.util.concurrent.ExecutorService;
import static com.google.common.base.Strings.isNullOrEmpty;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static org.onlab.util.Tools.namedThreads;
import static org.onosproject.core.CoreService.CORE_PROVIDER_ID;
import static org.onosproject.net.device.DeviceEvent.Type.*;
import static org.onlab.util.Tools.namedThreads;
import static org.slf4j.LoggerFactory.getLogger;
/**
......@@ -104,7 +103,7 @@ public class DefaultTopologyProvider extends AbstractProvider
private DeviceListener deviceListener = new InternalDeviceListener();
private LinkListener linkListener = new InternalLinkListener();
private EventAccumulator accumulator;
private Accumulator<Event> accumulator;
private ExecutorService executor;
/**
......@@ -245,18 +244,15 @@ public class DefaultTopologyProvider extends AbstractProvider
}
// Event accumulator for paced triggering of topology assembly.
private class TopologyChangeAccumulator
extends AbstractEventAccumulator implements EventAccumulator {
private class TopologyChangeAccumulator extends AbstractAccumulator<Event> {
TopologyChangeAccumulator() {
super(TIMER, maxEvents, maxBatchMs, maxIdleMs);
}
@Override
public void processEvents(List<Event> events) {
triggerTopologyBuild(events);
public void processItems(List<Event> items) {
triggerTopologyBuild(items);
}
}
// Task for building topology data in a separate thread.
......
......@@ -27,64 +27,63 @@ import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
/**
* Base implementation of an event accumulator. It allows triggering based on
* event inter-arrival time threshold, maximum batch life threshold and maximum
* Base implementation of an item accumulator. It allows triggering based on
* item inter-arrival time threshold, maximum batch life threshold and maximum
* batch size.
*/
// FIXME refactor the names here
public abstract class AbstractAccumulator<T> implements Accumulator<T> {
private Logger log = LoggerFactory.getLogger(AbstractAccumulator.class);
private final Timer timer;
private final int maxEvents;
private final int maxItems;
private final int maxBatchMillis;
private final int maxIdleMillis;
private TimerTask idleTask = new ProcessorTask();
private TimerTask maxTask = new ProcessorTask();
private List<T> events = Lists.newArrayList();
private List<T> items = Lists.newArrayList();
/**
* Creates an event accumulator capable of triggering on the specified
* Creates an item accumulator capable of triggering on the specified
* thresholds.
*
* @param timer timer to use for scheduling check-points
* @param maxEvents maximum number of events to accumulate before
* @param maxItems maximum number of items to accumulate before
* processing is triggered
* @param maxBatchMillis maximum number of millis allowed since the first
* event before processing is triggered
* @param maxIdleMillis maximum number millis between events before
* item before processing is triggered
* @param maxIdleMillis maximum number millis between items before
* processing is triggered
*/
protected AbstractAccumulator(Timer timer, int maxEvents,
protected AbstractAccumulator(Timer timer, int maxItems,
int maxBatchMillis, int maxIdleMillis) {
this.timer = checkNotNull(timer, "Timer cannot be null");
checkArgument(maxEvents > 1, "Maximum number of events must be > 1");
checkArgument(maxItems > 1, "Maximum number of items must be > 1");
checkArgument(maxBatchMillis > 0, "Maximum millis must be positive");
checkArgument(maxIdleMillis > 0, "Maximum idle millis must be positive");
this.maxEvents = maxEvents;
this.maxItems = maxItems;
this.maxBatchMillis = maxBatchMillis;
this.maxIdleMillis = maxIdleMillis;
}
@Override
public synchronized void add(T event) {
public synchronized void add(T item) {
idleTask = cancelIfActive(idleTask);
events.add(checkNotNull(event, "Event cannot be null"));
items.add(checkNotNull(item, "Item cannot be null"));
// Did we hit the max event threshold?
if (events.size() == maxEvents) {
// Did we hit the max item threshold?
if (items.size() == maxItems) {
maxTask = cancelIfActive(maxTask);
schedule(1);
} else {
// Otherwise, schedule idle task and if this is a first event
// Otherwise, schedule idle task and if this is a first item
// also schedule the max batch age task.
idleTask = schedule(maxIdleMillis);
if (events.size() == 1) {
if (items.size() == 1) {
maxTask = schedule(maxBatchMillis);
}
}
......@@ -105,24 +104,24 @@ public abstract class AbstractAccumulator<T> implements Accumulator<T> {
return task;
}
// Task for triggering processing of accumulated events
// Task for triggering processing of accumulated items
private class ProcessorTask extends TimerTask {
@Override
public void run() {
try {
idleTask = cancelIfActive(idleTask);
maxTask = cancelIfActive(maxTask);
processEvents(finalizeCurrentBatch());
processItems(finalizeCurrentBatch());
} catch (Exception e) {
log.warn("Unable to process batch due to {}", e);
}
}
}
// Demotes and returns the current batch of events and promotes a new one.
// Demotes and returns the current batch of items and promotes a new one.
private synchronized List<T> finalizeCurrentBatch() {
List<T> toBeProcessed = events;
events = Lists.newArrayList();
List<T> toBeProcessed = items;
items = Lists.newArrayList();
return toBeProcessed;
}
......@@ -136,18 +135,18 @@ public abstract class AbstractAccumulator<T> implements Accumulator<T> {
}
/**
* Returns the maximum number of events allowed to accumulate before
* Returns the maximum number of items allowed to accumulate before
* processing is triggered.
*
* @return max number of events
* @return max number of items
*/
public int maxEvents() {
return maxEvents;
public int maxItems() {
return maxItems;
}
/**
* Returns the maximum number of millis allowed to expire since the first
* event before processing is triggered.
* item before processing is triggered.
*
* @return max number of millis a batch is allowed to last
*/
......@@ -157,9 +156,9 @@ public abstract class AbstractAccumulator<T> implements Accumulator<T> {
/**
* Returns the maximum number of millis allowed to expire since the last
* event arrival before processing is triggered.
* item arrival before processing is triggered.
*
* @return max number of millis since the last event
* @return max number of millis since the last item
*/
public int maxIdleMillis() {
return maxIdleMillis;
......
......@@ -18,25 +18,28 @@ package org.onlab.util;
import java.util.List;
/**
* Abstraction of an accumulator capable of collecting events and at some
* point in time triggers processing of all previously accumulated events.
* Abstraction of an accumulator capable of collecting items and at some
* point in time triggers processing of all previously accumulated items.
*
* @param <T> item type
*/
public interface Accumulator<T> {
/**
* Adds an event to the current batch. This operation may, or may not
* trigger processing of the current batch of events.
* Adds an item to the current batch. This operation may, or may not
* trigger processing of the current batch of items.
*
* @param event event to be added to the current batch
* @param item item to be added to the current batch
*/
void add(T event);
void add(T item);
/**
* Processes the specified list of accumulated events.
* Processes the specified list of accumulated items.
*
* @param events list of accumulated events
* @param items list of accumulated items
*/
void processEvents(List<T> events);
void processItems(List<T> items);
//TODO consider a blocking version that required consumer participation
}
......
/*
* Copyright 2014 Open Networking Laboratory
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.event;
package org.onlab.util;
import org.junit.Ignore;
import org.junit.Test;
......@@ -21,16 +21,13 @@ import org.junit.Test;
import java.util.List;
import java.util.Timer;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.*;
import static org.onlab.junit.TestTools.delay;
import static org.onosproject.event.TestEvent.Type.FOO;
/**
* Tests the operation of the accumulator.
*/
public class AbstractEventAccumulatorTest {
public class AbstractAccumulatorTest {
private final Timer timer = new Timer();
......@@ -38,7 +35,7 @@ public class AbstractEventAccumulatorTest {
public void basics() throws Exception {
TestAccumulator accumulator = new TestAccumulator();
assertEquals("incorrect timer", timer, accumulator.timer());
assertEquals("incorrect max events", 5, accumulator.maxEvents());
assertEquals("incorrect max events", 5, accumulator.maxItems());
assertEquals("incorrect max ms", 100, accumulator.maxBatchMillis());
assertEquals("incorrect idle ms", 50, accumulator.maxIdleMillis());
}
......@@ -46,12 +43,12 @@ public class AbstractEventAccumulatorTest {
@Test
public void eventTrigger() {
TestAccumulator accumulator = new TestAccumulator();
accumulator.add(new TestEvent(FOO, "a"));
accumulator.add(new TestEvent(FOO, "b"));
accumulator.add(new TestEvent(FOO, "c"));
accumulator.add(new TestEvent(FOO, "d"));
accumulator.add(new TestItem("a"));
accumulator.add(new TestItem("b"));
accumulator.add(new TestItem("c"));
accumulator.add(new TestItem("d"));
assertTrue("should not have fired yet", accumulator.batch.isEmpty());
accumulator.add(new TestEvent(FOO, "e"));
accumulator.add(new TestItem("e"));
delay(20);
assertFalse("should have fired", accumulator.batch.isEmpty());
assertEquals("incorrect batch", "abcde", accumulator.batch);
......@@ -61,16 +58,16 @@ public class AbstractEventAccumulatorTest {
@Test
public void timeTrigger() {
TestAccumulator accumulator = new TestAccumulator();
accumulator.add(new TestEvent(FOO, "a"));
accumulator.add(new TestItem("a"));
delay(30);
assertTrue("should not have fired yet", accumulator.batch.isEmpty());
accumulator.add(new TestEvent(FOO, "b"));
accumulator.add(new TestItem("b"));
delay(30);
assertTrue("should not have fired yet", accumulator.batch.isEmpty());
accumulator.add(new TestEvent(FOO, "c"));
accumulator.add(new TestItem("c"));
delay(30);
assertTrue("should not have fired yet", accumulator.batch.isEmpty());
accumulator.add(new TestEvent(FOO, "d"));
accumulator.add(new TestItem("d"));
delay(30);
assertFalse("should have fired", accumulator.batch.isEmpty());
assertEquals("incorrect batch", "abcd", accumulator.batch);
......@@ -79,15 +76,23 @@ public class AbstractEventAccumulatorTest {
@Test
public void idleTrigger() {
TestAccumulator accumulator = new TestAccumulator();
accumulator.add(new TestEvent(FOO, "a"));
accumulator.add(new TestItem("a"));
assertTrue("should not have fired yet", accumulator.batch.isEmpty());
accumulator.add(new TestEvent(FOO, "b"));
accumulator.add(new TestItem("b"));
delay(80);
assertFalse("should have fired", accumulator.batch.isEmpty());
assertEquals("incorrect batch", "ab", accumulator.batch);
}
private class TestAccumulator extends AbstractEventAccumulator {
private class TestItem {
private final String s;
public TestItem(String s) {
this.s = s;
}
}
private class TestAccumulator extends AbstractAccumulator<TestItem> {
String batch = "";
......@@ -96,10 +101,11 @@ public class AbstractEventAccumulatorTest {
}
@Override
public void processEvents(List<Event> events) {
for (Event event : events) {
batch += event.subject();
public void processItems(List<TestItem> items) {
for (TestItem item : items) {
batch += item.s;
}
}
}
}
......
......@@ -20,14 +20,14 @@ import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.eclipse.jetty.websocket.WebSocket;
import org.onlab.osgi.ServiceDirectory;
import org.onlab.util.AbstractAccumulator;
import org.onlab.util.Accumulator;
import org.onosproject.cluster.ClusterEvent;
import org.onosproject.cluster.ClusterEventListener;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.event.AbstractEventAccumulator;
import org.onosproject.event.Event;
import org.onosproject.event.EventAccumulator;
import org.onosproject.mastership.MastershipAdminService;
import org.onosproject.mastership.MastershipEvent;
import org.onosproject.mastership.MastershipListener;
......@@ -120,7 +120,7 @@ public class TopologyViewWebSocket
private final IntentListener intentListener = new InternalIntentListener();
private final FlowRuleListener flowListener = new InternalFlowListener();
private final EventAccumulator eventAccummulator = new InternalEventAccummulator();
private final Accumulator<Event> eventAccummulator = new InternalEventAccummulator();
private TimerTask trafficTask;
private ObjectNode trafficEvent;
......@@ -721,13 +721,13 @@ public class TopologyViewWebSocket
}
// Accumulates events to drive methodic update of the summary pane.
private class InternalEventAccummulator extends AbstractEventAccumulator {
private class InternalEventAccummulator extends AbstractAccumulator<Event> {
protected InternalEventAccummulator() {
super(new Timer("topo-summary"), MAX_EVENTS, MAX_BATCH_MS, MAX_IDLE_MS);
}
@Override
public void processEvents(List<Event> events) {
public void processItems(List<Event> items) {
try {
if (summaryEvent != null) {
sendMessage(summmaryMessage(0));
......