tom

Working on simple topology manager and provider

package org.onlab.onos.event;
import com.google.common.collect.Lists;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
/**
* Base implementation of an event accumulator. It allows triggering based on
* event inter-arrival time threshold, maximum batch life threshold and maximum
* batch size.
*/
public abstract class AbstractEventAccumulator implements EventAccumulator {
private final Timer timer;
private final int maxEvents;
private final int maxBatchMillis;
private final int maxIdleMillis;
private TimerTask idleTask = new ProcessorTask();
private TimerTask maxTask = new ProcessorTask();
private List<Event> events = Lists.newArrayList();
/**
* Creates an event accumulator capable of triggering on the specified
* thresholds.
*
* @param timer timer to use for scheduling check-points
* @param maxEvents maximum number of events to accumulate before
* processing is triggered
* @param maxBatchMillis maximum number of millis allowed since the first
* event before processing is triggered
* @param maxIdleMillis maximum number millis between events before
* processing is triggered
*/
protected AbstractEventAccumulator(Timer timer, int maxEvents,
int maxBatchMillis, int maxIdleMillis) {
this.timer = checkNotNull(timer, "Timer cannot be null");
checkArgument(maxEvents > 1, "Maximum number of events must be > 1");
checkArgument(maxBatchMillis > 0, "Maximum millis must be positive");
checkArgument(maxIdleMillis > 0, "Maximum idle millis must be positive");
this.maxEvents = maxEvents;
this.maxBatchMillis = maxBatchMillis;
this.maxIdleMillis = maxIdleMillis;
}
@Override
public void add(Event event) {
idleTask = cancelIfActive(idleTask);
events.add(event);
// Did we hit the max event threshold?
if (events.size() == maxEvents) {
maxTask = cancelIfActive(maxTask);
schedule(1);
} else {
// Otherwise, schedule idle task and if this is a first event
// also schedule the max batch age task.
idleTask = schedule(maxIdleMillis);
if (events.size() == 1) {
maxTask = schedule(maxBatchMillis);
}
}
}
// Schedules a new processor task given number of millis in the future.
private TimerTask schedule(int millis) {
TimerTask task = new ProcessorTask();
timer.schedule(task, millis);
return task;
}
// Cancels the specified task if it is active.
private TimerTask cancelIfActive(TimerTask task) {
if (task != null) {
task.cancel();
}
return task;
}
// Task for triggering processing of accumulated events
private class ProcessorTask extends TimerTask {
@Override
public void run() {
idleTask = cancelIfActive(idleTask);
maxTask = cancelIfActive(maxTask);
processEvents(finalizeCurrentBatch());
}
}
// Demotes and returns the current batch of events and promotes a new one.
private synchronized List<Event> finalizeCurrentBatch() {
List<Event> toBeProcessed = events;
events = Lists.newArrayList();
return toBeProcessed;
}
/**
* Returns the backing timer.
*
* @return backing timer
*/
public Timer timer() {
return timer;
}
/**
* Returns the maximum number of events allowed to accumulate before
* processing is triggered.
*
* @return max number of events
*/
public int maxEvents() {
return maxEvents;
}
/**
* Returns the maximum number of millis allowed to expire since the first
* event before processing is triggered.
*
* @return max number of millis a batch is allowed to last
*/
public int maxBatchMillis() {
return maxBatchMillis;
}
/**
* Returns the maximum number of millis allowed to expire since the last
* event arrival before processing is triggered.
*
* @return max number of millis since the last event
*/
public int maxIdleMillis() {
return maxIdleMillis;
}
}
package org.onlab.onos.event;
import java.util.List;
/**
* Abstraction of an accumulator capable of collecting events and at some
* point in time triggers processing of all previously accumulated events.
*/
public interface EventAccumulator {
/**
* Adds an event to the current batch. This operation may, or may not
* trigger processing of the current batch of events.
*
* @param event event to be added to the current batch
*/
void add(Event event);
/**
* Processes the specified list of accumulated events.
*
* @param events list of accumulated events
*/
void processEvents(List<Event> events);
}
package org.onlab.onos.net.trivial.impl;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.onlab.graph.AdjacencyListsGraph;
import org.onlab.graph.DijkstraGraphSearch;
import org.onlab.graph.Graph;
import org.onlab.graph.GraphPathSearch;
import org.onlab.onos.net.ConnectPoint;
import org.onlab.onos.net.Device;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.Link;
import org.onlab.onos.net.topology.ClusterId;
import org.onlab.onos.net.topology.LinkWeight;
import org.onlab.onos.net.topology.TopoEdge;
import org.onlab.onos.net.topology.TopoVertex;
import org.onlab.onos.net.topology.TopologyCluster;
import org.onlab.onos.net.topology.TopologyDescription;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import static com.google.common.base.MoreObjects.toStringHelper;
import static org.onlab.graph.GraphPathSearch.Result;
import static org.onlab.onos.net.Link.Type.INDIRECT;
/**
* Default implementation of an immutable topology data carrier.
*/
public class DefaultTopologyDescription implements TopologyDescription {
class DefaultTopologyDescription implements TopologyDescription {
private static final GraphPathSearch<TopoVertex, TopoEdge> DIJKSTRA =
new DijkstraGraphSearch<>();
private final long nanos;
private final Map<DeviceId, TopoVertex> vertexesById = Maps.newHashMap();
private final Graph<TopoVertex, TopoEdge> graph;
private final Map<DeviceId, GraphPathSearch.Result<TopoVertex, TopoEdge>> results;
private final Map<DeviceId, Result<TopoVertex, TopoEdge>> results;
private final Map<ClusterId, TopologyCluster> clusters;
private final Multimap<ClusterId, DeviceId> clusterDevices;
private final Multimap<ClusterId, Link> clusterLinks;
private final Map<DeviceId, TopologyCluster> deviceClusters;
public DefaultTopologyDescription(long nanos, Graph<TopoVertex, TopoEdge> graph,
Map<DeviceId, GraphPathSearch.Result<TopoVertex, TopoEdge>> results,
Map<ClusterId, TopologyCluster> clusters,
Multimap<ClusterId, DeviceId> clusterDevices,
Multimap<ClusterId, Link> clusterLinks,
Map<DeviceId, TopologyCluster> deviceClusters) {
// private final Multimap<ClusterId, DeviceId> clusterDevices;
// private final Multimap<ClusterId, Link> clusterLinks;
// private final Map<DeviceId, TopologyCluster> deviceClusters;
DefaultTopologyDescription(long nanos, Iterable<Device> devices, Iterable<Link> links) {
this.nanos = nanos;
this.graph = graph;
this.results = results;
this.clusters = clusters;
this.clusterDevices = clusterDevices;
this.clusterLinks = clusterLinks;
this.deviceClusters = deviceClusters;
this.graph = buildGraph(devices, links);
this.results = computeDefaultPaths();
this.clusters = computeClusters();
// this.clusterDevices = clusterDevices;
// this.clusterLinks = clusterLinks;
// this.deviceClusters = deviceClusters;
}
// Constructs the topology graph using the supplied devices and links.
private Graph<TopoVertex, TopoEdge> buildGraph(Iterable<Device> devices,
Iterable<Link> links) {
Graph<TopoVertex, TopoEdge> graph =
new AdjacencyListsGraph<>(buildVertexes(devices),
buildEdges(links));
return graph;
}
// Builds a set of topology vertexes from the specified list of devices
private Set<TopoVertex> buildVertexes(Iterable<Device> devices) {
Set<TopoVertex> vertexes = Sets.newHashSet();
for (Device device : devices) {
TopoVertex vertex = new TVertex(device.id());
vertexesById.put(vertex.deviceId(), vertex);
vertexes.add(vertex);
}
return vertexes;
}
// Builds a set of topology vertexes from the specified list of links
private Set<TopoEdge> buildEdges(Iterable<Link> links) {
Set<TopoEdge> edges = Sets.newHashSet();
for (Link link : links) {
edges.add(new TEdge(vertexOf(link.src()), vertexOf(link.dst()), link));
}
return edges;
}
// Computes the default shortest paths for all source/dest pairs using
// the multi-path Dijkstra and hop-count as path cost.
private Map<DeviceId, Result<TopoVertex, TopoEdge>> computeDefaultPaths() {
LinkWeight weight = new HopCountLinkWeight(graph.getVertexes().size());
Map<DeviceId, Result<TopoVertex, TopoEdge>> results = Maps.newHashMap();
// Search graph paths for each source to all destinations.
for (TopoVertex src : vertexesById.values()) {
results.put(src.deviceId(), DIJKSTRA.search(graph, src, null, weight));
}
return results;
}
// Computes topology SCC clusters using Tarjan algorithm.
private Map<ClusterId, TopologyCluster> computeClusters() {
Map<ClusterId, TopologyCluster> clusters = Maps.newHashMap();
return clusters;
}
// Fetches a vertex corresponding to the given connection point device.
private TopoVertex vertexOf(ConnectPoint connectPoint) {
DeviceId id = connectPoint.deviceId();
TopoVertex vertex = vertexesById.get(id);
if (vertex == null) {
// If vertex does not exist, create one and register it.
vertex = new TVertex(id);
vertexesById.put(id, vertex);
}
return vertex;
}
@Override
......@@ -54,7 +125,7 @@ public class DefaultTopologyDescription implements TopologyDescription {
}
@Override
public GraphPathSearch.Result<TopoVertex, TopoEdge> pathResults(DeviceId srcDeviceId) {
public Result<TopoVertex, TopoEdge> pathResults(DeviceId srcDeviceId) {
return results.get(srcDeviceId);
}
......@@ -75,6 +146,105 @@ public class DefaultTopologyDescription implements TopologyDescription {
@Override
public TopologyCluster clusterFor(DeviceId deviceId) {
return deviceClusters.get(deviceId);
return null; // deviceClusters.get(deviceId);
}
// Implementation of the topology vertex backed by a device id
private static class TVertex implements TopoVertex {
private final DeviceId deviceId;
public TVertex(DeviceId deviceId) {
this.deviceId = deviceId;
}
@Override
public DeviceId deviceId() {
return deviceId;
}
@Override
public int hashCode() {
return Objects.hash(deviceId);
}
@Override
public boolean equals(Object obj) {
if (obj instanceof TVertex) {
final TVertex other = (TVertex) obj;
return Objects.equals(this.deviceId, other.deviceId);
}
return false;
}
@Override
public String toString() {
return deviceId.toString();
}
}
// Implementation of the topology edge backed by a link
private class TEdge implements TopoEdge {
private final Link link;
private final TopoVertex src;
private final TopoVertex dst;
public TEdge(TopoVertex src, TopoVertex dst, Link link) {
this.src = src;
this.dst = dst;
this.link = link;
}
@Override
public Link link() {
return link;
}
@Override
public TopoVertex src() {
return src;
}
@Override
public TopoVertex dst() {
return dst;
}
@Override
public int hashCode() {
return Objects.hash(link);
}
@Override
public boolean equals(Object obj) {
if (obj instanceof TEdge) {
final TEdge other = (TEdge) obj;
return Objects.equals(this.link, other.link);
}
return false;
}
@Override
public String toString() {
return toStringHelper(this).add("src", src).add("dst", dst).toString();
}
}
// Link weight for measuring link cost as hop count with indirect links
// being as expensive as traversing the entire graph to assume the worst.
private class HopCountLinkWeight implements LinkWeight {
private final int indirectLinkCost;
public HopCountLinkWeight(int indirectLinkCost) {
this.indirectLinkCost = indirectLinkCost;
}
@Override
public double weight(TopoEdge edge) {
// To force preference to use direct paths first, make indirect
// links as expensive as the linear vertex traversal.
return edge.link().type() == INDIRECT ? indirectLinkCost : 1;
}
}
}
......
......@@ -28,7 +28,8 @@ import static com.google.common.base.Preconditions.checkArgument;
import static org.onlab.onos.net.device.DeviceEvent.Type.*;
/**
* Manages inventory of infrastructure DEVICES using trivial in-memory
* structures implementation.
*/
class SimpleDeviceStore {
......
......@@ -25,7 +25,7 @@ import static org.onlab.onos.net.link.LinkEvent.Type.LINK_REMOVED;
import static org.onlab.onos.net.link.LinkEvent.Type.LINK_UPDATED;
/**
* Manages inventory of infrastructure links using trivial in-memory link
* Manages inventory of infrastructure links using trivial in-memory structures
* implementation.
*/
class SimpleLinkStore {
......
......@@ -153,6 +153,9 @@ public class SimpleTopologyManager
public void topologyChanged(TopologyDescription topoDescription,
List<Event> reasons) {
checkNotNull(topoDescription, "Topology description cannot be null");
log.info("Topology changed due to: {}", // to be removed soon
reasons == null ? "initial compute" : reasons);
TopologyEvent event = store.updateTopology(topoDescription, reasons);
if (event != null) {
log.info("Topology changed due to: {}",
......
package org.onlab.onos.net.trivial.impl;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onlab.onos.event.AbstractEventAccumulator;
import org.onlab.onos.event.Event;
import org.onlab.onos.event.EventAccumulator;
import org.onlab.onos.net.device.DeviceEvent;
import org.onlab.onos.net.device.DeviceListener;
import org.onlab.onos.net.device.DeviceService;
import org.onlab.onos.net.link.LinkEvent;
import org.onlab.onos.net.link.LinkListener;
import org.onlab.onos.net.link.LinkService;
import org.onlab.onos.net.provider.AbstractProvider;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.net.topology.TopologyDescription;
import org.onlab.onos.net.topology.TopologyProvider;
import org.onlab.onos.net.topology.TopologyProviderRegistry;
import org.onlab.onos.net.topology.TopologyProviderService;
import org.slf4j.Logger;
import java.util.List;
import java.util.Timer;
import java.util.concurrent.ExecutorService;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static org.onlab.onos.net.device.DeviceEvent.Type.*;
import static org.onlab.util.Tools.namedThreads;
import static org.slf4j.LoggerFactory.getLogger;
/**
* Simple implementation of a network topology provider/computor.
*/
@Component(immediate = true)
public class SimpleTopologyProvider extends AbstractProvider
implements TopologyProvider {
// TODO: make these configurable
private static final int MAX_EVENTS = 100;
private static final int MAX_IDLE_MS = 50;
private static final int MAX_BATCH_MS = 200;
private static final int MAX_THREADS = 8;
// FIXME: Replace with a system-wide timer instance
private static final Timer TIMER = new Timer();
private final Logger log = getLogger(getClass());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected TopologyProviderRegistry providerRegistry;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DeviceService deviceService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected LinkService linkService;
private volatile boolean isStarted = false;
private TopologyProviderService providerService;
private DeviceListener deviceListener = new InnerDeviceListener();
private LinkListener linkListener = new InnerLinkListener();
private EventAccumulator accumulator;
private ExecutorService executor;
/**
* Creates a provider with the supplier identifier.
*/
public SimpleTopologyProvider() {
super(new ProviderId("org.onlab.onos.provider.topology"));
}
@Activate
public synchronized void activate() {
executor = newFixedThreadPool(MAX_THREADS, namedThreads("topo-compute-%d"));
accumulator = new TopologyChangeAccumulator();
providerService = providerRegistry.register(this);
deviceService.addListener(deviceListener);
linkService.addListener(linkListener);
isStarted = true;
triggerTopologyBuild(null);
log.info("Started");
}
@Deactivate
public synchronized void deactivate() {
deviceService.removeListener(deviceListener);
linkService.removeListener(linkListener);
providerRegistry.unregister(this);
providerService = null;
executor.shutdownNow();
executor = null;
isStarted = false;
log.info("Stopped");
}
/**
* Triggers assembly of topology data citing the specified events as the
* reason.
*
* @param reasons events which triggered the topology change
*/
private void triggerTopologyBuild(List<Event> reasons) {
executor.execute(new TopologyBuilderTask(reasons));
}
// Builds the topology using the latest device and link information
// and citing the specified events as reasons for the change.
private void buildTopology(List<Event> reasons) {
log.info("YO! Computing topology");
if (isStarted) {
TopologyDescription desc =
new DefaultTopologyDescription(System.nanoTime(),
deviceService.getDevices(),
linkService.getLinks());
providerService.topologyChanged(desc, reasons);
}
}
// Callback for device events
private class InnerDeviceListener implements DeviceListener {
@Override
public void event(DeviceEvent event) {
DeviceEvent.Type type = event.type();
if (type == DEVICE_ADDED || type == DEVICE_REMOVED ||
type == DEVICE_AVAILABILITY_CHANGED) {
accumulator.add(event);
}
}
}
// Callback for link events
private class InnerLinkListener implements LinkListener {
@Override
public void event(LinkEvent event) {
accumulator.add(event);
}
}
// Event accumulator for paced triggering of topology assembly.
private class TopologyChangeAccumulator
extends AbstractEventAccumulator implements EventAccumulator {
TopologyChangeAccumulator() {
super(TIMER, MAX_EVENTS, MAX_BATCH_MS, MAX_IDLE_MS);
}
@Override
public void processEvents(List<Event> events) {
triggerTopologyBuild(events);
}
}
// Task for building topology data in a separate thread.
private class TopologyBuilderTask implements Runnable {
private final List<Event> reasons;
public TopologyBuilderTask(List<Event> reasons) {
this.reasons = reasons;
}
@Override
public void run() {
buildTopology(reasons);
}
}
}
......@@ -18,9 +18,9 @@ import java.util.Set;
/**
* Manages inventory of topology snapshots using trivial in-memory
* implementation.
* structures implementation.
*/
public class SimpleTopologyStore {
class SimpleTopologyStore {
private volatile DefaultTopology current;
......@@ -35,10 +35,12 @@ public class SimpleTopologyStore {
/**
* Indicates whether the topology is the latest.
*
* @param topology topology descriptor
* @return true if topology is the most recent one
*/
boolean isLatest(Topology topology) {
// Topology is current only if it is the same as our current topology
return topology == current;
}
......@@ -117,7 +119,8 @@ public class SimpleTopologyStore {
* @param reasons list of events that triggered the update
* @return topology update event or null if the description is old
*/
TopologyEvent updateTopology(TopologyDescription topoDescription, List<Event> reasons) {
TopologyEvent updateTopology(TopologyDescription topoDescription,
List<Event> reasons) {
return null;
}
......
......@@ -15,7 +15,8 @@ import static com.google.common.base.Preconditions.checkNotNull;
* @param <V> vertex type
* @param <E> edge type
*/
public class AdjacencyListsGraph<V extends Vertex, E extends Edge<V>> implements Graph<V, E> {
public class AdjacencyListsGraph<V extends Vertex, E extends Edge<V>>
implements Graph<V, E> {
private final Set<V> vertexes;
private final Set<E> edges;
......