kircaali
Committed by Gerrit Code Review

Fix for ONOS-3183

Stream processes by Lambda queries in collection objects has a known performance bottleneck. For each create and update link data operations, this  kind of queries are frequently used and this causes inefficient topology discovery operation. For to solve that problem i analize thread dump during discovery process and i saw that especially getAllProviders method uses high cpu. than i change the provider search operation with additional map data which holds linkkey and its providers. by this way provider search operation not only gets faster but also uses minimum resource. Before that change, i can discover only 4XX number of switches at one controller, but later number grows to 15XX switches

Change-Id: I65ed71b7f295917c818b2f9227d0fc070aa4a29b
......@@ -71,6 +71,7 @@ import org.slf4j.Logger;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Futures;
import static com.google.common.base.Preconditions.checkNotNull;
......@@ -95,8 +96,8 @@ import static org.slf4j.LoggerFactory.getLogger;
@Component(immediate = true, enabled = true)
@Service
public class ECLinkStore
extends AbstractStore<LinkEvent, LinkStoreDelegate>
implements LinkStore {
extends AbstractStore<LinkEvent, LinkStoreDelegate>
implements LinkStore {
/**
* Modes for dealing with newly discovered links.
......@@ -117,8 +118,10 @@ public class ECLinkStore
private final Logger log = getLogger(getClass());
private final Map<LinkKey, Link> links = Maps.newConcurrentMap();
private final Map<LinkKey, Set<ProviderId>> linkProviders = Maps.newConcurrentMap();
private EventuallyConsistentMap<Provided<LinkKey>, LinkDescription> linkDescriptions;
private ApplicationId appId;
private static final MessageSubject LINK_INJECT_MESSAGE = new MessageSubject("inject-link-request");
......@@ -188,10 +191,10 @@ public class ECLinkStore
}).build();
clusterCommunicator.addSubscriber(LINK_INJECT_MESSAGE,
SERIALIZER::decode,
this::injectLink,
SERIALIZER::encode,
SharedExecutors.getPoolThreadExecutor());
SERIALIZER::decode,
this::injectLink,
SERIALIZER::encode,
SharedExecutors.getPoolThreadExecutor());
linkDescriptions.addListener(linkTracker);
......@@ -202,6 +205,7 @@ public class ECLinkStore
public void deactivate() {
linkDescriptions.removeListener(linkTracker);
linkDescriptions.destroy();
linkProviders.clear();
links.clear();
clusterCommunicator.removeSubscriber(LINK_INJECT_MESSAGE);
netCfgService.removeListener(cfgListener);
......@@ -273,10 +277,10 @@ public class ECLinkStore
return null;
}
return Futures.getUnchecked(clusterCommunicator.sendAndReceive(new Provided<>(linkDescription, providerId),
LINK_INJECT_MESSAGE,
SERIALIZER::encode,
SERIALIZER::decode,
dstNodeId));
LINK_INJECT_MESSAGE,
SERIALIZER::encode,
SERIALIZER::decode,
dstNodeId));
}
}
......@@ -295,16 +299,24 @@ public class ECLinkStore
private LinkDescription createOrUpdateLinkInternal(LinkDescription current, LinkDescription updated) {
if (current != null) {
// we only allow transition from INDIRECT -> DIRECT
return new DefaultLinkDescription(
current.src(),
current.dst(),
current.type() == DIRECT ? DIRECT : updated.type(),
current.isExpected(),
union(current.annotations(), updated.annotations()));
return new DefaultLinkDescription(
current.src(),
current.dst(),
current.type() == DIRECT ? DIRECT : updated.type(),
current.isExpected(),
union(current.annotations(), updated.annotations()));
}
return updated;
}
private Set<ProviderId> createOrUpdateLinkProviders(Set<ProviderId> current, ProviderId providerId) {
if (current == null) {
current = Sets.newConcurrentHashSet();
}
current.add(providerId);
return current;
}
private LinkEvent refreshLinkCache(LinkKey linkKey) {
AtomicReference<LinkEvent.Type> eventType = new AtomicReference<>();
Link link = links.compute(linkKey, (key, existingLink) -> {
......@@ -313,11 +325,11 @@ public class ECLinkStore
eventType.set(LINK_ADDED);
return newLink;
} else if (existingLink.state() != newLink.state() ||
existingLink.isExpected() != newLink.isExpected() ||
(existingLink.type() == INDIRECT && newLink.type() == DIRECT) ||
!AnnotationsUtil.isEqual(existingLink.annotations(), newLink.annotations())) {
eventType.set(LINK_UPDATED);
return newLink;
existingLink.isExpected() != newLink.isExpected() ||
(existingLink.type() == INDIRECT && newLink.type() == DIRECT) ||
!AnnotationsUtil.isEqual(existingLink.annotations(), newLink.annotations())) {
eventType.set(LINK_UPDATED);
return newLink;
} else {
return existingLink;
}
......@@ -326,20 +338,16 @@ public class ECLinkStore
}
private Set<ProviderId> getAllProviders(LinkKey linkKey) {
return linkDescriptions.keySet()
.stream()
.filter(key -> key.key().equals(linkKey))
.map(key -> key.providerId())
.collect(Collectors.toSet());
return linkProviders.getOrDefault(linkKey, Sets.newConcurrentHashSet());
}
private ProviderId getBaseProviderId(LinkKey linkKey) {
Set<ProviderId> allProviders = getAllProviders(linkKey);
if (allProviders.size() > 0) {
return allProviders.stream()
.filter(p -> !p.isAncillary())
.findFirst()
.orElse(Iterables.getFirst(allProviders, null));
.filter(p -> !p.isAncillary())
.findFirst()
.orElse(Iterables.getFirst(allProviders, null));
}
return null;
}
......@@ -356,11 +364,14 @@ public class ECLinkStore
annotations.set(merge(annotations.get(), base.annotations()));
getAllProviders(linkKey).stream()
.map(p -> new Provided<>(linkKey, p))
.forEach(key -> {
annotations.set(merge(annotations.get(),
linkDescriptions.get(key).annotations()));
});
.map(p -> new Provided<>(linkKey, p))
.forEach(key -> {
LinkDescription linkDescription = linkDescriptions.get(key);
if (linkDescription != null) {
annotations.set(merge(annotations.get(),
linkDescription.annotations()));
}
});
Link.State initialLinkState;
......@@ -375,8 +386,6 @@ public class ECLinkStore
}
return DefaultLink.builder()
.providerId(baseProviderId)
.src(src)
......@@ -394,8 +403,8 @@ public class ECLinkStore
// Note: INDIRECT -> DIRECT transition only
// so that BDDP discovered Link will not overwrite LDDP Link
if (oldLink.state() != newLink.state() ||
(oldLink.type() == INDIRECT && newLink.type() == DIRECT) ||
!AnnotationsUtil.isEqual(oldLink.annotations(), newLink.annotations())) {
(oldLink.type() == INDIRECT && newLink.type() == DIRECT) ||
!AnnotationsUtil.isEqual(oldLink.annotations(), newLink.annotations())) {
links.put(key, newLink);
return new LinkEvent(LINK_UPDATED, newLink);
......@@ -447,6 +456,7 @@ public class ECLinkStore
Link removedLink = links.remove(linkKey);
if (removedLink != null) {
getAllProviders(linkKey).forEach(p -> linkDescriptions.remove(new Provided<>(linkKey, p)));
linkProviders.remove(linkKey);
return new LinkEvent(LINK_REMOVED, removedLink);
}
return null;
......@@ -475,9 +485,12 @@ public class ECLinkStore
@Override
public void event(EventuallyConsistentMapEvent<Provided<LinkKey>, LinkDescription> event) {
if (event.type() == PUT) {
linkProviders.compute(event.key().key(), (k, v) ->
createOrUpdateLinkProviders(v, event.key().providerId()));
notifyDelegate(refreshLinkCache(event.key().key()));
} else if (event.type() == REMOVE) {
notifyDelegate(purgeLinkCache(event.key().key()));
linkProviders.remove(event.key().key());
}
}
}
......@@ -521,8 +534,8 @@ public class ECLinkStore
// Configuration properties factory
private final ConfigFactory factory =
new ConfigFactory<ApplicationId, CoreConfig>(APP_SUBJECT_FACTORY,
CoreConfig.class,
"core") {
CoreConfig.class,
"core") {
@Override
public CoreConfig createConfig() {
return new CoreConfig();
......