Thomas Vachuska
Committed by Gerrit Code Review

GUI -- Added port-statistics traffic visualization to the topo view.

Change-Id: I52b3c1739cc50a026c0796819d61ec1898937ced
......@@ -26,13 +26,14 @@ public class DefaultLoad implements Load {
private final long current;
private final long previous;
private final long time;
private final int interval;
/**
* Indicates the flow statistics poll interval in seconds.
*/
private static int pollInterval = 10;
/**
/**
* Creates an invalid load.
*/
public DefaultLoad() {
......@@ -40,18 +41,32 @@ public class DefaultLoad implements Load {
this.time = System.currentTimeMillis();
this.current = -1;
this.previous = -1;
this.interval = pollInterval;
}
/**
* Creates a load value from the parameters.
* @param current the current value
*
* @param current the current value
* @param previous the previous value
*/
public DefaultLoad(long current, long previous) {
this(current, previous, pollInterval);
}
/**
* Creates a load value from the parameters.
*
* @param current the current value
* @param previous the previous value
* @param interval poll interval for this load
*/
public DefaultLoad(long current, long previous, int interval) {
this.current = current;
this.previous = previous;
this.time = System.currentTimeMillis();
this.isValid = true;
this.interval = interval;
}
/**
......@@ -66,7 +81,7 @@ public class DefaultLoad implements Load {
@Override
public long rate() {
return (current - previous) / pollInterval;
return (current - previous) / interval;
}
@Override
......
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.incubator.net;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.statistic.Load;
/**
* Service for obtaining statistic information about device ports.
*/
public interface PortStatisticsService {
/**
* Obtain the egress load for the given port.
*
* @param connectPoint the port to query
* @return egress traffic load
*/
Load load(ConnectPoint connectPoint);
}
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.incubator.net.impl;
import com.google.common.collect.Maps;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onosproject.incubator.net.PortStatisticsService;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.DeviceId;
import org.onosproject.net.device.DeviceEvent;
import org.onosproject.net.device.DeviceListener;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.device.PortStatistics;
import org.onosproject.net.statistic.DefaultLoad;
import org.onosproject.net.statistic.Load;
import org.slf4j.Logger;
import java.util.Map;
import java.util.stream.Collectors;
import static org.onosproject.net.PortNumber.portNumber;
import static org.onosproject.net.device.DeviceEvent.Type.*;
import static org.slf4j.LoggerFactory.getLogger;
/**
* Implementation of the port statistics service.
*/
@Component(immediate = true)
@Service
public class PortStatisticsManager implements PortStatisticsService {
private final Logger log = getLogger(getClass());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DeviceService deviceService;
private final DeviceListener deviceListener = new InternalDeviceListener();
private Map<ConnectPoint, DataPoint> current = Maps.newConcurrentMap();
private Map<ConnectPoint, DataPoint> previous = Maps.newConcurrentMap();
@Activate
public void activate() {
deviceService.addListener(deviceListener);
log.info("Started");
}
@Deactivate
public void deactivate() {
deviceService.removeListener(deviceListener);
log.info("Stopped");
}
@Override
public Load load(ConnectPoint connectPoint) {
DataPoint c = current.get(connectPoint);
DataPoint p = previous.get(connectPoint);
if (c != null && p != null) {
return new DefaultLoad(c.stats.bytesSent(), p.stats.bytesSent(),
(int) (c.time - p.time) / 1000);
}
return null;
}
// Monitors port stats update messages.
private class InternalDeviceListener implements DeviceListener {
@Override
public void event(DeviceEvent event) {
DeviceEvent.Type type = event.type();
DeviceId deviceId = event.subject().id();
if (type == PORT_STATS_UPDATED) {
// Update port load
updateDeviceData(deviceId);
} else if (type == DEVICE_REMOVED ||
(type == DEVICE_AVAILABILITY_CHANGED &&
!deviceService.isAvailable(deviceId))) {
// Clean-up all port loads
pruneDeviceData(deviceId);
}
}
}
// Updates the port stats for the specified device
private void updateDeviceData(DeviceId deviceId) {
deviceService.getPortStatistics(deviceId)
.forEach(stats -> updatePortData(deviceId, stats));
}
// Updates the port stats for the specified port
private void updatePortData(DeviceId deviceId, PortStatistics stats) {
ConnectPoint cp = new ConnectPoint(deviceId, portNumber(stats.port()));
// If we have a current data point, demote it to previous
DataPoint c = current.get(cp);
if (c != null) {
previous.put(cp, c);
}
// Create a new data point and make it the current one
current.put(cp, new DataPoint(stats));
}
// Cleans all port loads for the specified device
private void pruneDeviceData(DeviceId deviceId) {
pruneMap(current, deviceId);
pruneMap(previous, deviceId);
}
private void pruneMap(Map<ConnectPoint, DataPoint> map, DeviceId deviceId) {
map.keySet().stream().filter(cp -> deviceId.equals(cp.deviceId()))
.collect(Collectors.toSet()).forEach(map::remove);
}
// Auxiliary data point to track when we receive different samples.
private class DataPoint {
long time;
PortStatistics stats;
DataPoint(PortStatistics stats) {
time = System.currentTimeMillis();
this.stats = stats;
}
}
}
......@@ -15,14 +15,15 @@
*/
package org.onosproject.provider.of.device.impl;
import com.google.common.base.Strings;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onlab.packet.ChassisId;
import org.onosproject.net.AnnotationKeys;
import org.onosproject.net.DefaultAnnotations;
import org.onosproject.net.Device;
......@@ -50,7 +51,6 @@ import org.onosproject.openflow.controller.OpenFlowSwitch;
import org.onosproject.openflow.controller.OpenFlowSwitchListener;
import org.onosproject.openflow.controller.PortDescPropertyType;
import org.onosproject.openflow.controller.RoleState;
import org.onlab.packet.ChassisId;
import org.projectfloodlight.openflow.protocol.OFFactory;
import org.projectfloodlight.openflow.protocol.OFMessage;
import org.projectfloodlight.openflow.protocol.OFPortConfig;
......@@ -75,8 +75,6 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import com.google.common.base.Strings;
import static org.onosproject.net.DeviceId.deviceId;
import static org.onosproject.net.Port.Type.COPPER;
import static org.onosproject.net.Port.Type.FIBER;
......@@ -204,36 +202,37 @@ public class OpenFlowDeviceProvider extends AbstractProvider implements DevicePr
private void pushPortMetrics(Dpid dpid, OFPortStatsReply msg) {
DeviceId deviceId = DeviceId.deviceId(dpid.uri(dpid));
Collection<PortStatistics> stats = buildPortStatistics(deviceId, msg);
providerService.updatePortStatistics(deviceId, stats);
}
private Collection<PortStatistics> buildPortStatistics(DeviceId deviceId, OFPortStatsReply msg) {
HashSet<PortStatistics> stats = Sets.newHashSet();
for (OFPortStatsEntry entry: msg.getEntries()) {
if (entry.getPortNo().getPortNumber() < 0) {
continue;
try {
if (entry.getPortNo().getPortNumber() < 0) {
continue;
}
DefaultPortStatistics.Builder builder = DefaultPortStatistics.builder();
DefaultPortStatistics stat = builder.setDeviceId(deviceId)
.setPort(entry.getPortNo().getPortNumber())
.setPacketsReceived(entry.getRxPackets().getValue())
.setPacketsSent(entry.getTxPackets().getValue())
.setBytesReceived(entry.getRxBytes().getValue())
.setBytesSent(entry.getTxBytes().getValue())
.setPacketsRxDropped(entry.getRxDropped().getValue())
.setPacketsTxDropped(entry.getTxDropped().getValue())
.setPacketsRxErrors(entry.getRxErrors().getValue())
.setPacketsTxErrors(entry.getTxErrors().getValue())
.setDurationSec(entry.getVersion() == OFVersion.OF_10 ? 0 : entry.getDurationSec())
.setDurationNano(entry.getVersion() == OFVersion.OF_10 ? 0 : entry.getDurationNsec())
.build();
stats.add(stat);
} catch (Exception e) {
LOG.warn("Unable to process port stats", e);
}
DefaultPortStatistics.Builder builder = DefaultPortStatistics.builder();
DefaultPortStatistics stat = builder.setDeviceId(deviceId)
.setPort(entry.getPortNo().getPortNumber())
.setPacketsReceived(entry.getRxPackets().getValue())
.setPacketsSent(entry.getTxPackets().getValue())
.setBytesReceived(entry.getRxBytes().getValue())
.setBytesSent(entry.getTxBytes().getValue())
.setPacketsRxDropped(entry.getRxDropped().getValue())
.setPacketsTxDropped(entry.getTxDropped().getValue())
.setPacketsRxErrors(entry.getRxErrors().getValue())
.setPacketsTxErrors(entry.getTxErrors().getValue())
.setDurationSec(entry.getDurationSec())
.setDurationNano(entry.getDurationNsec())
.build();
stats.add(stat);
}
return Collections.unmodifiableSet(stats);
......
......@@ -92,7 +92,8 @@ public class TopologyViewMessageHandler extends TopologyViewMessageHandlerBase {
private static final String REQ_NEXT_INTENT = "requestNextRelatedIntent";
private static final String REQ_PREV_INTENT = "requestPrevRelatedIntent";
private static final String REQ_SEL_INTENT_TRAFFIC = "requestSelectedIntentTraffic";
private static final String REQ_ALL_TRAFFIC = "requestAllTraffic";
private static final String REQ_ALL_FLOW_TRAFFIC = "requestAllFlowTraffic";
private static final String REQ_ALL_PORT_TRAFFIC = "requestAllPortTraffic";
private static final String REQ_DEV_LINK_FLOWS = "requestDeviceLinkFlows";
private static final String CANCEL_TRAFFIC = "cancelTraffic";
private static final String REQ_SUMMARY = "requestSummary";
......@@ -187,7 +188,8 @@ public class TopologyViewMessageHandler extends TopologyViewMessageHandlerBase {
new ReqNextIntent(),
new ReqPrevIntent(),
new ReqSelectedIntentTraffic(),
new ReqAllTraffic(),
new ReqAllFlowTraffic(),
new ReqAllPortTraffic(),
new ReqDevLinkFlows(),
new CancelTraffic()
);
......@@ -453,23 +455,33 @@ public class TopologyViewMessageHandler extends TopologyViewMessageHandlerBase {
@Override
public void process(long sid, ObjectNode payload) {
trafficEvent =
new TrafficEvent(TrafficEvent.Type.SEL_INTENT, payload);
trafficEvent = new TrafficEvent(TrafficEvent.Type.SEL_INTENT, payload);
requestSelectedIntentTraffic();
startTrafficMonitoring();
}
}
private final class ReqAllTraffic extends RequestHandler {
private ReqAllTraffic() {
super(REQ_ALL_TRAFFIC);
private final class ReqAllFlowTraffic extends RequestHandler {
private ReqAllFlowTraffic() {
super(REQ_ALL_FLOW_TRAFFIC);
}
@Override
public void process(long sid, ObjectNode payload) {
trafficEvent =
new TrafficEvent(TrafficEvent.Type.ALL_TRAFFIC, payload);
requestAllTraffic();
trafficEvent = new TrafficEvent(TrafficEvent.Type.ALL_FLOW_TRAFFIC, payload);
requestAllFlowTraffic();
}
}
private final class ReqAllPortTraffic extends RequestHandler {
private ReqAllPortTraffic() {
super(REQ_ALL_PORT_TRAFFIC);
}
@Override
public void process(long sid, ObjectNode payload) {
trafficEvent = new TrafficEvent(TrafficEvent.Type.ALL_PORT_TRAFFIC, payload);
requestAllPortTraffic();
}
}
......@@ -480,8 +492,7 @@ public class TopologyViewMessageHandler extends TopologyViewMessageHandlerBase {
@Override
public void process(long sid, ObjectNode payload) {
trafficEvent =
new TrafficEvent(TrafficEvent.Type.DEV_LINK_FLOWS, payload);
trafficEvent = new TrafficEvent(TrafficEvent.Type.DEV_LINK_FLOWS, payload);
requestDeviceLinkFlows(payload);
}
}
......@@ -615,10 +626,16 @@ public class TopologyViewMessageHandler extends TopologyViewMessageHandlerBase {
}
}
// Subscribes for host traffic messages.
private synchronized void requestAllTraffic() {
// Subscribes for flow traffic messages.
private synchronized void requestAllFlowTraffic() {
startTrafficMonitoring();
sendMessage(trafficSummaryMessage());
sendMessage(trafficSummaryMessage(StatsType.FLOW));
}
// Subscribes for port traffic messages.
private synchronized void requestAllPortTraffic() {
startTrafficMonitoring();
sendMessage(trafficSummaryMessage(StatsType.PORT));
}
private void requestDeviceLinkFlows(ObjectNode payload) {
......@@ -822,7 +839,7 @@ public class TopologyViewMessageHandler extends TopologyViewMessageHandlerBase {
// encapsulate
private static class TrafficEvent {
enum Type {
ALL_TRAFFIC, DEV_LINK_FLOWS, SEL_INTENT
ALL_FLOW_TRAFFIC, ALL_PORT_TRAFFIC, DEV_LINK_FLOWS, SEL_INTENT
}
private final Type type;
......@@ -841,8 +858,11 @@ public class TopologyViewMessageHandler extends TopologyViewMessageHandlerBase {
try {
if (trafficEvent != null) {
switch (trafficEvent.type) {
case ALL_TRAFFIC:
requestAllTraffic();
case ALL_FLOW_TRAFFIC:
requestAllFlowTraffic();
break;
case ALL_PORT_TRAFFIC:
requestAllPortTraffic();
break;
case DEV_LINK_FLOWS:
requestDeviceLinkFlows(trafficEvent.payload);
......
......@@ -27,6 +27,7 @@ import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.core.CoreService;
import org.onosproject.incubator.net.PortStatisticsService;
import org.onosproject.mastership.MastershipService;
import org.onosproject.net.Annotated;
import org.onosproject.net.AnnotationKeys;
......@@ -100,6 +101,7 @@ import static org.onosproject.net.host.HostEvent.Type.HOST_ADDED;
import static org.onosproject.net.host.HostEvent.Type.HOST_REMOVED;
import static org.onosproject.net.link.LinkEvent.Type.LINK_ADDED;
import static org.onosproject.net.link.LinkEvent.Type.LINK_REMOVED;
import static org.onosproject.ui.impl.TopologyViewMessageHandlerBase.StatsType.*;
/**
* Facility for creating messages bound for the topology viewer.
......@@ -123,6 +125,8 @@ public abstract class TopologyViewMessageHandlerBase extends UiMessageHandler {
private static final String KB_UNIT = "KB";
private static final String B_UNIT = "B";
private static final long BPS_THRESHOLD = 1024;
protected ServiceDirectory directory;
protected ClusterService clusterService;
protected DeviceService deviceService;
......@@ -131,9 +135,14 @@ public abstract class TopologyViewMessageHandlerBase extends UiMessageHandler {
protected MastershipService mastershipService;
protected IntentService intentService;
protected FlowRuleService flowService;
protected StatisticService statService;
protected StatisticService flowStatsService;
protected PortStatisticsService portStatsService;
protected TopologyService topologyService;
protected enum StatsType {
FLOW, PORT
}
private String version;
// TODO: extract into an external & durable state; good enough for now and demo
......@@ -159,7 +168,8 @@ public abstract class TopologyViewMessageHandlerBase extends UiMessageHandler {
mastershipService = directory.get(MastershipService.class);
intentService = directory.get(IntentService.class);
flowService = directory.get(FlowRuleService.class);
statService = directory.get(StatisticService.class);
flowStatsService = directory.get(StatisticService.class);
portStatsService = directory.get(PortStatisticsService.class);
topologyService = directory.get(TopologyService.class);
String ver = directory.get(CoreService.class).version().toString();
......@@ -532,8 +542,8 @@ public abstract class TopologyViewMessageHandlerBase extends UiMessageHandler {
}
// Produces JSON message to trigger traffic overview visualization
protected ObjectNode trafficSummaryMessage() {
// Produces JSON message to trigger flow traffic overview visualization
protected ObjectNode trafficSummaryMessage(StatsType type) {
ObjectNode payload = objectNode();
ArrayNode paths = arrayNode();
payload.set("paths", paths);
......@@ -560,11 +570,18 @@ public abstract class TopologyViewMessageHandlerBase extends UiMessageHandler {
boolean bi = link.two != null;
if (isInfrastructureEgress(link.one) ||
(bi && isInfrastructureEgress(link.two))) {
link.addLoad(statService.load(link.one));
link.addLoad(bi ? statService.load(link.two) : null);
if (type == FLOW) {
link.addLoad(flowStatsService.load(link.one));
link.addLoad(bi ? flowStatsService.load(link.two) : null);
} else if (type == PORT) {
link.addLoad(portStatsService.load(link.one.src()), BPS_THRESHOLD);
link.addLoad(bi ? portStatsService.load(link.two.src()) : null, BPS_THRESHOLD);
}
if (link.hasTraffic) {
linksNodeT.add(compactLinkString(link.one));
labelsT.add(formatBytes(link.bytes));
labelsT.add(type == PORT ?
formatBytes(link.rate) + "ps" :
formatBytes(link.bytes));
} else {
linksNodeN.add(compactLinkString(link.one));
labelsN.add("");
......@@ -692,7 +709,7 @@ public abstract class TopologyViewMessageHandlerBase extends UiMessageHandler {
BiLink biLink = addLink(biLinks, link);
if (isInfrastructureEgress(link)) {
if (showTraffic) {
biLink.addLoad(statService.load(link));
biLink.addLoad(flowStatsService.load(link));
}
biLink.addClass(type);
}
......@@ -727,7 +744,7 @@ public abstract class TopologyViewMessageHandlerBase extends UiMessageHandler {
for (Link link : links) {
if (isInfrastructureEgress(link)) {
linksNode.add(compactLinkString(link));
Load load = statService.load(link);
Load load = flowStatsService.load(link);
String label = "";
if (load.rate() > 0) {
hasTraffic = true;
......@@ -814,6 +831,7 @@ public abstract class TopologyViewMessageHandlerBase extends UiMessageHandler {
public long bytes = 0;
private Set<String> classes = new HashSet<>();
private long rate;
BiLink(LinkKey key, Link link) {
this.key = key;
......@@ -825,9 +843,14 @@ public abstract class TopologyViewMessageHandlerBase extends UiMessageHandler {
}
void addLoad(Load load) {
addLoad(load, 0);
}
void addLoad(Load load, long threshold) {
if (load != null) {
this.hasTraffic = hasTraffic || load.rate() > 0;
this.hasTraffic = hasTraffic || load.rate() > threshold;
this.bytes += load.latest();
this.rate = load.rate();
}
}
......
......@@ -67,7 +67,8 @@
rightArrow: [tts.showNextIntentAction, 'Show next related intent'],
leftArrow: [tts.showPrevIntentAction, 'Show previous related intent'],
W: [tts.showSelectedIntentTrafficAction, 'Monitor traffic of selected intent'],
A: [tts.showAllTrafficAction, 'Monitor all traffic'],
A: [tts.showAllFlowTrafficAction, 'Monitor all traffic using flow stats'],
Q: [tts.showAllPortTrafficAction, 'Monitor all traffic using port stats'],
F: [tts.showDeviceLinkFlowsAction, 'Show device link flows'],
E: [equalizeMasters, 'Equalize mastership roles'],
......
......@@ -150,10 +150,17 @@
}
// keystroke-A (see topo.js)
function showAllTrafficAction() {
function showAllFlowTrafficAction() {
hoverMode = hoverModeAll;
wss.sendEvent('requestAllTraffic');
flash.flash('All Traffic');
wss.sendEvent('requestAllFlowTraffic');
flash.flash('All Flow Traffic');
}
// keystroke-A (see topo.js)
function showAllPortTrafficAction() {
hoverMode = hoverModeAll;
wss.sendEvent('requestAllPortTraffic');
flash.flash('All Port Traffic');
}
// === -----------------------------
......@@ -228,7 +235,8 @@
showNextIntentAction: showNextIntentAction,
showPrevIntentAction: showPrevIntentAction,
showSelectedIntentTrafficAction: showSelectedIntentTrafficAction,
showAllTrafficAction: showAllTrafficAction
showAllFlowTrafficAction: showAllFlowTrafficAction,
showAllPortTrafficAction: showAllPortTrafficAction
};
}]);
}());
......
......@@ -48,6 +48,11 @@
<dependency>
<groupId>org.onosproject</groupId>
<artifactId>onos-incubator-api</artifactId>
</dependency>
<dependency>
<groupId>org.onosproject</groupId>
<artifactId>onlab-osgi</artifactId>
<version>${project.version}</version>
</dependency>
......