Yuta HIGUCHI

GossipLinkStore AE bugfix + cleanup

Change-Id: If4cbaa65f980f10713488e6bf1be5d01c3131780
package org.onlab.onos.store.link.impl;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
......@@ -27,7 +26,6 @@ import org.onlab.onos.net.Link;
import org.onlab.onos.net.SparseAnnotations;
import org.onlab.onos.net.Link.Type;
import org.onlab.onos.net.LinkKey;
import org.onlab.onos.net.Provided;
import org.onlab.onos.net.device.DeviceClockService;
import org.onlab.onos.net.link.DefaultLinkDescription;
import org.onlab.onos.net.link.LinkDescription;
......@@ -70,7 +68,9 @@ import static org.onlab.onos.net.link.LinkEvent.Type.*;
import static org.onlab.util.Tools.namedThreads;
import static org.slf4j.LoggerFactory.getLogger;
import static com.google.common.collect.Multimaps.synchronizedSetMultimap;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Predicates.notNull;
import static org.onlab.onos.store.link.impl.GossipLinkStoreMessageSubjects.LINK_ANTI_ENTROPY_ADVERTISEMENT;
/**
* Manages inventory of infrastructure links in distributed data store
......@@ -239,9 +239,9 @@ public class GossipLinkStore
LinkKey key = linkKey(linkDescription.src(), linkDescription.dst());
final LinkEvent event;
final Timestamped<LinkDescription> mergedDesc;
synchronized (getLinkDescriptions(key)) {
synchronized (getOrCreateLinkDescriptions(key)) {
event = createOrUpdateLinkInternal(providerId, deltaDesc);
mergedDesc = getLinkDescriptions(key).get(providerId);
mergedDesc = getOrCreateLinkDescriptions(key).get(providerId);
}
if (event != null) {
......@@ -265,7 +265,7 @@ public class GossipLinkStore
LinkKey key = linkKey(linkDescription.value().src(),
linkDescription.value().dst());
Map<ProviderId, Timestamped<LinkDescription>> descs = getLinkDescriptions(key);
Map<ProviderId, Timestamped<LinkDescription>> descs = getOrCreateLinkDescriptions(key);
synchronized (descs) {
// if the link was previously removed, we should proceed if and
......@@ -296,7 +296,7 @@ public class GossipLinkStore
ProviderId providerId,
Timestamped<LinkDescription> linkDescription) {
// merge existing attributes and merge
// merge existing annotations
Timestamped<LinkDescription> existingLinkDescription = descs.get(providerId);
if (existingLinkDescription != null && existingLinkDescription.isNewer(linkDescription)) {
return null;
......@@ -377,14 +377,54 @@ public class GossipLinkStore
return event;
}
private static Timestamped<LinkDescription> getPrimaryDescription(
Map<ProviderId, Timestamped<LinkDescription>> linkDescriptions) {
synchronized (linkDescriptions) {
for (Entry<ProviderId, Timestamped<LinkDescription>>
e : linkDescriptions.entrySet()) {
if (!e.getKey().isAncillary()) {
return e.getValue();
}
}
}
return null;
}
// TODO: consider slicing out as Timestamp utils
/**
* Checks is timestamp is more recent than timestamped object.
*
* @param timestamp to check if this is more recent then other
* @param timestamped object to be tested against
* @return true if {@code timestamp} is more recent than {@code timestamped}
* or {@code timestamped is null}
*/
private static boolean isMoreRecent(Timestamp timestamp, Timestamped<?> timestamped) {
checkNotNull(timestamp);
if (timestamped == null) {
return true;
}
return timestamp.compareTo(timestamped.timestamp()) > 0;
}
private LinkEvent removeLinkInternal(LinkKey key, Timestamp timestamp) {
Map<ProviderId, Timestamped<LinkDescription>> linkDescriptions =
getLinkDescriptions(key);
Map<ProviderId, Timestamped<LinkDescription>> linkDescriptions
= getOrCreateLinkDescriptions(key);
synchronized (linkDescriptions) {
if (linkDescriptions.isEmpty()) {
// never seen such link before. keeping timestamp for record
removedLinks.put(key, timestamp);
return null;
}
// accept removal request if given timestamp is newer than
// the latest Timestamp from Primary provider
ProviderId primaryProviderId = pickPrimaryProviderId(linkDescriptions);
if (linkDescriptions.get(primaryProviderId).isNewer(timestamp)) {
Timestamped<LinkDescription> prim = getPrimaryDescription(linkDescriptions);
if (!isMoreRecent(timestamp, prim)) {
// outdated remove request, ignore
return null;
}
removedLinks.put(key, timestamp);
......@@ -406,12 +446,13 @@ public class GossipLinkStore
/**
* @return primary ProviderID, or randomly chosen one if none exists
*/
private ProviderId pickPrimaryProviderId(
private static ProviderId pickBaseProviderId(
Map<ProviderId, Timestamped<LinkDescription>> linkDescriptions) {
ProviderId fallBackPrimary = null;
for (Entry<ProviderId, Timestamped<LinkDescription>> e : linkDescriptions.entrySet()) {
if (!e.getKey().isAncillary()) {
// found primary
return e.getKey();
} else if (fallBackPrimary == null) {
// pick randomly as a fallback in case there is no primary
......@@ -421,9 +462,10 @@ public class GossipLinkStore
return fallBackPrimary;
}
// Guarded by linkDescs value (=locking each Link)
private Link composeLink(Map<ProviderId, Timestamped<LinkDescription>> descs) {
ProviderId primaryProviderId = pickPrimaryProviderId(descs);
Timestamped<LinkDescription> base = descs.get(primaryProviderId);
ProviderId baseProviderId = pickBaseProviderId(descs);
Timestamped<LinkDescription> base = descs.get(baseProviderId);
ConnectPoint src = base.value().src();
ConnectPoint dst = base.value().dst();
......@@ -432,7 +474,7 @@ public class GossipLinkStore
annotations = merge(annotations, base.value().annotations());
for (Entry<ProviderId, Timestamped<LinkDescription>> e : descs.entrySet()) {
if (primaryProviderId.equals(e.getKey())) {
if (baseProviderId.equals(e.getKey())) {
continue;
}
......@@ -445,10 +487,10 @@ public class GossipLinkStore
annotations = merge(annotations, e.getValue().value().annotations());
}
return new DefaultLink(primaryProviderId , src, dst, type, annotations);
return new DefaultLink(baseProviderId, src, dst, type, annotations);
}
private Map<ProviderId, Timestamped<LinkDescription>> getLinkDescriptions(LinkKey key) {
private Map<ProviderId, Timestamped<LinkDescription>> getOrCreateLinkDescriptions(LinkKey key) {
Map<ProviderId, Timestamped<LinkDescription>> r;
r = linkDescs.get(key);
if (r != null) {
......@@ -464,11 +506,11 @@ public class GossipLinkStore
}
}
private Timestamped<LinkDescription> getLinkDescription(LinkKey key, ProviderId providerId) {
return getLinkDescriptions(key).get(providerId);
}
private final Function<LinkKey, Link> lookupLink = new LookupLink();
/**
* Returns a Function to lookup Link instance using LinkKey from cache.
* @return
*/
private Function<LinkKey, Link> lookupLink() {
return lookupLink;
}
......@@ -480,26 +522,12 @@ public class GossipLinkStore
}
}
private static final class IsPrimary implements Predicate<Provided> {
private static final Predicate<Provided> IS_PRIMARY = new IsPrimary();
public static final Predicate<Provided> isPrimary() {
return IS_PRIMARY;
}
@Override
public boolean apply(Provided input) {
return !input.providerId().isAncillary();
}
}
private void notifyDelegateIfNotNull(LinkEvent event) {
if (event != null) {
notifyDelegate(event);
}
}
// TODO: should we be throwing exception?
private void broadcastMessage(MessageSubject subject, Object event) throws IOException {
ClusterMessage message = new ClusterMessage(
clusterService.getLocalNode().id(),
......@@ -508,17 +536,12 @@ public class GossipLinkStore
clusterCommunicator.broadcast(message);
}
// TODO: should we be throwing exception?
private void unicastMessage(NodeId recipient, MessageSubject subject, Object event) {
try {
ClusterMessage message = new ClusterMessage(
clusterService.getLocalNode().id(),
subject,
SERIALIZER.encode(event));
clusterCommunicator.unicast(message, recipient);
} catch (IOException e) {
log.error("Failed to send a {} message to {}", subject.value(), recipient);
}
private void unicastMessage(NodeId recipient, MessageSubject subject, Object event) throws IOException {
ClusterMessage message = new ClusterMessage(
clusterService.getLocalNode().id(),
subject,
SERIALIZER.encode(event));
clusterCommunicator.unicast(message, recipient);
}
private void notifyPeers(InternalLinkEvent event) throws IOException {
......@@ -529,12 +552,22 @@ public class GossipLinkStore
broadcastMessage(GossipLinkStoreMessageSubjects.LINK_REMOVED, event);
}
// notify peer, silently ignoring error
private void notifyPeer(NodeId peer, InternalLinkEvent event) {
unicastMessage(peer, GossipLinkStoreMessageSubjects.LINK_UPDATE, event);
try {
unicastMessage(peer, GossipLinkStoreMessageSubjects.LINK_UPDATE, event);
} catch (IOException e) {
log.debug("Failed to notify peer {} with message {}", peer, event);
}
}
// notify peer, silently ignoring error
private void notifyPeer(NodeId peer, InternalLinkRemovedEvent event) {
unicastMessage(peer, GossipLinkStoreMessageSubjects.LINK_REMOVED, event);
try {
unicastMessage(peer, GossipLinkStoreMessageSubjects.LINK_REMOVED, event);
} catch (IOException e) {
log.debug("Failed to notify peer {} with message {}", peer, event);
}
}
private final class SendAdvertisementTask implements Runnable {
......@@ -573,9 +606,9 @@ public class GossipLinkStore
}
try {
unicastMessage(peer, GossipLinkStoreMessageSubjects.LINK_ANTI_ENTROPY_ADVERTISEMENT, ad);
} catch (Exception e) {
log.error("Failed to send anti-entropy advertisement", e);
unicastMessage(peer, LINK_ANTI_ENTROPY_ADVERTISEMENT, ad);
} catch (IOException e) {
log.debug("Failed to send anti-entropy advertisement to {}", peer);
return;
}
} catch (Exception e) {
......@@ -608,42 +641,75 @@ public class GossipLinkStore
return new LinkAntiEntropyAdvertisement(self, linkTimestamps, linkTombstones);
}
private void handleAntiEntropyAdvertisement(LinkAntiEntropyAdvertisement advertisement) {
NodeId peer = advertisement.sender();
private void handleAntiEntropyAdvertisement(LinkAntiEntropyAdvertisement ad) {
Map<LinkFragmentId, Timestamp> linkTimestamps = advertisement.linkTimestamps();
Map<LinkKey, Timestamp> linkTombstones = advertisement.linkTombstones();
for (Map.Entry<LinkFragmentId, Timestamp> entry : linkTimestamps.entrySet()) {
LinkFragmentId linkFragmentId = entry.getKey();
Timestamp peerTimestamp = entry.getValue();
final NodeId sender = ad.sender();
boolean localOutdated = false;
LinkKey key = linkFragmentId.linkKey();
ProviderId providerId = linkFragmentId.providerId();
Timestamped<LinkDescription> linkDescription = getLinkDescription(key, providerId);
if (linkDescription.isNewer(peerTimestamp)) {
// I have more recent link description. update peer.
notifyPeer(peer, new InternalLinkEvent(providerId, linkDescription));
}
// else TODO: Peer has more recent link description. request it.
Timestamp linkRemovedTimestamp = removedLinks.get(key);
if (linkRemovedTimestamp != null && linkRemovedTimestamp.compareTo(peerTimestamp) > 0) {
// peer has a zombie link. update peer.
notifyPeer(peer, new InternalLinkRemovedEvent(key, linkRemovedTimestamp));
for (Entry<LinkKey, Map<ProviderId, Timestamped<LinkDescription>>>
l : linkDescs.entrySet()) {
final LinkKey key = l.getKey();
final Map<ProviderId, Timestamped<LinkDescription>> link = l.getValue();
synchronized (link) {
Timestamp localLatest = removedLinks.get(key);
for (Entry<ProviderId, Timestamped<LinkDescription>> p : link.entrySet()) {
final ProviderId providerId = p.getKey();
final Timestamped<LinkDescription> pDesc = p.getValue();
final LinkFragmentId fragId = new LinkFragmentId(key, providerId);
// remote
Timestamp remoteTimestamp = ad.linkTimestamps().get(fragId);
if (remoteTimestamp == null) {
remoteTimestamp = ad.linkTombstones().get(key);
}
if (remoteTimestamp == null ||
pDesc.isNewer(remoteTimestamp)) {
// I have more recent link description. update peer.
notifyPeer(sender, new InternalLinkEvent(providerId, pDesc));
} else {
final Timestamp remoteLive = ad.linkTimestamps().get(fragId);
if (remoteLive != null &&
remoteLive.compareTo(pDesc.timestamp()) > 0) {
// I have something outdated
localOutdated = true;
}
}
// search local latest along the way
if (localLatest == null ||
pDesc.isNewer(localLatest)) {
localLatest = pDesc.timestamp();
}
}
// Tests if remote remove is more recent then local latest.
final Timestamp remoteRemove = ad.linkTombstones().get(key);
if (remoteRemove != null) {
if (localLatest != null &&
localLatest.compareTo(remoteRemove) < 0) {
// remote remove is more recent
notifyDelegateIfNotNull(removeLinkInternal(key, remoteRemove));
}
}
}
}
for (Map.Entry<LinkKey, Timestamp> entry : linkTombstones.entrySet()) {
LinkKey key = entry.getKey();
Timestamp peerTimestamp = entry.getValue();
// populate remove info if not known locally
for (Entry<LinkKey, Timestamp> remoteRm : ad.linkTombstones().entrySet()) {
final LinkKey key = remoteRm.getKey();
final Timestamp remoteRemove = remoteRm.getValue();
// relying on removeLinkInternal to ignore stale info
notifyDelegateIfNotNull(removeLinkInternal(key, remoteRemove));
}
ProviderId primaryProviderId = pickPrimaryProviderId(getLinkDescriptions(key));
if (primaryProviderId != null) {
if (!getLinkDescription(key, primaryProviderId).isNewer(peerTimestamp)) {
notifyDelegateIfNotNull(removeLinkInternal(key, peerTimestamp));
}
if (localOutdated) {
// send back advertisement to speed up convergence
try {
unicastMessage(sender, LINK_ANTI_ENTROPY_ADVERTISEMENT,
createAdvertisement());
} catch (IOException e) {
log.debug("Failed to send back active advertisement");
}
}
}
......