Ayaka Koshibe
Committed by Pavlin Radoslavov

File read properly updates topology information

Change-Id: I1e78e06e701cef45e5454d6e928967187174f8e5
......@@ -15,6 +15,7 @@
*/
package org.onosproject.provider.nil.link.impl;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
......@@ -49,7 +50,6 @@ import org.slf4j.Logger;
import java.util.Dictionary;
import java.util.Iterator;
import java.util.List;
import java.util.ArrayList;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
......@@ -87,6 +87,7 @@ public class NullLinkProvider extends AbstractProvider implements LinkProvider {
private static final int CHECK_DURATION = 10;
private static final int DEFAULT_RATE = 0; // usec
private static final int REFRESH_RATE = 3; // sec
private static final DeviceId DEFAULT = DeviceId.deviceId("null:ffffffffffffffff");
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DeviceService deviceService;
......@@ -109,10 +110,10 @@ public class NullLinkProvider extends AbstractProvider implements LinkProvider {
.newConcurrentMap();
// Link descriptions
private final Set<LinkDescription> linkDescrs = Sets.newConcurrentHashSet();
private final List<LinkDescription> linkDescrs = Lists.newArrayList();
// Thread to description map
private final List<Set<LinkDescription>> linkTasks = new ArrayList<>(THREADS);
private final List<List<LinkDescription>> linkTasks = Lists.newArrayList();
private ScheduledExecutorService linkDriver =
Executors.newScheduledThreadPool(THREADS, groupedThreads("onos/null", "link-driver-%d"));
......@@ -140,10 +141,9 @@ public class NullLinkProvider extends AbstractProvider implements LinkProvider {
modified(context);
if (flicker) {
allocateLinks();
for (int i = 0; i < linkTasks.size(); i++) {
Set<LinkDescription> links = linkTasks.get(i);
LinkDriver driver = new LinkDriver(links, i);
List<LinkDescription> links = linkTasks.get(i);
LinkDriver driver = new LinkDriver(links);
links.forEach(v -> {
DeviceId d = v.src().deviceId();
Set<LinkDriver> s = driverMap.getOrDefault(d, Sets.newConcurrentHashSet());
......@@ -157,7 +157,9 @@ public class NullLinkProvider extends AbstractProvider implements LinkProvider {
}
}
} else {
linkDriver.schedule(new LinkDriver(linkDescrs, 0), 3, TimeUnit.SECONDS);
LinkDriver driver = new LinkDriver(linkDescrs);
driverMap.put(DEFAULT, Sets.newHashSet(driver));
linkDriver.schedule(driver, 3, TimeUnit.SECONDS);
}
log.info("started");
}
......@@ -208,8 +210,12 @@ public class NullLinkProvider extends AbstractProvider implements LinkProvider {
if (eventRate != newRate && newRate > 0) {
eventRate = newRate;
flicker = true;
allocateLinks();
} else if (newRate == 0) {
flicker = false;
// reconfigure driver - dumb but should work.
driverMap.getOrDefault(DEFAULT, Sets.newHashSet()).forEach(
v -> v.setTasks(linkDescrs));
}
log.info("Using settings: eventRate={}, topofile={}", eventRate, cfgFile);
......@@ -218,6 +224,7 @@ public class NullLinkProvider extends AbstractProvider implements LinkProvider {
// parse simplified dot-like topology graph
private void readGraph(String path, NodeId me) {
log.info("path: {}, local: {}", path, me);
Set<LinkDescription> read = Sets.newHashSet();
BufferedReader br = null;
try {
br = new BufferedReader(new FileReader(path));
......@@ -239,7 +246,7 @@ public class NullLinkProvider extends AbstractProvider implements LinkProvider {
if (cur.trim().contains("}")) {
break;
}
readLink(cur.trim().split(" "), me);
readLink(cur.trim().split(" "), me, read);
cur = br.readLine();
}
} else {
......@@ -264,10 +271,16 @@ public class NullLinkProvider extends AbstractProvider implements LinkProvider {
log.warn("Could not close topology file: {}", e);
}
}
synchronized (linkDescrs) {
if (!read.isEmpty()) {
linkDescrs.clear();
linkDescrs.addAll(read);
}
}
}
// parses a link descriptor to make a LinkDescription
private void readLink(String[] linkArr, NodeId me) {
private void readLink(String[] linkArr, NodeId me, Set<LinkDescription> links) {
if (linkArr[0].startsWith("#")) {
return;
}
......@@ -302,11 +315,11 @@ public class NullLinkProvider extends AbstractProvider implements LinkProvider {
ConnectPoint dst = new ConnectPoint(ddev, PortNumber.portNumber(cp2[1]));
// both link types have incoming half-link
LinkDescription in = new DefaultLinkDescription(dst, src, DIRECT);
linkDescrs.add(in);
links.add(in);
if (op.equals("--")) {
// bidirectional - within our node's island, make outbound link
LinkDescription out = new DefaultLinkDescription(src, dst, DIRECT);
linkDescrs.add(out);
links.add(out);
log.info("Created bidirectional link: {}, {}", out, in);
} else if (op.equals("->")) {
log.info("Created unidirectional link: {}", in);
......@@ -336,7 +349,7 @@ public class NullLinkProvider extends AbstractProvider implements LinkProvider {
index = (lcount % THREADS);
log.info("allocation: total={}, index={}", linkDescrs.size(), lcount, index);
if (linkTasks.size() <= index) {
linkTasks.add(index, Sets.newHashSet(ld));
linkTasks.add(index, Lists.newArrayList(ld));
} else {
linkTasks.get(index).add(ld);
}
......@@ -383,16 +396,13 @@ public class NullLinkProvider extends AbstractProvider implements LinkProvider {
long startTime;
LinkDriver(Set<LinkDescription> links, int index) {
for (LinkDescription link : links) {
tasks.add(link);
}
startTime = System.currentTimeMillis();
LinkDriver(List<LinkDescription> links) {
setTasks(links);
startTime = System.currentTimeMillis(); // yes, this will start off inaccurate
}
@Override
public void run() {
log.info("Thread started for links {}", tasks);
if (flicker) {
flicker();
} else {
......@@ -428,9 +438,9 @@ public class NullLinkProvider extends AbstractProvider implements LinkProvider {
private void refresh() {
if (!linkDriver.isShutdown() || !tasks.isEmpty()) {
log.info("iter {} refresh_links for {} links", counter, linkDescrs.size());
log.info("iter {} refresh_links", counter);
for (LinkDescription desc : linkDescrs) {
for (LinkDescription desc : tasks) {
providerService.linkDetected(desc);
log.info("iteration {}, {}", counter, desc);
}
......@@ -451,6 +461,21 @@ public class NullLinkProvider extends AbstractProvider implements LinkProvider {
}
}
}
public void setTasks(List<LinkDescription> links) {
HashMultimap<ConnectPoint, ConnectPoint> nm = HashMultimap.create();
links.forEach(v -> nm.put(v.src(), v.dst()));
// remove and send linkVanished for stale links.
synchronized (this) {
for (LinkDescription l : tasks) {
if (!nm.containsEntry(l.src(), l.dst())) {
providerService.linkVanished(l);
}
}
tasks.clear();
tasks.addAll(links);
}
}
}
}
......