Ayaka Koshibe
Committed by Ray Milkey

NullLinkProvider restructured with a CachedThreadPool. This should in theory

allow more LinkDescriptions to be generated.

  - patch 3/4: Some cleanup/sanity checks added.

Reference: ONOS-1033

Change-Id: I0b35585d5fdec3f7e6c921172f39f67dd6723dec
...@@ -22,13 +22,16 @@ import static org.onlab.util.Tools.toHex; ...@@ -22,13 +22,16 @@ import static org.onlab.util.Tools.toHex;
22 import static org.onosproject.net.MastershipRole.MASTER; 22 import static org.onosproject.net.MastershipRole.MASTER;
23 23
24 import java.util.Dictionary; 24 import java.util.Dictionary;
25 +import java.util.Iterator;
25 import java.util.List; 26 import java.util.List;
27 +import java.util.Set;
26 import java.util.Objects; 28 import java.util.Objects;
27 import java.util.concurrent.ConcurrentMap; 29 import java.util.concurrent.ConcurrentMap;
28 import java.util.concurrent.ExecutorService; 30 import java.util.concurrent.ExecutorService;
29 import java.util.concurrent.Executors; 31 import java.util.concurrent.Executors;
30 import java.util.concurrent.TimeUnit; 32 import java.util.concurrent.TimeUnit;
31 33
34 +import org.apache.commons.lang3.concurrent.ConcurrentUtils;
32 import org.apache.felix.scr.annotations.Activate; 35 import org.apache.felix.scr.annotations.Activate;
33 import org.apache.felix.scr.annotations.Component; 36 import org.apache.felix.scr.annotations.Component;
34 import org.apache.felix.scr.annotations.Deactivate; 37 import org.apache.felix.scr.annotations.Deactivate;
...@@ -62,6 +65,7 @@ import org.slf4j.Logger; ...@@ -62,6 +65,7 @@ import org.slf4j.Logger;
62 65
63 import com.google.common.collect.Lists; 66 import com.google.common.collect.Lists;
64 import com.google.common.collect.Maps; 67 import com.google.common.collect.Maps;
68 +import com.google.common.collect.Sets;
65 69
66 /** 70 /**
67 * Provider which advertises fake/nonexistent links to the core. To be used for 71 * Provider which advertises fake/nonexistent links to the core. To be used for
...@@ -95,8 +99,12 @@ public class NullLinkProvider extends AbstractProvider implements LinkProvider { ...@@ -95,8 +99,12 @@ public class NullLinkProvider extends AbstractProvider implements LinkProvider {
95 private final InternalLinkProvider linkProvider = new InternalLinkProvider(); 99 private final InternalLinkProvider linkProvider = new InternalLinkProvider();
96 private final InternalLinkListener listener = new InternalLinkListener(); 100 private final InternalLinkListener listener = new InternalLinkListener();
97 101
102 + // True for device with Driver, false otherwise.
103 + private final ConcurrentMap<DeviceId, Boolean> driverMap = Maps
104 + .newConcurrentMap();
105 +
98 // Link descriptions 106 // Link descriptions
99 - private final ConcurrentMap<ConnectPoint, LinkDescription> descriptions = Maps 107 + private final ConcurrentMap<DeviceId, Set<LinkDescription>> linkDescrs = Maps
100 .newConcurrentMap(); 108 .newConcurrentMap();
101 109
102 // Local Device ID's that have been seen so far 110 // Local Device ID's that have been seen so far
...@@ -104,20 +112,24 @@ public class NullLinkProvider extends AbstractProvider implements LinkProvider { ...@@ -104,20 +112,24 @@ public class NullLinkProvider extends AbstractProvider implements LinkProvider {
104 // tail ends of other islands 112 // tail ends of other islands
105 private final List<ConnectPoint> tails = Lists.newArrayList(); 113 private final List<ConnectPoint> tails = Lists.newArrayList();
106 114
107 - private ExecutorService linkDriver = Executors.newFixedThreadPool(1, 115 + private final int checkRateDuration = 10;
108 - namedThreads("onos-null-link-driver")); 116 +
117 + private ExecutorService linkDriver = Executors.newCachedThreadPool(
118 + namedThreads("onos-null-link-driver-%d"));
109 119
110 // For flicker = true, duration between events in msec. 120 // For flicker = true, duration between events in msec.
111 @Property(name = "eventRate", value = "0", 121 @Property(name = "eventRate", value = "0",
112 label = "Duration between Link Event") 122 label = "Duration between Link Event")
113 private int eventRate = DEFAULT_RATE; 123 private int eventRate = DEFAULT_RATE;
114 - private int checkRateDuration = 10;
115 124
116 // For flicker = true, duration between events in msec. 125 // For flicker = true, duration between events in msec.
117 @Property(name = "neighbors", value = "", 126 @Property(name = "neighbors", value = "",
118 label = "Node ID of instance for neighboring island ") 127 label = "Node ID of instance for neighboring island ")
119 private String neighbor = ""; 128 private String neighbor = "";
120 129
130 + // flag checked to create a LinkDriver, if rate is non-zero.
131 + private boolean flicker = false;
132 +
121 public NullLinkProvider() { 133 public NullLinkProvider() {
122 super(new ProviderId("null", "org.onosproject.provider.nil")); 134 super(new ProviderId("null", "org.onosproject.provider.nil"));
123 } 135 }
...@@ -126,20 +138,20 @@ public class NullLinkProvider extends AbstractProvider implements LinkProvider { ...@@ -126,20 +138,20 @@ public class NullLinkProvider extends AbstractProvider implements LinkProvider {
126 public void activate(ComponentContext context) { 138 public void activate(ComponentContext context) {
127 providerService = providerRegistry.register(this); 139 providerService = providerRegistry.register(this);
128 linkService = (LinkService) providerRegistry; 140 linkService = (LinkService) providerRegistry;
141 + modified(context);
129 linkService.addListener(listener); 142 linkService.addListener(listener);
130 deviceService.addListener(linkProvider); 143 deviceService.addListener(linkProvider);
131 - modified(context); 144 +
132 log.info("started"); 145 log.info("started");
133 } 146 }
134 147
135 @Deactivate 148 @Deactivate
136 public void deactivate(ComponentContext context) { 149 public void deactivate(ComponentContext context) {
137 - if (eventRate != 0) { 150 + linkDriver.shutdown();
138 - try { 151 + try {
139 - linkDriver.awaitTermination(1000, TimeUnit.MILLISECONDS); 152 + linkDriver.awaitTermination(1000, TimeUnit.MILLISECONDS);
140 - } catch (InterruptedException e) { 153 + } catch (InterruptedException e) {
141 - log.error("LinkBuilder did not terminate"); 154 + log.error("LinkBuilder did not terminate");
142 - }
143 linkDriver.shutdownNow(); 155 linkDriver.shutdownNow();
144 } 156 }
145 deviceService.removeListener(linkProvider); 157 deviceService.removeListener(linkProvider);
...@@ -175,10 +187,23 @@ public class NullLinkProvider extends AbstractProvider implements LinkProvider { ...@@ -175,10 +187,23 @@ public class NullLinkProvider extends AbstractProvider implements LinkProvider {
175 if (newNbor != neighbor) { 187 if (newNbor != neighbor) {
176 neighbor = newNbor; 188 neighbor = newNbor;
177 } 189 }
178 -
179 if (newRate != 0 & eventRate != newRate) { 190 if (newRate != 0 & eventRate != newRate) {
180 eventRate = newRate; 191 eventRate = newRate;
181 - linkDriver.submit(new LinkDriver()); 192 + flicker = true;
193 + // try to find and add drivers for current devices
194 + for (Device dev : deviceService.getDevices()) {
195 + DeviceId did = dev.id();
196 + synchronized (this) {
197 + if (driverMap.get(did) == null || !driverMap.get(did)) {
198 + driverMap.put(dev.id(), true);
199 + linkDriver.submit(new LinkDriver(dev));
200 + }
201 + }
202 + }
203 + } else if (newRate == 0) {
204 + driverMap.replaceAll((k, v) -> false);
205 + } else {
206 + log.warn("Invalid link flicker rate {}", newRate);
182 } 207 }
183 208
184 log.info("Using new settings: eventRate={}", eventRate); 209 log.info("Using new settings: eventRate={}", eventRate);
...@@ -213,6 +238,70 @@ public class NullLinkProvider extends AbstractProvider implements LinkProvider { ...@@ -213,6 +238,70 @@ public class NullLinkProvider extends AbstractProvider implements LinkProvider {
213 return ""; 238 return "";
214 } 239 }
215 240
241 + private boolean addLdesc(DeviceId did, LinkDescription ldesc) {
242 + Set<LinkDescription> ldescs = ConcurrentUtils.putIfAbsent(
243 + linkDescrs, did, Sets.newConcurrentHashSet());
244 + return ldescs.add(ldesc);
245 + }
246 +
247 + private void addLink(Device current) {
248 + DeviceId did = current.id();
249 + if (!MASTER.equals(roleService.getLocalRole(did))) {
250 +
251 + String part = part(did.toString());
252 + String npart = nIdPart(did.toString());
253 + if (part.equals("ffff") && npart.equals(neighbor)) {
254 + // 'tail' of our neighboring island - link us <- tail
255 + tails.add(new ConnectPoint(did, SRCPORT));
256 + }
257 + tryLinkTail();
258 + return;
259 + }
260 + devices.add(did);
261 +
262 + if (devices.size() == 1) {
263 + return;
264 + }
265 +
266 + // Normal flow - attach new device to the last-seen device
267 + DeviceId prev = devices.get(devices.size() - 2);
268 + ConnectPoint src = new ConnectPoint(prev, SRCPORT);
269 + ConnectPoint dst = new ConnectPoint(did, DSTPORT);
270 +
271 + LinkDescription fdesc = new DefaultLinkDescription(src, dst,
272 + Link.Type.DIRECT);
273 + LinkDescription rdesc = new DefaultLinkDescription(dst, src,
274 + Link.Type.DIRECT);
275 + addLdesc(prev, fdesc);
276 + addLdesc(did, rdesc);
277 +
278 + providerService.linkDetected(fdesc);
279 + providerService.linkDetected(rdesc);
280 + }
281 +
282 + // try to link to a tail to first element
283 + private void tryLinkTail() {
284 + if (tails.isEmpty() || devices.isEmpty()) {
285 + return;
286 + }
287 + ConnectPoint first = new ConnectPoint(devices.get(0), DSTPORT);
288 + boolean added = false;
289 + for (ConnectPoint cp : tails) {
290 + if (!linkService.getLinks(cp).isEmpty()) {
291 + continue;
292 + }
293 + LinkDescription ld = new DefaultLinkDescription(cp, first,
294 + Link.Type.DIRECT);
295 + addLdesc(cp.deviceId(), ld);
296 + providerService.linkDetected(ld);
297 + added = true;
298 + break;
299 + }
300 + if (added) {
301 + tails.clear();
302 + }
303 + }
304 +
216 /** 305 /**
217 * Adds links as devices are found, and generates LinkEvents. 306 * Adds links as devices are found, and generates LinkEvents.
218 */ 307 */
...@@ -223,9 +312,16 @@ public class NullLinkProvider extends AbstractProvider implements LinkProvider { ...@@ -223,9 +312,16 @@ public class NullLinkProvider extends AbstractProvider implements LinkProvider {
223 Device dev = event.subject(); 312 Device dev = event.subject();
224 switch (event.type()) { 313 switch (event.type()) {
225 case DEVICE_ADDED: 314 case DEVICE_ADDED:
315 + synchronized (this) {
316 + if (flicker && !driverMap.getOrDefault(dev.id(), false)) {
317 + driverMap.put(dev.id(), true);
318 + linkDriver.submit(new LinkDriver(dev));
319 + }
320 + }
226 addLink(dev); 321 addLink(dev);
227 break; 322 break;
228 case DEVICE_REMOVED: 323 case DEVICE_REMOVED:
324 + driverMap.put(dev.id(), false);
229 removeLink(dev); 325 removeLink(dev);
230 break; 326 break;
231 default: 327 default:
...@@ -233,70 +329,25 @@ public class NullLinkProvider extends AbstractProvider implements LinkProvider { ...@@ -233,70 +329,25 @@ public class NullLinkProvider extends AbstractProvider implements LinkProvider {
233 } 329 }
234 } 330 }
235 331
236 - private void addLink(Device current) {
237 - DeviceId did = current.id();
238 - if (!MASTER.equals(roleService.getLocalRole(did))) {
239 -
240 - String part = part(did.toString());
241 - String npart = nIdPart(did.toString());
242 - if (part.equals("ffff") && npart.equals(neighbor)) {
243 - // 'tail' of our neighboring island - link us <- tail
244 - tails.add(new ConnectPoint(did, SRCPORT));
245 - }
246 - tryLinkTail();
247 - return;
248 - }
249 - devices.add(did);
250 -
251 - if (devices.size() == 1) {
252 - return;
253 - }
254 -
255 - // Normal flow - attach new device to the last-seen device
256 - DeviceId prev = devices.get(devices.size() - 2);
257 - ConnectPoint src = new ConnectPoint(prev, SRCPORT);
258 - ConnectPoint dst = new ConnectPoint(did, DSTPORT);
259 -
260 - LinkDescription fdesc = new DefaultLinkDescription(src, dst,
261 - Link.Type.DIRECT);
262 - LinkDescription rdesc = new DefaultLinkDescription(dst, src,
263 - Link.Type.DIRECT);
264 - descriptions.put(src, fdesc);
265 - descriptions.put(dst, rdesc);
266 -
267 - providerService.linkDetected(fdesc);
268 - providerService.linkDetected(rdesc);
269 - }
270 -
271 - // try to link to a tail to first element
272 - private void tryLinkTail() {
273 - if (tails.isEmpty() || devices.isEmpty()) {
274 - return;
275 - }
276 - ConnectPoint first = new ConnectPoint(devices.get(0), DSTPORT);
277 - boolean added = false;
278 - for (ConnectPoint cp : tails) {
279 - if (!linkService.getLinks(cp).isEmpty()) {
280 - continue;
281 - }
282 - LinkDescription ld = new DefaultLinkDescription(cp, first,
283 - Link.Type.DIRECT);
284 - descriptions.put(cp, ld);
285 - providerService.linkDetected(ld);
286 - added = true;
287 - break;
288 - }
289 - if (added) {
290 - tails.clear();
291 - }
292 - }
293 -
294 private void removeLink(Device device) { 332 private void removeLink(Device device) {
295 if (!MASTER.equals(roleService.getLocalRole(device.id()))) { 333 if (!MASTER.equals(roleService.getLocalRole(device.id()))) {
296 return; 334 return;
297 } 335 }
298 providerService.linksVanished(device.id()); 336 providerService.linksVanished(device.id());
299 devices.remove(device.id()); 337 devices.remove(device.id());
338 + synchronized (linkDescrs) {
339 + Set<LinkDescription> lds = linkDescrs.remove(device.id());
340 + for (LinkDescription ld : lds) {
341 + ConnectPoint src = ld.src();
342 + DeviceId dst = ld.dst().deviceId();
343 + Iterator<LinkDescription> it = linkDescrs.get(dst).iterator();
344 + while (it.hasNext()) {
345 + if (it.next().dst().equals(src)) {
346 + it.remove();
347 + }
348 + }
349 + }
350 + }
300 } 351 }
301 352
302 } 353 }
...@@ -317,7 +368,7 @@ public class NullLinkProvider extends AbstractProvider implements LinkProvider { ...@@ -317,7 +368,7 @@ public class NullLinkProvider extends AbstractProvider implements LinkProvider {
317 LinkDescription ld = new DefaultLinkDescription(event 368 LinkDescription ld = new DefaultLinkDescription(event
318 .subject().dst(), event.subject().src(), 369 .subject().dst(), event.subject().src(),
319 Link.Type.DIRECT); 370 Link.Type.DIRECT);
320 - descriptions.put(event.subject().dst(), ld); 371 + addLdesc(event.subject().dst().deviceId(), ld);
321 providerService.linkDetected(ld); 372 providerService.linkDetected(ld);
322 } 373 }
323 return; 374 return;
...@@ -334,18 +385,26 @@ public class NullLinkProvider extends AbstractProvider implements LinkProvider { ...@@ -334,18 +385,26 @@ public class NullLinkProvider extends AbstractProvider implements LinkProvider {
334 * Generates link events using fake links. 385 * Generates link events using fake links.
335 */ 386 */
336 private class LinkDriver implements Runnable { 387 private class LinkDriver implements Runnable {
388 + Device myDev;
389 + LinkDriver(Device dev) {
390 + myDev = dev;
391 + }
337 392
338 @Override 393 @Override
339 public void run() { 394 public void run() {
395 + log.info("Thread started for dev {}", myDev.id());
340 long startTime = System.currentTimeMillis(); 396 long startTime = System.currentTimeMillis();
341 long countEvent = 0; 397 long countEvent = 0;
342 float effLoad = 0; 398 float effLoad = 0;
343 399
344 - while (!linkDriver.isShutdown()) { 400 + while (!linkDriver.isShutdown() && driverMap.get(myDev.id())) {
401 + if (linkDescrs.get(myDev.id()) == null) {
402 + addLink(myDev);
403 + }
345 404
346 //Assuming eventRate is in microsecond unit 405 //Assuming eventRate is in microsecond unit
347 if (countEvent <= checkRateDuration * 1000000 / eventRate) { 406 if (countEvent <= checkRateDuration * 1000000 / eventRate) {
348 - for (LinkDescription desc : descriptions.values()) { 407 + for (LinkDescription desc : linkDescrs.get(myDev.id())) {
349 providerService.linkVanished(desc); 408 providerService.linkVanished(desc);
350 countEvent++; 409 countEvent++;
351 try { 410 try {
...@@ -363,8 +422,10 @@ public class NullLinkProvider extends AbstractProvider implements LinkProvider { ...@@ -363,8 +422,10 @@ public class NullLinkProvider extends AbstractProvider implements LinkProvider {
363 } 422 }
364 } else { 423 } else {
365 // log in WARN the effective load generation rate in events/sec, every 10 seconds 424 // log in WARN the effective load generation rate in events/sec, every 10 seconds
366 - effLoad = (float) (countEvent * 1000 / (System.currentTimeMillis() - startTime)); 425 + effLoad = (float) (countEvent * 1000.0 /
367 - log.warn("Effective Loading is {} events/second", String.valueOf(effLoad)); 426 + (System.currentTimeMillis() - startTime));
427 + log.warn("Effective Loading for thread is {} events/second",
428 + String.valueOf(effLoad));
368 countEvent = 0; 429 countEvent = 0;
369 startTime = System.currentTimeMillis(); 430 startTime = System.currentTimeMillis();
370 } 431 }
......