Thomas Vachuska

Added detection of traffic flowing using StatisticService.

Change-Id: I2044ec16fd722d953d0e2b2c955e4da2b1dab663
...@@ -25,7 +25,7 @@ import java.util.concurrent.Future; ...@@ -25,7 +25,7 @@ import java.util.concurrent.Future;
25 */ 25 */
26 public interface FlowRuleProvider extends Provider { 26 public interface FlowRuleProvider extends Provider {
27 27
28 - static final int POLL_INTERVAL = 5; 28 + static final int POLL_INTERVAL = 1;
29 29
30 /** 30 /**
31 * Instructs the provider to apply the specified flow rules to their 31 * Instructs the provider to apply the specified flow rules to their
......
1 +/*
2 + * Copyright 2014 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +package org.onlab.onos.store.trivial.impl;
17 +
18 +import com.google.common.collect.Sets;
19 +import org.apache.felix.scr.annotations.Activate;
20 +import org.apache.felix.scr.annotations.Component;
21 +import org.apache.felix.scr.annotations.Deactivate;
22 +import org.apache.felix.scr.annotations.Service;
23 +import org.onlab.onos.net.ConnectPoint;
24 +import org.onlab.onos.net.PortNumber;
25 +import org.onlab.onos.net.flow.FlowEntry;
26 +import org.onlab.onos.net.flow.FlowRule;
27 +import org.onlab.onos.net.flow.instructions.Instruction;
28 +import org.onlab.onos.net.flow.instructions.Instructions;
29 +import org.onlab.onos.net.statistic.StatisticStore;
30 +import org.slf4j.Logger;
31 +
32 +import java.util.HashSet;
33 +import java.util.Map;
34 +import java.util.Set;
35 +import java.util.concurrent.ConcurrentHashMap;
36 +import java.util.concurrent.atomic.AtomicInteger;
37 +
38 +import static org.slf4j.LoggerFactory.getLogger;
39 +
40 +
41 +/**
42 + * Maintains statistics using RPC calls to collect stats from remote instances
43 + * on demand.
44 + */
45 +@Component(immediate = true)
46 +@Service
47 +public class SimpleStatisticStore implements StatisticStore {
48 +
49 + private final Logger log = getLogger(getClass());
50 +
51 + private Map<ConnectPoint, InternalStatisticRepresentation>
52 + representations = new ConcurrentHashMap<>();
53 +
54 + private Map<ConnectPoint, Set<FlowEntry>> previous = new ConcurrentHashMap<>();
55 + private Map<ConnectPoint, Set<FlowEntry>> current = new ConcurrentHashMap<>();
56 +
57 + @Activate
58 + public void activate() {
59 + log.info("Started");
60 + }
61 +
62 + @Deactivate
63 + public void deactivate() {
64 + log.info("Stopped");
65 + }
66 +
67 + @Override
68 + public void prepareForStatistics(FlowRule rule) {
69 + ConnectPoint cp = buildConnectPoint(rule);
70 + if (cp == null) {
71 + return;
72 + }
73 + InternalStatisticRepresentation rep;
74 + synchronized (representations) {
75 + rep = getOrCreateRepresentation(cp);
76 + }
77 + rep.prepare();
78 + }
79 +
80 + @Override
81 + public synchronized void removeFromStatistics(FlowRule rule) {
82 + ConnectPoint cp = buildConnectPoint(rule);
83 + if (cp == null) {
84 + return;
85 + }
86 + InternalStatisticRepresentation rep = representations.get(cp);
87 + if (rep != null) {
88 + rep.remove(rule);
89 + }
90 + Set<FlowEntry> values = current.get(cp);
91 + if (values != null) {
92 + values.remove(rule);
93 + }
94 + values = previous.get(cp);
95 + if (values != null) {
96 + values.remove(rule);
97 + }
98 +
99 + }
100 +
101 + @Override
102 + public void addOrUpdateStatistic(FlowEntry rule) {
103 + ConnectPoint cp = buildConnectPoint(rule);
104 + if (cp == null) {
105 + return;
106 + }
107 + InternalStatisticRepresentation rep = representations.get(cp);
108 + if (rep != null && rep.submit(rule)) {
109 + updatePublishedStats(cp, rep.get());
110 + }
111 + }
112 +
113 + private synchronized void updatePublishedStats(ConnectPoint cp,
114 + Set<FlowEntry> flowEntries) {
115 + Set<FlowEntry> curr = current.get(cp);
116 + if (curr == null) {
117 + curr = new HashSet<>();
118 + }
119 + previous.put(cp, curr);
120 + current.put(cp, flowEntries);
121 +
122 + }
123 +
124 + @Override
125 + public Set<FlowEntry> getCurrentStatistic(ConnectPoint connectPoint) {
126 + return getCurrentStatisticInternal(connectPoint);
127 + }
128 +
129 + private synchronized Set<FlowEntry> getCurrentStatisticInternal(ConnectPoint connectPoint) {
130 + return current.get(connectPoint);
131 + }
132 +
133 + @Override
134 + public Set<FlowEntry> getPreviousStatistic(ConnectPoint connectPoint) {
135 + return getPreviousStatisticInternal(connectPoint);
136 + }
137 +
138 + private synchronized Set<FlowEntry> getPreviousStatisticInternal(ConnectPoint connectPoint) {
139 + return previous.get(connectPoint);
140 + }
141 +
142 + private InternalStatisticRepresentation getOrCreateRepresentation(ConnectPoint cp) {
143 +
144 + if (representations.containsKey(cp)) {
145 + return representations.get(cp);
146 + } else {
147 + InternalStatisticRepresentation rep = new InternalStatisticRepresentation();
148 + representations.put(cp, rep);
149 + return rep;
150 + }
151 +
152 + }
153 +
154 + private ConnectPoint buildConnectPoint(FlowRule rule) {
155 + PortNumber port = getOutput(rule);
156 + if (port == null) {
157 + log.warn("Rule {} has no output.", rule);
158 + return null;
159 + }
160 + ConnectPoint cp = new ConnectPoint(rule.deviceId(), port);
161 + return cp;
162 + }
163 +
164 + private PortNumber getOutput(FlowRule rule) {
165 + for (Instruction i : rule.treatment().instructions()) {
166 + if (i.type() == Instruction.Type.OUTPUT) {
167 + Instructions.OutputInstruction out = (Instructions.OutputInstruction) i;
168 + return out.port();
169 + }
170 + if (i.type() == Instruction.Type.DROP) {
171 + return PortNumber.P0;
172 + }
173 + }
174 + return null;
175 + }
176 +
177 + private class InternalStatisticRepresentation {
178 +
179 + private final AtomicInteger counter = new AtomicInteger(0);
180 + private final Set<FlowEntry> rules = new HashSet<>();
181 +
182 + public void prepare() {
183 + counter.incrementAndGet();
184 + }
185 +
186 + public synchronized void remove(FlowRule rule) {
187 + rules.remove(rule);
188 + counter.decrementAndGet();
189 + }
190 +
191 + public synchronized boolean submit(FlowEntry rule) {
192 + if (rules.contains(rule)) {
193 + rules.remove(rule);
194 + }
195 + rules.add(rule);
196 + if (counter.get() == 0) {
197 + return true;
198 + } else {
199 + return counter.decrementAndGet() == 0;
200 + }
201 + }
202 +
203 + public synchronized Set<FlowEntry> get() {
204 + counter.set(rules.size());
205 + return Sets.newHashSet(rules);
206 + }
207 +
208 + }
209 +
210 +}
...@@ -37,7 +37,7 @@ public class GuiWebSocketServlet extends WebSocketServlet { ...@@ -37,7 +37,7 @@ public class GuiWebSocketServlet extends WebSocketServlet {
37 37
38 private ServiceDirectory directory = new DefaultServiceDirectory(); 38 private ServiceDirectory directory = new DefaultServiceDirectory();
39 39
40 - private final Set<TopologyWebSocket> sockets = new HashSet<>(); 40 + private final Set<TopologyViewWebSocket> sockets = new HashSet<>();
41 private final Timer timer = new Timer(); 41 private final Timer timer = new Timer();
42 private final TimerTask pruner = new Pruner(); 42 private final TimerTask pruner = new Pruner();
43 43
...@@ -49,7 +49,7 @@ public class GuiWebSocketServlet extends WebSocketServlet { ...@@ -49,7 +49,7 @@ public class GuiWebSocketServlet extends WebSocketServlet {
49 49
50 @Override 50 @Override
51 public WebSocket doWebSocketConnect(HttpServletRequest request, String protocol) { 51 public WebSocket doWebSocketConnect(HttpServletRequest request, String protocol) {
52 - TopologyWebSocket socket = new TopologyWebSocket(directory); 52 + TopologyViewWebSocket socket = new TopologyViewWebSocket(directory);
53 synchronized (sockets) { 53 synchronized (sockets) {
54 sockets.add(socket); 54 sockets.add(socket);
55 } 55 }
...@@ -61,9 +61,9 @@ public class GuiWebSocketServlet extends WebSocketServlet { ...@@ -61,9 +61,9 @@ public class GuiWebSocketServlet extends WebSocketServlet {
61 @Override 61 @Override
62 public void run() { 62 public void run() {
63 synchronized (sockets) { 63 synchronized (sockets) {
64 - Iterator<TopologyWebSocket> it = sockets.iterator(); 64 + Iterator<TopologyViewWebSocket> it = sockets.iterator();
65 while (it.hasNext()) { 65 while (it.hasNext()) {
66 - TopologyWebSocket socket = it.next(); 66 + TopologyViewWebSocket socket = it.next();
67 if (socket.isIdle()) { 67 if (socket.isIdle()) {
68 it.remove(); 68 it.remove();
69 socket.close(); 69 socket.close();
......
...@@ -49,6 +49,8 @@ import org.onlab.onos.net.intent.PathIntent; ...@@ -49,6 +49,8 @@ import org.onlab.onos.net.intent.PathIntent;
49 import org.onlab.onos.net.link.LinkEvent; 49 import org.onlab.onos.net.link.LinkEvent;
50 import org.onlab.onos.net.link.LinkService; 50 import org.onlab.onos.net.link.LinkService;
51 import org.onlab.onos.net.provider.ProviderId; 51 import org.onlab.onos.net.provider.ProviderId;
52 +import org.onlab.onos.net.statistic.Load;
53 +import org.onlab.onos.net.statistic.StatisticService;
52 import org.onlab.osgi.ServiceDirectory; 54 import org.onlab.osgi.ServiceDirectory;
53 import org.onlab.packet.IpAddress; 55 import org.onlab.packet.IpAddress;
54 import org.slf4j.Logger; 56 import org.slf4j.Logger;
...@@ -75,9 +77,9 @@ import static org.onlab.onos.net.link.LinkEvent.Type.LINK_REMOVED; ...@@ -75,9 +77,9 @@ import static org.onlab.onos.net.link.LinkEvent.Type.LINK_REMOVED;
75 /** 77 /**
76 * Facility for creating messages bound for the topology viewer. 78 * Facility for creating messages bound for the topology viewer.
77 */ 79 */
78 -public abstract class TopologyMessages { 80 +public abstract class TopologyViewMessages {
79 81
80 - protected static final Logger log = LoggerFactory.getLogger(TopologyMessages.class); 82 + protected static final Logger log = LoggerFactory.getLogger(TopologyViewMessages.class);
81 83
82 private static final ProviderId PID = new ProviderId("core", "org.onlab.onos.core", true); 84 private static final ProviderId PID = new ProviderId("core", "org.onlab.onos.core", true);
83 private static final String COMPACT = "%s/%s-%s/%s"; 85 private static final String COMPACT = "%s/%s-%s/%s";
...@@ -89,7 +91,7 @@ public abstract class TopologyMessages { ...@@ -89,7 +91,7 @@ public abstract class TopologyMessages {
89 protected final HostService hostService; 91 protected final HostService hostService;
90 protected final MastershipService mastershipService; 92 protected final MastershipService mastershipService;
91 protected final IntentService intentService; 93 protected final IntentService intentService;
92 -// protected final StatisticService statService; 94 + protected final StatisticService statService;
93 95
94 protected final ObjectMapper mapper = new ObjectMapper(); 96 protected final ObjectMapper mapper = new ObjectMapper();
95 97
...@@ -101,7 +103,7 @@ public abstract class TopologyMessages { ...@@ -101,7 +103,7 @@ public abstract class TopologyMessages {
101 * 103 *
102 * @param directory service directory 104 * @param directory service directory
103 */ 105 */
104 - protected TopologyMessages(ServiceDirectory directory) { 106 + protected TopologyViewMessages(ServiceDirectory directory) {
105 this.directory = checkNotNull(directory, "Directory cannot be null"); 107 this.directory = checkNotNull(directory, "Directory cannot be null");
106 clusterService = directory.get(ClusterService.class); 108 clusterService = directory.get(ClusterService.class);
107 deviceService = directory.get(DeviceService.class); 109 deviceService = directory.get(DeviceService.class);
...@@ -109,7 +111,7 @@ public abstract class TopologyMessages { ...@@ -109,7 +111,7 @@ public abstract class TopologyMessages {
109 hostService = directory.get(HostService.class); 111 hostService = directory.get(HostService.class);
110 mastershipService = directory.get(MastershipService.class); 112 mastershipService = directory.get(MastershipService.class);
111 intentService = directory.get(IntentService.class); 113 intentService = directory.get(IntentService.class);
112 -// statService = directory.get(StatisticService.class); 114 + statService = directory.get(StatisticService.class);
113 } 115 }
114 116
115 // Retrieves the payload from the specified event. 117 // Retrieves the payload from the specified event.
...@@ -408,14 +410,15 @@ public abstract class TopologyMessages { ...@@ -408,14 +410,15 @@ public abstract class TopologyMessages {
408 410
409 if (links != null) { 411 if (links != null) {
410 ArrayNode labels = mapper.createArrayNode(); 412 ArrayNode labels = mapper.createArrayNode();
411 - boolean hasTraffic = true; // FIXME 413 + boolean hasTraffic = false;
412 for (Link link : links) { 414 for (Link link : links) {
413 linksNode.add(compactLinkString(link)); 415 linksNode.add(compactLinkString(link));
414 -// Load load = statService.load(link); 416 + Load load = statService.load(link);
415 String label = ""; 417 String label = "";
416 -// if (load.rate() > 0) { 418 + if (load.rate() > 0) {
417 -// label = load.toString(); 419 + hasTraffic = true;
418 -// } 420 + label = load.toString();
421 + }
419 labels.add(label); 422 labels.add(label);
420 } 423 }
421 pathNode.put("class", hasTraffic ? type + " animated" : type); 424 pathNode.put("class", hasTraffic ? type + " animated" : type);
......
...@@ -66,8 +66,8 @@ import static org.onlab.onos.net.link.LinkEvent.Type.LINK_ADDED; ...@@ -66,8 +66,8 @@ import static org.onlab.onos.net.link.LinkEvent.Type.LINK_ADDED;
66 /** 66 /**
67 * Web socket capable of interacting with the GUI topology view. 67 * Web socket capable of interacting with the GUI topology view.
68 */ 68 */
69 -public class TopologyWebSocket 69 +public class TopologyViewWebSocket
70 - extends TopologyMessages 70 + extends TopologyViewMessages
71 implements WebSocket.OnTextMessage, WebSocket.OnControl { 71 implements WebSocket.OnTextMessage, WebSocket.OnControl {
72 72
73 private static final long MAX_AGE_MS = 15000; 73 private static final long MAX_AGE_MS = 15000;
...@@ -78,7 +78,7 @@ public class TopologyWebSocket ...@@ -78,7 +78,7 @@ public class TopologyWebSocket
78 78
79 private static final String APP_ID = "org.onlab.onos.gui"; 79 private static final String APP_ID = "org.onlab.onos.gui";
80 80
81 - private static final long TRAFFIC_FREQUENCY_SEC = 5000; 81 + private static final long TRAFFIC_FREQUENCY_SEC = 1000;
82 82
83 private final ApplicationId appId; 83 private final ApplicationId appId;
84 84
...@@ -104,7 +104,7 @@ public class TopologyWebSocket ...@@ -104,7 +104,7 @@ public class TopologyWebSocket
104 * 104 *
105 * @param directory service directory 105 * @param directory service directory
106 */ 106 */
107 - public TopologyWebSocket(ServiceDirectory directory) { 107 + public TopologyViewWebSocket(ServiceDirectory directory) {
108 super(directory); 108 super(directory);
109 appId = directory.get(CoreService.class).registerApplication(APP_ID); 109 appId = directory.get(CoreService.class).registerApplication(APP_ID);
110 } 110 }
......