Thomas Vachuska
Committed by Gerrit Code Review

ONOS-2846, ONOS-2812 Reworking link discovery provider to be fully configurable …

…and to prune links based on links rather than based on ports.

Change-Id: I0a042403baf163bebd471bffd112b28571dae75b
1 +/*
2 + * Copyright 2015 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +package org.onosproject.provider.lldp.impl;
17 +
18 +import org.onosproject.mastership.MastershipService;
19 +import org.onosproject.net.link.LinkProviderService;
20 +import org.onosproject.net.packet.PacketService;
21 +
22 +/**
23 + * Shared context for use by link discovery.
24 + */
25 +public interface DiscoveryContext {
26 +
27 + /**
28 + * Returns the shared mastership service reference.
29 + *
30 + * @return mastership service
31 + */
32 + MastershipService mastershipService();
33 +
34 + /**
35 + * Returns the shared link provider service reference.
36 + *
37 + * @return link provider service
38 + */
39 + LinkProviderService providerService();
40 +
41 + /**
42 + * Returns the shared packet service reference.
43 + *
44 + * @return packet service
45 + */
46 + PacketService packetService();
47 +
48 + /**
49 + * Returns the probe rate in millis.
50 + *
51 + * @return probe rate
52 + */
53 + long probeRate();
54 +
55 + /**
56 + * Returns the max stale link age in millis.
57 + *
58 + * @return stale link age
59 + */
60 + long staleLinkAge();
61 +
62 + /**
63 + * Indicates whether to emit BDDP.
64 + *
65 + * @return true to emit BDDP
66 + */
67 + boolean useBDDP();
68 +}
...@@ -15,7 +15,6 @@ ...@@ -15,7 +15,6 @@
15 */ 15 */
16 package org.onosproject.provider.lldp.impl; 16 package org.onosproject.provider.lldp.impl;
17 17
18 -import com.google.common.base.Strings;
19 import com.google.common.collect.ImmutableMap; 18 import com.google.common.collect.ImmutableMap;
20 import com.google.common.collect.ImmutableSet; 19 import com.google.common.collect.ImmutableSet;
21 import org.apache.felix.scr.annotations.Activate; 20 import org.apache.felix.scr.annotations.Activate;
...@@ -57,9 +56,11 @@ import java.io.IOException; ...@@ -57,9 +56,11 @@ import java.io.IOException;
57 import java.util.Dictionary; 56 import java.util.Dictionary;
58 import java.util.EnumSet; 57 import java.util.EnumSet;
59 import java.util.Map; 58 import java.util.Map;
59 +import java.util.Properties;
60 import java.util.concurrent.ConcurrentHashMap; 60 import java.util.concurrent.ConcurrentHashMap;
61 import java.util.concurrent.ScheduledExecutorService; 61 import java.util.concurrent.ScheduledExecutorService;
62 62
63 +import static com.google.common.base.Strings.isNullOrEmpty;
63 import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor; 64 import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
64 import static java.util.concurrent.TimeUnit.SECONDS; 65 import static java.util.concurrent.TimeUnit.SECONDS;
65 import static org.onlab.util.Tools.get; 66 import static org.onlab.util.Tools.get;
...@@ -67,19 +68,16 @@ import static org.onlab.util.Tools.groupedThreads; ...@@ -67,19 +68,16 @@ import static org.onlab.util.Tools.groupedThreads;
67 import static org.slf4j.LoggerFactory.getLogger; 68 import static org.slf4j.LoggerFactory.getLogger;
68 69
69 /** 70 /**
70 - * Provider which uses an OpenFlow controller to detect network 71 + * Provider which uses LLDP and BDDP packets to detect network infrastructure links.
71 - * infrastructure links.
72 */ 72 */
73 @Component(immediate = true) 73 @Component(immediate = true)
74 public class LLDPLinkProvider extends AbstractProvider implements LinkProvider { 74 public class LLDPLinkProvider extends AbstractProvider implements LinkProvider {
75 75
76 private static final String PROVIDER_NAME = "org.onosproject.provider.lldp"; 76 private static final String PROVIDER_NAME = "org.onosproject.provider.lldp";
77 77
78 - private static final String PROP_USE_BDDP = "useBDDP"; 78 + private static final String FORMAT =
79 - private static final String PROP_DISABLE_LD = "disableLinkDiscovery"; 79 + "Settings: enabled={}, useBDDP={}, probeRate={}, " +
80 - private static final String PROP_LLDP_SUPPRESSION = "lldpSuppression"; 80 + "staleLinkAge={}, lldpSuppression={}";
81 -
82 - private static final String DEFAULT_LLDP_SUPPRESSION_CONFIG = "../config/lldp_suppression.json";
83 81
84 private final Logger log = getLogger(getClass()); 82 private final Logger log = getLogger(getClass());
85 83
...@@ -105,24 +103,41 @@ public class LLDPLinkProvider extends AbstractProvider implements LinkProvider { ...@@ -105,24 +103,41 @@ public class LLDPLinkProvider extends AbstractProvider implements LinkProvider {
105 103
106 private ScheduledExecutorService executor; 104 private ScheduledExecutorService executor;
107 105
106 + private static final long INIT_DELAY = 5;
107 + private static final long DELAY = 5;
108 +
109 + private static final String PROP_ENABLED = "enabled";
110 + @Property(name = PROP_ENABLED, boolValue = true,
111 + label = "If false, link discovery is disabled")
112 + private boolean enabled = false;
113 +
114 + private static final String PROP_USE_BDDP = "useBDDP";
108 @Property(name = PROP_USE_BDDP, boolValue = true, 115 @Property(name = PROP_USE_BDDP, boolValue = true,
109 label = "Use BDDP for link discovery") 116 label = "Use BDDP for link discovery")
110 private boolean useBDDP = true; 117 private boolean useBDDP = true;
111 118
112 - @Property(name = PROP_DISABLE_LD, boolValue = false, 119 + private static final String PROP_PROBE_RATE = "probeRate";
113 - label = "Permanently disable link discovery") 120 + private static final int DEFAULT_PROBE_RATE = 3_000;
114 - private boolean disableLinkDiscovery = false; 121 + @Property(name = PROP_PROBE_RATE, intValue = DEFAULT_PROBE_RATE,
122 + label = "LLDP and BDDP probe rate specified in millis")
123 + private int probeRate = DEFAULT_PROBE_RATE;
115 124
116 - private static final long INIT_DELAY = 5; 125 + private static final String PROP_STALE_LINK_AGE = "staleLinkAge";
117 - private static final long DELAY = 5; 126 + private static final int DEFAULT_STALE_LINK_AGE = 10_000;
127 + @Property(name = PROP_STALE_LINK_AGE, intValue = DEFAULT_STALE_LINK_AGE,
128 + label = "Number of millis beyond which links will be considered stale")
129 + private int staleLinkAge = DEFAULT_STALE_LINK_AGE;
118 130
131 + // FIXME: convert to use network config subsystem instead
132 + private static final String PROP_LLDP_SUPPRESSION = "lldpSuppression";
133 + private static final String DEFAULT_LLDP_SUPPRESSION_CONFIG = "../config/lldp_suppression.json";
119 @Property(name = PROP_LLDP_SUPPRESSION, value = DEFAULT_LLDP_SUPPRESSION_CONFIG, 134 @Property(name = PROP_LLDP_SUPPRESSION, value = DEFAULT_LLDP_SUPPRESSION_CONFIG,
120 label = "Path to LLDP suppression configuration file") 135 label = "Path to LLDP suppression configuration file")
121 private String lldpSuppression = DEFAULT_LLDP_SUPPRESSION_CONFIG; 136 private String lldpSuppression = DEFAULT_LLDP_SUPPRESSION_CONFIG;
122 137
123 138
139 + private final DiscoveryContext context = new InternalDiscoveryContext();
124 private final InternalLinkProvider listener = new InternalLinkProvider(); 140 private final InternalLinkProvider listener = new InternalLinkProvider();
125 -
126 private final InternalRoleListener roleListener = new InternalRoleListener(); 141 private final InternalRoleListener roleListener = new InternalRoleListener();
127 142
128 protected final Map<DeviceId, LinkDiscovery> discoverers = new ConcurrentHashMap<>(); 143 protected final Map<DeviceId, LinkDiscovery> discoverers = new ConcurrentHashMap<>();
...@@ -141,57 +156,82 @@ public class LLDPLinkProvider extends AbstractProvider implements LinkProvider { ...@@ -141,57 +156,82 @@ public class LLDPLinkProvider extends AbstractProvider implements LinkProvider {
141 public void activate(ComponentContext context) { 156 public void activate(ComponentContext context) {
142 cfgService.registerProperties(getClass()); 157 cfgService.registerProperties(getClass());
143 appId = coreService.registerApplication(PROVIDER_NAME); 158 appId = coreService.registerApplication(PROVIDER_NAME);
144 -
145 - // to load configuration at startup
146 modified(context); 159 modified(context);
147 - if (disableLinkDiscovery) { 160 + log.info("Started");
148 - log.info("LinkDiscovery has been permanently disabled by configuration"); 161 + }
149 - return; 162 +
163 + @Deactivate
164 + public void deactivate() {
165 + cfgService.unregisterProperties(getClass(), false);
166 + disable();
167 + log.info("Stopped");
168 + }
169 +
170 + @Modified
171 + public void modified(ComponentContext context) {
172 + Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
173 +
174 + boolean newEnabled, newUseBddp;
175 + int newProbeRate, newStaleLinkAge;
176 + String newLldpSuppression;
177 + try {
178 + String s = get(properties, PROP_ENABLED);
179 + newEnabled = isNullOrEmpty(s) || Boolean.parseBoolean(s.trim());
180 +
181 + s = get(properties, PROP_USE_BDDP);
182 + newUseBddp = isNullOrEmpty(s) || Boolean.parseBoolean(s.trim());
183 +
184 + s = get(properties, PROP_PROBE_RATE);
185 + newProbeRate = isNullOrEmpty(s) ? probeRate : Integer.parseInt(s.trim());
186 +
187 + s = get(properties, PROP_STALE_LINK_AGE);
188 + newStaleLinkAge = isNullOrEmpty(s) ? staleLinkAge : Integer.parseInt(s.trim());
189 +
190 + s = get(properties, PROP_LLDP_SUPPRESSION);
191 + newLldpSuppression = isNullOrEmpty(s) ? DEFAULT_LLDP_SUPPRESSION_CONFIG : s;
192 +
193 + } catch (NumberFormatException e) {
194 + log.warn(e.getMessage());
195 + newEnabled = enabled;
196 + newUseBddp = useBDDP;
197 + newProbeRate = probeRate;
198 + newStaleLinkAge = staleLinkAge;
199 + newLldpSuppression = lldpSuppression;
200 + }
201 +
202 + boolean wasEnabled = enabled;
203 +
204 + enabled = newEnabled;
205 + useBDDP = newUseBddp;
206 + probeRate = newProbeRate;
207 + staleLinkAge = newStaleLinkAge;
208 + lldpSuppression = newLldpSuppression;
209 +
210 + if (!wasEnabled && enabled) {
211 + enable();
212 + } else if (wasEnabled && !enabled) {
213 + disable();
150 } 214 }
151 215
216 + log.info(FORMAT, enabled, useBDDP, probeRate, staleLinkAge, lldpSuppression);
217 + }
218 +
219 + private void enable() {
152 providerService = providerRegistry.register(this); 220 providerService = providerRegistry.register(this);
153 deviceService.addListener(listener); 221 deviceService.addListener(listener);
154 packetService.addProcessor(listener, PacketProcessor.advisor(0)); 222 packetService.addProcessor(listener, PacketProcessor.advisor(0));
155 masterService.addListener(roleListener); 223 masterService.addListener(roleListener);
156 224
157 - LinkDiscovery ld; 225 + processDevices();
158 - for (Device device : deviceService.getAvailableDevices()) {
159 - if (rules.isSuppressed(device)) {
160 - log.debug("LinkDiscovery from {} disabled by configuration", device.id());
161 - continue;
162 - }
163 - ld = new LinkDiscovery(device, packetService, masterService,
164 - providerService, useBDDP);
165 - discoverers.put(device.id(), ld);
166 - addPorts(ld, device.id());
167 - }
168 226
169 executor = newSingleThreadScheduledExecutor(groupedThreads("onos/device", "sync-%d")); 227 executor = newSingleThreadScheduledExecutor(groupedThreads("onos/device", "sync-%d"));
170 executor.scheduleAtFixedRate(new SyncDeviceInfoTask(), INIT_DELAY, DELAY, SECONDS); 228 executor.scheduleAtFixedRate(new SyncDeviceInfoTask(), INIT_DELAY, DELAY, SECONDS);
171 229
230 + loadSuppressionRules();
172 requestIntercepts(); 231 requestIntercepts();
173 -
174 - log.info("Started");
175 - }
176 -
177 - private void addPorts(LinkDiscovery discoverer, DeviceId deviceId) {
178 - for (Port p : deviceService.getPorts(deviceId)) {
179 - if (rules.isSuppressed(p)) {
180 - continue;
181 - }
182 - if (!p.number().isLogical()) {
183 - discoverer.addPort(p);
184 - }
185 - }
186 - }
187 -
188 - @Deactivate
189 - public void deactivate() {
190 - cfgService.unregisterProperties(getClass(), false);
191 - if (disableLinkDiscovery) {
192 - return;
193 } 232 }
194 233
234 + private void disable() {
195 withdrawIntercepts(); 235 withdrawIntercepts();
196 236
197 providerRegistry.unregister(this); 237 providerRegistry.unregister(this);
...@@ -199,37 +239,36 @@ public class LLDPLinkProvider extends AbstractProvider implements LinkProvider { ...@@ -199,37 +239,36 @@ public class LLDPLinkProvider extends AbstractProvider implements LinkProvider {
199 packetService.removeProcessor(listener); 239 packetService.removeProcessor(listener);
200 masterService.removeListener(roleListener); 240 masterService.removeListener(roleListener);
201 241
242 + if (executor != null) {
202 executor.shutdownNow(); 243 executor.shutdownNow();
244 + }
203 discoverers.values().forEach(LinkDiscovery::stop); 245 discoverers.values().forEach(LinkDiscovery::stop);
204 discoverers.clear(); 246 discoverers.clear();
205 - providerService = null;
206 247
207 - log.info("Stopped"); 248 + providerService = null;
208 } 249 }
209 250
210 - @Modified 251 + private void processDevices() {
211 - public void modified(ComponentContext context) { 252 + for (Device device : deviceService.getAvailableDevices()) {
212 - if (context == null) { 253 + if (rules.isSuppressed(device)) {
213 - loadSuppressionRules(); 254 + log.debug("LinkDiscovery from {} disabled by configuration", device.id());
214 - return; 255 + continue;
256 + }
257 + LinkDiscovery ld = new LinkDiscovery(device, context);
258 + discoverers.put(device.id(), ld);
259 + addPorts(ld, device.id());
260 + }
215 } 261 }
216 - @SuppressWarnings("rawtypes")
217 - Dictionary properties = context.getProperties();
218 262
219 - String s = get(properties, PROP_DISABLE_LD); 263 + private void addPorts(LinkDiscovery discoverer, DeviceId deviceId) {
220 - if (!Strings.isNullOrEmpty(s)) { 264 + for (Port p : deviceService.getPorts(deviceId)) {
221 - disableLinkDiscovery = Boolean.valueOf(s); 265 + if (rules.isSuppressed(p)) {
266 + continue;
222 } 267 }
223 - s = get(properties, PROP_USE_BDDP); 268 + if (!p.number().isLogical()) {
224 - if (!Strings.isNullOrEmpty(s)) { 269 + discoverer.addPort(p);
225 - useBDDP = Boolean.valueOf(s);
226 } 270 }
227 - s = get(properties, PROP_LLDP_SUPPRESSION);
228 - if (!Strings.isNullOrEmpty(s)) {
229 - lldpSuppression = s;
230 } 271 }
231 - requestIntercepts();
232 - loadSuppressionRules();
233 } 272 }
234 273
235 private void loadSuppressionRules() { 274 private void loadSuppressionRules() {
...@@ -275,13 +314,7 @@ public class LLDPLinkProvider extends AbstractProvider implements LinkProvider { ...@@ -275,13 +314,7 @@ public class LLDPLinkProvider extends AbstractProvider implements LinkProvider {
275 packetService.cancelPackets(selector.build(), PacketPriority.CONTROL, appId); 314 packetService.cancelPackets(selector.build(), PacketPriority.CONTROL, appId);
276 } 315 }
277 316
278 - private LinkDiscovery createLinkDiscovery(Device device) {
279 - return new LinkDiscovery(device, packetService, masterService,
280 - providerService, useBDDP);
281 - }
282 -
283 private class InternalRoleListener implements MastershipListener { 317 private class InternalRoleListener implements MastershipListener {
284 -
285 @Override 318 @Override
286 public void event(MastershipEvent event) { 319 public void event(MastershipEvent event) {
287 if (MastershipEvent.Type.BACKUPS_CHANGED.equals(event.type())) { 320 if (MastershipEvent.Type.BACKUPS_CHANGED.equals(event.type())) {
...@@ -298,22 +331,15 @@ public class LLDPLinkProvider extends AbstractProvider implements LinkProvider { ...@@ -298,22 +331,15 @@ public class LLDPLinkProvider extends AbstractProvider implements LinkProvider {
298 if (rules.isSuppressed(device)) { 331 if (rules.isSuppressed(device)) {
299 return; 332 return;
300 } 333 }
301 - synchronized (discoverers) { 334 + discoverers.computeIfAbsent(deviceId, k -> new LinkDiscovery(device, context));
302 - if (!discoverers.containsKey(deviceId)) {
303 - // ideally, should never reach here
304 - log.debug("Device mastership changed ({}) {}", event.type(), deviceId);
305 - discoverers.put(deviceId, createLinkDiscovery(device));
306 - }
307 - }
308 } 335 }
309 336
310 } 337 }
311 338
312 private class InternalLinkProvider implements PacketProcessor, DeviceListener { 339 private class InternalLinkProvider implements PacketProcessor, DeviceListener {
313 -
314 @Override 340 @Override
315 public void event(DeviceEvent event) { 341 public void event(DeviceEvent event) {
316 - LinkDiscovery ld = null; 342 + LinkDiscovery ld;
317 Device device = event.subject(); 343 Device device = event.subject();
318 Port port = event.port(); 344 Port port = event.port();
319 if (device == null) { 345 if (device == null) {
...@@ -333,7 +359,7 @@ public class LLDPLinkProvider extends AbstractProvider implements LinkProvider { ...@@ -333,7 +359,7 @@ public class LLDPLinkProvider extends AbstractProvider implements LinkProvider {
333 return; 359 return;
334 } 360 }
335 log.debug("Device added ({}) {}", event.type(), deviceId); 361 log.debug("Device added ({}) {}", event.type(), deviceId);
336 - discoverers.put(deviceId, createLinkDiscovery(device)); 362 + discoverers.put(deviceId, new LinkDiscovery(device, context));
337 } else { 363 } else {
338 if (ld.isStopped()) { 364 if (ld.isStopped()) {
339 log.debug("Device restarted ({}) {}", event.type(), deviceId); 365 log.debug("Device restarted ({}) {}", event.type(), deviceId);
...@@ -418,7 +444,6 @@ public class LLDPLinkProvider extends AbstractProvider implements LinkProvider { ...@@ -418,7 +444,6 @@ public class LLDPLinkProvider extends AbstractProvider implements LinkProvider {
418 } 444 }
419 445
420 private final class SyncDeviceInfoTask implements Runnable { 446 private final class SyncDeviceInfoTask implements Runnable {
421 -
422 @Override 447 @Override
423 public void run() { 448 public void run() {
424 if (Thread.currentThread().isInterrupted()) { 449 if (Thread.currentThread().isInterrupted()) {
...@@ -433,13 +458,9 @@ public class LLDPLinkProvider extends AbstractProvider implements LinkProvider { ...@@ -433,13 +458,9 @@ public class LLDPLinkProvider extends AbstractProvider implements LinkProvider {
433 } 458 }
434 DeviceId did = dev.id(); 459 DeviceId did = dev.id();
435 synchronized (discoverers) { 460 synchronized (discoverers) {
436 - LinkDiscovery discoverer = discoverers.get(did); 461 + LinkDiscovery ld = discoverers
437 - if (discoverer == null) { 462 + .computeIfAbsent(did, k -> new LinkDiscovery(dev, context));
438 - discoverer = createLinkDiscovery(dev); 463 + addPorts(ld, did);
439 - discoverers.put(did, discoverer);
440 - }
441 -
442 - addPorts(discoverer, did);
443 } 464 }
444 } 465 }
445 } catch (Exception e) { 466 } catch (Exception e) {
...@@ -449,4 +470,35 @@ public class LLDPLinkProvider extends AbstractProvider implements LinkProvider { ...@@ -449,4 +470,35 @@ public class LLDPLinkProvider extends AbstractProvider implements LinkProvider {
449 } 470 }
450 } 471 }
451 472
473 + private class InternalDiscoveryContext implements DiscoveryContext {
474 + @Override
475 + public MastershipService mastershipService() {
476 + return masterService;
477 + }
478 +
479 + @Override
480 + public LinkProviderService providerService() {
481 + return providerService;
482 + }
483 +
484 + @Override
485 + public PacketService packetService() {
486 + return packetService;
487 + }
488 +
489 + @Override
490 + public long probeRate() {
491 + return probeRate;
492 + }
493 +
494 + @Override
495 + public long staleLinkAge() {
496 + return staleLinkAge;
497 + }
498 +
499 + @Override
500 + public boolean useBDDP() {
501 + return useBDDP;
502 + }
503 + }
452 } 504 }
......
...@@ -22,37 +22,30 @@ import org.jboss.netty.util.TimerTask; ...@@ -22,37 +22,30 @@ import org.jboss.netty.util.TimerTask;
22 import org.onlab.packet.Ethernet; 22 import org.onlab.packet.Ethernet;
23 import org.onlab.packet.ONOSLLDP; 23 import org.onlab.packet.ONOSLLDP;
24 import org.onlab.util.Timer; 24 import org.onlab.util.Timer;
25 -import org.onosproject.mastership.MastershipService;
26 import org.onosproject.net.ConnectPoint; 25 import org.onosproject.net.ConnectPoint;
27 import org.onosproject.net.Device; 26 import org.onosproject.net.Device;
28 import org.onosproject.net.DeviceId; 27 import org.onosproject.net.DeviceId;
29 import org.onosproject.net.Link.Type; 28 import org.onosproject.net.Link.Type;
29 +import org.onosproject.net.LinkKey;
30 import org.onosproject.net.Port; 30 import org.onosproject.net.Port;
31 import org.onosproject.net.PortNumber; 31 import org.onosproject.net.PortNumber;
32 import org.onosproject.net.link.DefaultLinkDescription; 32 import org.onosproject.net.link.DefaultLinkDescription;
33 import org.onosproject.net.link.LinkDescription; 33 import org.onosproject.net.link.LinkDescription;
34 -import org.onosproject.net.link.LinkProviderService;
35 import org.onosproject.net.packet.DefaultOutboundPacket; 34 import org.onosproject.net.packet.DefaultOutboundPacket;
36 import org.onosproject.net.packet.OutboundPacket; 35 import org.onosproject.net.packet.OutboundPacket;
37 import org.onosproject.net.packet.PacketContext; 36 import org.onosproject.net.packet.PacketContext;
38 -import org.onosproject.net.packet.PacketService;
39 import org.slf4j.Logger; 37 import org.slf4j.Logger;
40 38
41 import java.nio.ByteBuffer; 39 import java.nio.ByteBuffer;
42 -import java.util.Iterator;
43 import java.util.Map; 40 import java.util.Map;
44 import java.util.Set; 41 import java.util.Set;
45 -import java.util.concurrent.atomic.AtomicInteger; 42 +import java.util.stream.Collectors;
46 43
47 -import static com.google.common.base.Preconditions.checkNotNull;
48 import static java.util.concurrent.TimeUnit.MILLISECONDS; 44 import static java.util.concurrent.TimeUnit.MILLISECONDS;
49 import static org.onosproject.net.PortNumber.portNumber; 45 import static org.onosproject.net.PortNumber.portNumber;
50 import static org.onosproject.net.flow.DefaultTrafficTreatment.builder; 46 import static org.onosproject.net.flow.DefaultTrafficTreatment.builder;
51 import static org.slf4j.LoggerFactory.getLogger; 47 import static org.slf4j.LoggerFactory.getLogger;
52 48
53 -// TODO: add 'fast discovery' mode: drop LLDPs in destination switch but listen for flow_removed messages
54 -// FIXME: add ability to track links using port pairs or the link inventory
55 -
56 /** 49 /**
57 * Run discovery process from a physical switch. Ports are initially labeled as 50 * Run discovery process from a physical switch. Ports are initially labeled as
58 * slow ports. When an LLDP is successfully received, label the remote port as 51 * slow ports. When an LLDP is successfully received, label the remote port as
...@@ -64,34 +57,23 @@ public class LinkDiscovery implements TimerTask { ...@@ -64,34 +57,23 @@ public class LinkDiscovery implements TimerTask {
64 57
65 private final Logger log = getLogger(getClass()); 58 private final Logger log = getLogger(getClass());
66 59
67 - private static final short MAX_PROBE_COUNT = 3; // probes to send before link is removed
68 - private static final short DEFAULT_PROBE_RATE = 3000; // millis
69 private static final String SRC_MAC = "DE:AD:BE:EF:BA:11"; 60 private static final String SRC_MAC = "DE:AD:BE:EF:BA:11";
70 - private static final String SERVICE_NULL = "Service cannot be null";
71 61
72 private final Device device; 62 private final Device device;
73 - 63 + private final DiscoveryContext context;
74 - // send 1 probe every probeRate milliseconds
75 - private final long probeRate = DEFAULT_PROBE_RATE;
76 -
77 - private final Set<Long> slowPorts = Sets.newConcurrentHashSet();
78 - // ports, known to have incoming links
79 - private final Set<Long> fastPorts = Sets.newConcurrentHashSet();
80 -
81 - // number of unacknowledged probes per port
82 - private final Map<Long, AtomicInteger> portProbeCount = Maps.newHashMap();
83 64
84 private final ONOSLLDP lldpPacket; 65 private final ONOSLLDP lldpPacket;
85 private final Ethernet ethPacket; 66 private final Ethernet ethPacket;
86 private Ethernet bddpEth; 67 private Ethernet bddpEth;
87 - private final boolean useBDDP;
88 68
89 private Timeout timeout; 69 private Timeout timeout;
90 private volatile boolean isStopped; 70 private volatile boolean isStopped;
91 71
92 - private final LinkProviderService linkProvider; 72 + // Set of ports to be probed
93 - private final PacketService pktService; 73 + private final Set<Long> ports = Sets.newConcurrentHashSet();
94 - private final MastershipService mastershipService; 74 +
75 + // Most recent time a link was seen
76 + private final Map<LinkKey, Long> linkTimes = Maps.newConcurrentMap();
95 77
96 /** 78 /**
97 * Instantiates discovery manager for the given physical switch. Creates a 79 * Instantiates discovery manager for the given physical switch. Creates a
...@@ -99,18 +81,11 @@ public class LinkDiscovery implements TimerTask { ...@@ -99,18 +81,11 @@ public class LinkDiscovery implements TimerTask {
99 * Starts the the timer for the discovery process. 81 * Starts the the timer for the discovery process.
100 * 82 *
101 * @param device the physical switch 83 * @param device the physical switch
102 - * @param pktService packet service 84 + * @param context discovery context
103 - * @param masterService mastership service
104 - * @param providerService link provider service
105 - * @param useBDDP flag to also use BDDP for discovery
106 */ 85 */
107 - public LinkDiscovery(Device device, PacketService pktService, 86 + public LinkDiscovery(Device device, DiscoveryContext context) {
108 - MastershipService masterService,
109 - LinkProviderService providerService, Boolean... useBDDP) {
110 this.device = device; 87 this.device = device;
111 - this.linkProvider = checkNotNull(providerService, SERVICE_NULL); 88 + this.context = context;
112 - this.pktService = checkNotNull(pktService, SERVICE_NULL);
113 - this.mastershipService = checkNotNull(masterService, SERVICE_NULL);
114 89
115 lldpPacket = new ONOSLLDP(); 90 lldpPacket = new ONOSLLDP();
116 lldpPacket.setChassisId(device.chassisId()); 91 lldpPacket.setChassisId(device.chassisId());
...@@ -122,15 +97,12 @@ public class LinkDiscovery implements TimerTask { ...@@ -122,15 +97,12 @@ public class LinkDiscovery implements TimerTask {
122 ethPacket.setPayload(this.lldpPacket); 97 ethPacket.setPayload(this.lldpPacket);
123 ethPacket.setPad(true); 98 ethPacket.setPad(true);
124 99
125 - this.useBDDP = useBDDP.length > 0 ? useBDDP[0] : false;
126 - if (this.useBDDP) {
127 bddpEth = new Ethernet(); 100 bddpEth = new Ethernet();
128 bddpEth.setPayload(lldpPacket); 101 bddpEth.setPayload(lldpPacket);
129 bddpEth.setEtherType(Ethernet.TYPE_BSN); 102 bddpEth.setEtherType(Ethernet.TYPE_BSN);
130 bddpEth.setDestinationMACAddress(ONOSLLDP.BDDP_MULTICAST); 103 bddpEth.setDestinationMACAddress(ONOSLLDP.BDDP_MULTICAST);
131 bddpEth.setPad(true); 104 bddpEth.setPad(true);
132 log.info("Using BDDP to discover network"); 105 log.info("Using BDDP to discover network");
133 - }
134 106
135 isStopped = true; 107 isStopped = true;
136 start(); 108 start();
...@@ -145,15 +117,8 @@ public class LinkDiscovery implements TimerTask { ...@@ -145,15 +117,8 @@ public class LinkDiscovery implements TimerTask {
145 * @param port the port 117 * @param port the port
146 */ 118 */
147 public void addPort(Port port) { 119 public void addPort(Port port) {
148 - boolean newPort = false; 120 + boolean newPort = ports.add(port.number().toLong());
149 - synchronized (this) { 121 + boolean isMaster = context.mastershipService().isLocalMaster(device.id());
150 - if (!containsPort(port.number().toLong())) {
151 - newPort = true;
152 - slowPorts.add(port.number().toLong());
153 - }
154 - }
155 -
156 - boolean isMaster = mastershipService.isLocalMaster(device.id());
157 if (newPort && isMaster) { 122 if (newPort && isMaster) {
158 log.debug("Sending init probe to port {}@{}", port.number().toLong(), device.id()); 123 log.debug("Sending init probe to port {}@{}", port.number().toLong(), device.id());
159 sendProbes(port.number().toLong()); 124 sendProbes(port.number().toLong());
...@@ -161,59 +126,31 @@ public class LinkDiscovery implements TimerTask { ...@@ -161,59 +126,31 @@ public class LinkDiscovery implements TimerTask {
161 } 126 }
162 127
163 /** 128 /**
164 - * Removes physical port from discovery process.
165 - *
166 - * @param port the port
167 - */
168 - public void removePort(Port port) {
169 - // Ignore ports that are not on this switch
170 - long portnum = port.number().toLong();
171 - synchronized (this) {
172 - if (slowPorts.contains(portnum)) {
173 - slowPorts.remove(portnum);
174 -
175 - } else if (fastPorts.contains(portnum)) {
176 - fastPorts.remove(portnum);
177 - portProbeCount.remove(portnum);
178 - // no iterator to update
179 - } else {
180 - log.warn("Tried to dynamically remove non-existing port {}", portnum);
181 - }
182 - }
183 - }
184 -
185 - /**
186 * Method called by remote port to acknowledge receipt of LLDP sent by 129 * Method called by remote port to acknowledge receipt of LLDP sent by
187 * this port. If slow port, updates label to fast. If fast port, decrements 130 * this port. If slow port, updates label to fast. If fast port, decrements
188 * number of unacknowledged probes. 131 * number of unacknowledged probes.
189 * 132 *
190 - * @param portNumber the port 133 + * @param key link key
191 */ 134 */
192 - public void ackProbe(Long portNumber) { 135 + private void ackProbe(LinkKey key) {
193 - synchronized (this) { 136 + long portNumber = key.src().port().toLong();
194 - if (slowPorts.contains(portNumber)) { 137 + if (ports.contains(portNumber)) {
195 - log.debug("Setting slow port to fast: {}:{}", device.id(), portNumber); 138 + linkTimes.put(key, System.currentTimeMillis());
196 - slowPorts.remove(portNumber);
197 - fastPorts.add(portNumber);
198 - portProbeCount.put(portNumber, new AtomicInteger(0));
199 - } else if (fastPorts.contains(portNumber)) {
200 - portProbeCount.get(portNumber).set(0);
201 } else { 139 } else {
202 log.debug("Got ackProbe for non-existing port: {}", portNumber); 140 log.debug("Got ackProbe for non-existing port: {}", portNumber);
203 } 141 }
204 } 142 }
205 - }
206 143
207 144
208 /** 145 /**
209 * Handles an incoming LLDP packet. Creates link in topology and sends ACK 146 * Handles an incoming LLDP packet. Creates link in topology and sends ACK
210 * to port where LLDP originated. 147 * to port where LLDP originated.
211 * 148 *
212 - * @param context packet context 149 + * @param packetContext packet context
213 * @return true if handled 150 * @return true if handled
214 */ 151 */
215 - public boolean handleLLDP(PacketContext context) { 152 + public boolean handleLLDP(PacketContext packetContext) {
216 - Ethernet eth = context.inPacket().parsed(); 153 + Ethernet eth = packetContext.inPacket().parsed();
217 if (eth == null) { 154 if (eth == null) {
218 return false; 155 return false;
219 } 156 }
...@@ -221,20 +158,21 @@ public class LinkDiscovery implements TimerTask { ...@@ -221,20 +158,21 @@ public class LinkDiscovery implements TimerTask {
221 ONOSLLDP onoslldp = ONOSLLDP.parseONOSLLDP(eth); 158 ONOSLLDP onoslldp = ONOSLLDP.parseONOSLLDP(eth);
222 if (onoslldp != null) { 159 if (onoslldp != null) {
223 PortNumber srcPort = portNumber(onoslldp.getPort()); 160 PortNumber srcPort = portNumber(onoslldp.getPort());
224 - PortNumber dstPort = context.inPacket().receivedFrom().port(); 161 + PortNumber dstPort = packetContext.inPacket().receivedFrom().port();
225 DeviceId srcDeviceId = DeviceId.deviceId(onoslldp.getDeviceString()); 162 DeviceId srcDeviceId = DeviceId.deviceId(onoslldp.getDeviceString());
226 - DeviceId dstDeviceId = context.inPacket().receivedFrom().deviceId(); 163 + DeviceId dstDeviceId = packetContext.inPacket().receivedFrom().deviceId();
227 - ackProbe(dstPort.toLong());
228 164
229 ConnectPoint src = new ConnectPoint(srcDeviceId, srcPort); 165 ConnectPoint src = new ConnectPoint(srcDeviceId, srcPort);
230 ConnectPoint dst = new ConnectPoint(dstDeviceId, dstPort); 166 ConnectPoint dst = new ConnectPoint(dstDeviceId, dstPort);
231 167
168 + ackProbe(LinkKey.linkKey(src, dst));
169 +
232 LinkDescription ld = eth.getEtherType() == Ethernet.TYPE_LLDP ? 170 LinkDescription ld = eth.getEtherType() == Ethernet.TYPE_LLDP ?
233 new DefaultLinkDescription(src, dst, Type.DIRECT) : 171 new DefaultLinkDescription(src, dst, Type.DIRECT) :
234 new DefaultLinkDescription(src, dst, Type.INDIRECT); 172 new DefaultLinkDescription(src, dst, Type.INDIRECT);
235 173
236 try { 174 try {
237 - linkProvider.linkDetected(ld); 175 + context.providerService().linkDetected(ld);
238 } catch (IllegalStateException e) { 176 } catch (IllegalStateException e) {
239 return true; 177 return true;
240 } 178 }
...@@ -256,48 +194,37 @@ public class LinkDiscovery implements TimerTask { ...@@ -256,48 +194,37 @@ public class LinkDiscovery implements TimerTask {
256 if (isStopped()) { 194 if (isStopped()) {
257 return; 195 return;
258 } 196 }
259 - if (!mastershipService.isLocalMaster(device.id())) { 197 +
198 + if (!context.mastershipService().isLocalMaster(device.id())) {
260 if (!isStopped()) { 199 if (!isStopped()) {
261 - // reschedule timer 200 + timeout = Timer.getTimer().newTimeout(this, context.probeRate(), MILLISECONDS);
262 - timeout = Timer.getTimer().newTimeout(this, probeRate, MILLISECONDS);
263 } 201 }
264 return; 202 return;
265 } 203 }
266 204
267 - log.trace("Sending probes from {}", device.id()); 205 + // Prune stale links
268 - synchronized (this) { 206 + linkTimes.entrySet().stream()
269 - Iterator<Long> fastIterator = fastPorts.iterator(); 207 + .filter(e -> isStale(e.getKey(), e.getValue()))
270 - while (fastIterator.hasNext()) { 208 + .map(Map.Entry::getKey).collect(Collectors.toSet())
271 - long portNumber = fastIterator.next(); 209 + .forEach(this::pruneLink);
272 - int probeCount = portProbeCount.get(portNumber).getAndIncrement();
273 210
274 - if (probeCount < LinkDiscovery.MAX_PROBE_COUNT) { 211 + // Probe ports
275 - log.trace("Sending fast probe to port {}", portNumber); 212 + log.trace("Sending probes from {}", device.id());
276 - sendProbes(portNumber); 213 + ports.forEach(this::sendProbes);
277 214
278 - } else { 215 + if (!isStopped()) {
279 - // Link down, demote to slowPorts; update fast and slow ports 216 + timeout = Timer.getTimer().newTimeout(this, context.probeRate(), MILLISECONDS);
280 - fastIterator.remove();
281 - slowPorts.add(portNumber);
282 - portProbeCount.remove(portNumber);
283 -
284 - ConnectPoint cp = new ConnectPoint(device.id(), portNumber(portNumber));
285 - log.debug("Link down -> {}", cp);
286 - linkProvider.linksVanished(cp);
287 } 217 }
288 } 218 }
289 219
290 - // send a probe for the next slow port 220 + private void pruneLink(LinkKey key) {
291 - for (long portNumber : slowPorts) { 221 + linkTimes.remove(key);
292 - log.trace("Sending slow probe to port {}", portNumber); 222 + LinkDescription desc = new DefaultLinkDescription(key.src(), key.dst(), Type.DIRECT);
293 - sendProbes(portNumber); 223 + context.providerService().linkVanished(desc);
294 - }
295 } 224 }
296 225
297 - if (!isStopped()) { 226 + private boolean isStale(LinkKey key, long lastSeen) {
298 - // reschedule timer 227 + return lastSeen < (System.currentTimeMillis() - context.staleLinkAge());
299 - timeout = Timer.getTimer().newTimeout(this, probeRate, MILLISECONDS);
300 - }
301 } 228 }
302 229
303 public synchronized void stop() { 230 public synchronized void stop() {
...@@ -351,19 +278,18 @@ public class LinkDiscovery implements TimerTask { ...@@ -351,19 +278,18 @@ public class LinkDiscovery implements TimerTask {
351 private void sendProbes(Long portNumber) { 278 private void sendProbes(Long portNumber) {
352 log.trace("Sending probes out to {}@{}", portNumber, device.id()); 279 log.trace("Sending probes out to {}@{}", portNumber, device.id());
353 OutboundPacket pkt = createOutBoundLLDP(portNumber); 280 OutboundPacket pkt = createOutBoundLLDP(portNumber);
354 - pktService.emit(pkt); 281 + context.packetService().emit(pkt);
355 - if (useBDDP) { 282 + if (context.useBDDP()) {
356 OutboundPacket bpkt = createOutBoundBDDP(portNumber); 283 OutboundPacket bpkt = createOutBoundBDDP(portNumber);
357 - pktService.emit(bpkt); 284 + context.packetService().emit(bpkt);
358 } 285 }
359 } 286 }
360 287
361 - public boolean containsPort(Long portNumber) {
362 - return slowPorts.contains(portNumber) || fastPorts.contains(portNumber);
363 - }
364 -
365 public synchronized boolean isStopped() { 288 public synchronized boolean isStopped() {
366 return isStopped || timeout.isCancelled(); 289 return isStopped || timeout.isCancelled();
367 } 290 }
368 291
292 + boolean containsPort(long portNumber) {
293 + return ports.contains(portNumber);
294 + }
369 } 295 }
......
...@@ -108,7 +108,6 @@ public class LLDPLinkProviderTest { ...@@ -108,7 +108,6 @@ public class LLDPLinkProviderTest {
108 provider.providerRegistry = linkService; 108 provider.providerRegistry = linkService;
109 provider.masterService = masterService; 109 provider.masterService = masterService;
110 110
111 -
112 provider.activate(null); 111 provider.activate(null);
113 } 112 }
114 113
......