Brian O'Connor

Started refactoring Intent Manager

Introduced IntentData and reworked APIs

Change-Id: I1fa437ceb1b72c4017ac2da1573bfbeb64c0632a
......@@ -15,35 +15,18 @@
*/
package org.onosproject.event;
import com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.onlab.util.AbstractAccumulator;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
/**
* Base implementation of an event accumulator. It allows triggering based on
* event inter-arrival time threshold, maximum batch life threshold and maximum
* batch size.
*/
public abstract class AbstractEventAccumulator implements EventAccumulator {
private Logger log = LoggerFactory.getLogger(AbstractEventAccumulator.class);
private final Timer timer;
private final int maxEvents;
private final int maxBatchMillis;
private final int maxIdleMillis;
private TimerTask idleTask = new ProcessorTask();
private TimerTask maxTask = new ProcessorTask();
private List<Event> events = Lists.newArrayList();
public abstract class AbstractEventAccumulator
extends AbstractAccumulator<Event>
implements EventAccumulator {
/**
* Creates an event accumulator capable of triggering on the specified
......@@ -59,108 +42,6 @@ public abstract class AbstractEventAccumulator implements EventAccumulator {
*/
protected AbstractEventAccumulator(Timer timer, int maxEvents,
int maxBatchMillis, int maxIdleMillis) {
this.timer = checkNotNull(timer, "Timer cannot be null");
checkArgument(maxEvents > 1, "Maximum number of events must be > 1");
checkArgument(maxBatchMillis > 0, "Maximum millis must be positive");
checkArgument(maxIdleMillis > 0, "Maximum idle millis must be positive");
this.maxEvents = maxEvents;
this.maxBatchMillis = maxBatchMillis;
this.maxIdleMillis = maxIdleMillis;
}
@Override
public void add(Event event) {
idleTask = cancelIfActive(idleTask);
events.add(event);
// Did we hit the max event threshold?
if (events.size() == maxEvents) {
maxTask = cancelIfActive(maxTask);
schedule(1);
} else {
// Otherwise, schedule idle task and if this is a first event
// also schedule the max batch age task.
idleTask = schedule(maxIdleMillis);
if (events.size() == 1) {
maxTask = schedule(maxBatchMillis);
}
}
}
// Schedules a new processor task given number of millis in the future.
private TimerTask schedule(int millis) {
TimerTask task = new ProcessorTask();
timer.schedule(task, millis);
return task;
}
// Cancels the specified task if it is active.
private TimerTask cancelIfActive(TimerTask task) {
if (task != null) {
task.cancel();
}
return task;
}
// Task for triggering processing of accumulated events
private class ProcessorTask extends TimerTask {
@Override
public void run() {
try {
idleTask = cancelIfActive(idleTask);
maxTask = cancelIfActive(maxTask);
processEvents(finalizeCurrentBatch());
} catch (Exception e) {
log.warn("Unable to process batch due to {}", e.getMessage());
}
}
}
// Demotes and returns the current batch of events and promotes a new one.
private synchronized List<Event> finalizeCurrentBatch() {
List<Event> toBeProcessed = events;
events = Lists.newArrayList();
return toBeProcessed;
}
/**
* Returns the backing timer.
*
* @return backing timer
*/
public Timer timer() {
return timer;
}
/**
* Returns the maximum number of events allowed to accumulate before
* processing is triggered.
*
* @return max number of events
*/
public int maxEvents() {
return maxEvents;
}
/**
* Returns the maximum number of millis allowed to expire since the first
* event before processing is triggered.
*
* @return max number of millis a batch is allowed to last
*/
public int maxBatchMillis() {
return maxBatchMillis;
}
/**
* Returns the maximum number of millis allowed to expire since the last
* event arrival before processing is triggered.
*
* @return max number of millis since the last event
*/
public int maxIdleMillis() {
return maxIdleMillis;
super(timer, maxEvents, maxBatchMillis, maxIdleMillis);
}
}
......
......@@ -15,27 +15,11 @@
*/
package org.onosproject.event;
import java.util.List;
import org.onlab.util.Accumulator;
/**
* Abstraction of an accumulator capable of collecting events and at some
* point in time triggers processing of all previously accumulated events.
*/
public interface EventAccumulator {
/**
* Adds an event to the current batch. This operation may, or may not
* trigger processing of the current batch of events.
*
* @param event event to be added to the current batch
*/
void add(Event event);
/**
* Processes the specified list of accumulated events.
*
* @param events list of accumulated events
*/
void processEvents(List<Event> events);
public interface EventAccumulator extends Accumulator<Event> {
}
......
......@@ -36,7 +36,7 @@ public abstract class Intent {
private final IntentId id;
private final ApplicationId appId;
private final String key;
private final String key; // TODO make this a class
private final Collection<NetworkResource> resources;
......@@ -156,4 +156,8 @@ public abstract class Intent {
idGenerator = null;
}
}
public String key() {
return key;
}
}
......
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.net.intent;
import com.google.common.collect.ImmutableList;
import org.onosproject.store.Timestamp;
import java.util.List;
import java.util.Objects;
/**
* A wrapper class that contains an intents, its state, and other metadata for
* internal use.
*/
public class IntentData { //FIXME need to make this "immutable"
// manager should be able to mutate a local copy while processing
private Intent intent;
private IntentState state;
private Timestamp version;
private List<Intent> installables;
public IntentData(Intent intent, IntentState state, Timestamp version) {
this.intent = intent;
this.state = state;
this.version = version;
}
// kryo constructor
protected IntentData() {
}
public Intent intent() {
return intent;
}
public IntentState state() {
return state;
}
public String key() {
return intent.key();
}
public void setState(IntentState newState) {
this.state = newState;
}
public void setInstallables(List<Intent> installables) {
this.installables = ImmutableList.copyOf(installables);
}
public List<Intent> installables() {
return installables;
}
@Override
public int hashCode() {
return Objects.hash(intent, version);
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
final IntentData other = (IntentData) obj;
return Objects.equals(this.intent, other.intent)
&& Objects.equals(this.version, other.version);
}
}
......@@ -27,9 +27,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
public final class IntentOperation {
private final Type type;
private final IntentId intentId;
private final Intent intent;
//FIXME consider pulling the key out (we will hash based on key)
/**
* Operation type.
......@@ -62,12 +60,10 @@ public final class IntentOperation {
* Creates an intent operation.
*
* @param type operation type
* @param intentId identifier of the intent subject to the operation
* @param intent intent subject
*/
IntentOperation(Type type, IntentId intentId, Intent intent) {
public IntentOperation(Type type, Intent intent) {
this.type = checkNotNull(type);
this.intentId = checkNotNull(intentId);
this.intent = intent;
}
......@@ -86,7 +82,11 @@ public final class IntentOperation {
* @return intent identifier
*/
public IntentId intentId() {
return intentId;
return intent.id();
}
public String key() {
return intent.key();
}
/**
......@@ -101,7 +101,7 @@ public final class IntentOperation {
@Override
public int hashCode() {
return Objects.hash(type, intentId, intent);
return Objects.hash(type, intent);
}
@Override
......@@ -114,7 +114,6 @@ public final class IntentOperation {
}
final IntentOperation other = (IntentOperation) obj;
return Objects.equals(this.type, other.type) &&
Objects.equals(this.intentId, other.intentId) &&
Objects.equals(this.intent, other.intent);
}
......@@ -123,7 +122,6 @@ public final class IntentOperation {
public String toString() {
return toStringHelper(this)
.add("type", type)
.add("intentId", intentId)
.add("intent", intent)
.toString();
}
......
......@@ -31,7 +31,7 @@ import static org.onosproject.net.intent.IntentOperation.Type.WITHDRAW;
/**
* Batch of intent submit/withdraw/replace operations.
*/
@Deprecated
@Deprecated //DELETEME
public final class IntentOperations {
private final List<IntentOperation> operations;
......@@ -120,7 +120,7 @@ public final class IntentOperations {
*/
public Builder addSubmitOperation(Intent intent) {
checkNotNull(intent, "Intent cannot be null");
builder.add(new IntentOperation(SUBMIT, intent.id(), intent));
builder.add(new IntentOperation(SUBMIT, intent));
return this;
}
......@@ -134,7 +134,7 @@ public final class IntentOperations {
public Builder addReplaceOperation(IntentId oldIntentId, Intent newIntent) {
checkNotNull(oldIntentId, "Intent ID cannot be null");
checkNotNull(newIntent, "Intent cannot be null");
builder.add(new IntentOperation(REPLACE, oldIntentId, newIntent));
builder.add(new IntentOperation(REPLACE, newIntent)); //FIXME
return this;
}
......@@ -146,7 +146,7 @@ public final class IntentOperations {
*/
public Builder addWithdrawOperation(IntentId intentId) {
checkNotNull(intentId, "Intent ID cannot be null");
builder.add(new IntentOperation(WITHDRAW, intentId, null));
builder.add(new IntentOperation(WITHDRAW, null)); //FIXME
return this;
}
......@@ -158,7 +158,7 @@ public final class IntentOperations {
*/
public Builder addUpdateOperation(IntentId intentId) {
checkNotNull(intentId, "Intent ID cannot be null");
builder.add(new IntentOperation(UPDATE, intentId, null));
builder.add(new IntentOperation(UPDATE, null)); //FIXME
return this;
}
......
......@@ -30,7 +30,7 @@ public enum IntentState {
* Intents will also pass through this state when they are updated.
* </p>
*/
INSTALL_REQ,
INSTALL_REQ, // TODO submit_REQ?
/**
* Signifies that the intent is being compiled into installable intents.
......@@ -66,7 +66,7 @@ public enum IntentState {
* previously failed to be installed.
* </p>
*/
RECOMPILING,
RECOMPILING, // TODO perhaps repurpose as BROKEN.
/**
* Indicates that an application has requested that an intent be withdrawn.
......@@ -92,5 +92,5 @@ public enum IntentState {
* Signifies that the intent has failed compiling, installing or
* recompiling states.
*/
FAILED
FAILED //TODO consider renaming to UNSAT.
}
......
......@@ -76,9 +76,9 @@ public interface IntentStore extends Store<IntentEvent, IntentStoreDelegate> {
/**
* Adds a new operation, which should be persisted and delegated.
*
* @param op operation
* @param intent operation
*/
default void add(IntentOperation op) {} //FIXME remove when impl.
default void addPending(IntentData intent) {} //FIXME remove when impl.
/**
* Checks to see whether the calling instance is the master for processing
......
......@@ -23,10 +23,10 @@ import org.onosproject.store.StoreDelegate;
public interface IntentStoreDelegate extends StoreDelegate<IntentEvent> {
/**
* Provides an intent operation that should be processed (compiled and
* Provides an intent data object that should be processed (compiled and
* installed) by this manager.
*
* @param op intent operation
* @param intentData intent data object
*/
void process(IntentOperation op);
void process(IntentData intentData);
}
......
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.net.intent.impl;
import com.google.common.collect.Maps;
import org.onlab.util.AbstractAccumulator;
import org.onosproject.net.intent.IntentData;
import java.util.List;
import java.util.Map;
import java.util.Timer;
/**
* An accumulator for building batches of intent operations. Only one batch should
* be in process per instance at a time.
*/
public class IntentAccumulator extends AbstractAccumulator<IntentData> {
private static final int DEFAULT_MAX_EVENTS = 1000;
private static final int DEFAULT_MAX_IDLE_MS = 10;
private static final int DEFAULT_MAX_BATCH_MS = 50;
// FIXME: Replace with a system-wide timer instance;
// TODO: Convert to use HashedWheelTimer or produce a variant of that; then decide which we want to adopt
private static final Timer TIMER = new Timer("intent-op-batching");
/**
* Creates an intent operation accumulator.
*/
protected IntentAccumulator() {
super(TIMER, DEFAULT_MAX_EVENTS, DEFAULT_MAX_BATCH_MS, DEFAULT_MAX_IDLE_MS);
}
@Override
public void processEvents(List<IntentData> ops) {
Map<String, IntentData> opMap = reduce(ops);
// FIXME kick off the work
//for (IntentData data : opMap.values()) {}
}
private Map<String, IntentData> reduce(List<IntentData> ops) {
Map<String, IntentData> map = Maps.newHashMap();
for (IntentData op : ops) {
map.put(op.key(), op);
}
//TODO check the version... or maybe store will handle this.
return map;
}
}
......@@ -37,6 +37,7 @@ import org.onosproject.net.intent.Intent;
import org.onosproject.net.intent.IntentBatchDelegate;
import org.onosproject.net.intent.IntentBatchService;
import org.onosproject.net.intent.IntentCompiler;
import org.onosproject.net.intent.IntentData;
import org.onosproject.net.intent.IntentEvent;
import org.onosproject.net.intent.IntentException;
import org.onosproject.net.intent.IntentExtensionService;
......@@ -128,6 +129,8 @@ public class IntentManager
private final IntentBatchDelegate batchDelegate = new InternalBatchDelegate();
private IdGenerator idGenerator;
private final IntentAccumulator accumulator = new IntentAccumulator();
@Activate
public void activate() {
store.setDelegate(delegate);
......@@ -154,32 +157,41 @@ public class IntentManager
@Override
public void submit(Intent intent) {
checkNotNull(intent, INTENT_NULL);
execute(IntentOperations.builder(intent.appId())
.addSubmitOperation(intent).build());
IntentData data = new IntentData(intent, IntentState.INSTALL_REQ, null);
//FIXME timestamp?
store.addPending(data);
}
@Override
public void withdraw(Intent intent) {
checkNotNull(intent, INTENT_NULL);
execute(IntentOperations.builder(intent.appId())
.addWithdrawOperation(intent.id()).build());
IntentData data = new IntentData(intent, IntentState.WITHDRAW_REQ, null);
//FIXME timestamp?
store.addPending(data);
}
@Override
public void replace(IntentId oldIntentId, Intent newIntent) {
checkNotNull(oldIntentId, INTENT_ID_NULL);
checkNotNull(newIntent, INTENT_NULL);
execute(IntentOperations.builder(newIntent.appId())
.addReplaceOperation(oldIntentId, newIntent)
.build());
throw new UnsupportedOperationException("replace is not implemented");
}
@Override
public void execute(IntentOperations operations) {
if (operations.operations().isEmpty()) {
return;
for (IntentOperation op : operations.operations()) {
switch (op.type()) {
case SUBMIT:
case UPDATE:
submit(op.intent());
break;
case WITHDRAW:
withdraw(op.intent());
break;
//fallthrough
case REPLACE:
default:
throw new UnsupportedOperationException("replace not supported");
}
}
batchService.addIntentOperations(operations);
}
@Override
......@@ -382,8 +394,8 @@ public class IntentManager
}
@Override
public void process(IntentOperation op) {
//FIXME
public void process(IntentData data) {
accumulator.add(data);
}
}
......@@ -488,6 +500,7 @@ public class IntentManager
}
}
// TODO pull out the IntentUpdate inner classes
private class InstallRequest implements IntentUpdate {
private final Intent intent;
......
......@@ -17,19 +17,19 @@ package org.onosproject.store.trivial.impl;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Service;
import org.onosproject.net.intent.BatchWrite;
import org.onosproject.net.intent.BatchWrite.Operation;
import org.onosproject.net.intent.Intent;
import org.onosproject.net.intent.IntentData;
import org.onosproject.net.intent.IntentEvent;
import org.onosproject.net.intent.IntentId;
import org.onosproject.net.intent.IntentState;
import org.onosproject.net.intent.IntentStore;
import org.onosproject.net.intent.IntentStoreDelegate;
import org.onosproject.net.intent.BatchWrite.Operation;
import org.onosproject.store.AbstractStore;
import org.slf4j.Logger;
......@@ -38,8 +38,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Preconditions.*;
import static org.onosproject.net.intent.IntentState.WITHDRAWN;
import static org.slf4j.LoggerFactory.getLogger;
......@@ -50,10 +49,13 @@ public class SimpleIntentStore
implements IntentStore {
private final Logger log = getLogger(getClass());
// current state maps FIXME.. make this a IntentData map
private final Map<IntentId, Intent> intents = new ConcurrentHashMap<>();
private final Map<IntentId, IntentState> states = new ConcurrentHashMap<>();
private final Map<IntentId, List<Intent>> installable = new ConcurrentHashMap<>();
private final Map<String, IntentData> pending = new ConcurrentHashMap<>(); //String is "key"
@Activate
public void activate() {
......@@ -203,4 +205,19 @@ public class SimpleIntentStore
}
return failed;
}
@Override
public void addPending(IntentData data) {
//FIXME need to compare versions
pending.put(data.key(), data);
checkNotNull(delegate, "Store delegate is not set")
.process(data);
}
// FIXME!!! pending.remove(intent.key()); // TODO check version
@Override
public boolean isMaster(Intent intent) {
return true;
}
}
......
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onlab.util;
import com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
/**
* Base implementation of an event accumulator. It allows triggering based on
* event inter-arrival time threshold, maximum batch life threshold and maximum
* batch size.
*/
// FIXME refactor the names here
public abstract class AbstractAccumulator<T> implements Accumulator<T> {
private Logger log = LoggerFactory.getLogger(AbstractAccumulator.class);
private final Timer timer;
private final int maxEvents;
private final int maxBatchMillis;
private final int maxIdleMillis;
private TimerTask idleTask = new ProcessorTask();
private TimerTask maxTask = new ProcessorTask();
private List<T> events = Lists.newArrayList();
/**
* Creates an event accumulator capable of triggering on the specified
* thresholds.
*
* @param timer timer to use for scheduling check-points
* @param maxEvents maximum number of events to accumulate before
* processing is triggered
* @param maxBatchMillis maximum number of millis allowed since the first
* event before processing is triggered
* @param maxIdleMillis maximum number millis between events before
* processing is triggered
*/
protected AbstractAccumulator(Timer timer, int maxEvents,
int maxBatchMillis, int maxIdleMillis) {
this.timer = checkNotNull(timer, "Timer cannot be null");
checkArgument(maxEvents > 1, "Maximum number of events must be > 1");
checkArgument(maxBatchMillis > 0, "Maximum millis must be positive");
checkArgument(maxIdleMillis > 0, "Maximum idle millis must be positive");
this.maxEvents = maxEvents;
this.maxBatchMillis = maxBatchMillis;
this.maxIdleMillis = maxIdleMillis;
}
@Override
public void add(T event) {
idleTask = cancelIfActive(idleTask);
events.add(event);
// Did we hit the max event threshold?
if (events.size() == maxEvents) {
maxTask = cancelIfActive(maxTask);
schedule(1);
} else {
// Otherwise, schedule idle task and if this is a first event
// also schedule the max batch age task.
idleTask = schedule(maxIdleMillis);
if (events.size() == 1) {
maxTask = schedule(maxBatchMillis);
}
}
}
// Schedules a new processor task given number of millis in the future.
private TimerTask schedule(int millis) {
TimerTask task = new ProcessorTask();
timer.schedule(task, millis);
return task;
}
// Cancels the specified task if it is active.
private TimerTask cancelIfActive(TimerTask task) {
if (task != null) {
task.cancel();
}
return task;
}
// Task for triggering processing of accumulated events
private class ProcessorTask extends TimerTask {
@Override
public void run() {
try {
idleTask = cancelIfActive(idleTask);
maxTask = cancelIfActive(maxTask);
processEvents(finalizeCurrentBatch());
} catch (Exception e) {
log.warn("Unable to process batch due to {}", e.getMessage());
}
}
}
// Demotes and returns the current batch of events and promotes a new one.
private synchronized List<T> finalizeCurrentBatch() {
List<T> toBeProcessed = events;
events = Lists.newArrayList();
return toBeProcessed;
}
/**
* Returns the backing timer.
*
* @return backing timer
*/
public Timer timer() {
return timer;
}
/**
* Returns the maximum number of events allowed to accumulate before
* processing is triggered.
*
* @return max number of events
*/
public int maxEvents() {
return maxEvents;
}
/**
* Returns the maximum number of millis allowed to expire since the first
* event before processing is triggered.
*
* @return max number of millis a batch is allowed to last
*/
public int maxBatchMillis() {
return maxBatchMillis;
}
/**
* Returns the maximum number of millis allowed to expire since the last
* event arrival before processing is triggered.
*
* @return max number of millis since the last event
*/
public int maxIdleMillis() {
return maxIdleMillis;
}
}
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onlab.util;
import java.util.List;
/**
* Abstraction of an accumulator capable of collecting events and at some
* point in time triggers processing of all previously accumulated events.
*/
public interface Accumulator<T> {
/**
* Adds an event to the current batch. This operation may, or may not
* trigger processing of the current batch of events.
*
* @param event event to be added to the current batch
*/
void add(T event);
/**
* Processes the specified list of accumulated events.
*
* @param events list of accumulated events
*/
void processEvents(List<T> events);
}