Madan Jampani

Anti-Entropy support for link store.

......@@ -4,9 +4,11 @@ import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.common.collect.SetMultimap;
import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.concurrent.ConcurrentUtils;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
......@@ -15,6 +17,8 @@ import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.net.AnnotationsUtil;
import org.onlab.onos.net.ConnectPoint;
import org.onlab.onos.net.DefaultAnnotations;
......@@ -47,18 +51,24 @@ import org.slf4j.Logger;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static org.onlab.onos.cluster.ControllerNodeToNodeId.toNodeId;
import static org.onlab.onos.net.DefaultAnnotations.union;
import static org.onlab.onos.net.DefaultAnnotations.merge;
import static org.onlab.onos.net.Link.Type.DIRECT;
import static org.onlab.onos.net.Link.Type.INDIRECT;
import static org.onlab.onos.net.link.LinkEvent.Type.*;
import static org.onlab.util.Tools.namedThreads;
import static org.slf4j.LoggerFactory.getLogger;
import static com.google.common.collect.Multimaps.synchronizedSetMultimap;
import static com.google.common.base.Predicates.notNull;
......@@ -110,13 +120,30 @@ public class GossipLinkStore
}
};
private ScheduledExecutorService executor;
@Activate
public void activate() {
clusterCommunicator.addSubscriber(
GossipLinkStoreMessageSubjects.LINK_UPDATE, new InternalLinkEventListener());
GossipLinkStoreMessageSubjects.LINK_UPDATE,
new InternalLinkEventListener());
clusterCommunicator.addSubscriber(
GossipLinkStoreMessageSubjects.LINK_REMOVED,
new InternalLinkRemovedEventListener());
clusterCommunicator.addSubscriber(
GossipLinkStoreMessageSubjects.LINK_REMOVED, new InternalLinkRemovedEventListener());
GossipLinkStoreMessageSubjects.LINK_ANTI_ENTROPY_ADVERTISEMENT,
new InternalLinkAntiEntropyAdvertisementListener());
executor =
newSingleThreadScheduledExecutor(namedThreads("link-anti-entropy-%d"));
// TODO: Make these configurable
long initialDelaySec = 5;
long periodSec = 5;
// start anti-entropy thread
executor.scheduleAtFixedRate(new SendAdvertisementTask(),
initialDelaySec, periodSec, TimeUnit.SECONDS);
log.info("Started");
}
......@@ -408,6 +435,10 @@ public class GossipLinkStore
NewConcurrentHashMap.<ProviderId, Timestamped<LinkDescription>>ifNeeded());
}
private Timestamped<LinkDescription> getLinkDescription(LinkKey key, ProviderId providerId) {
return getLinkDescriptions(key).get(providerId);
}
private final Function<LinkKey, Link> lookupLink = new LookupLink();
private Function<LinkKey, Link> lookupLink() {
return lookupLink;
......@@ -448,6 +479,19 @@ public class GossipLinkStore
clusterCommunicator.broadcast(message);
}
// TODO: should we be throwing exception?
private void unicastMessage(NodeId recipient, MessageSubject subject, Object event) {
try {
ClusterMessage message = new ClusterMessage(
clusterService.getLocalNode().id(),
subject,
SERIALIZER.encode(event));
clusterCommunicator.unicast(message, recipient);
} catch (IOException e) {
log.error("Failed to send a {} message to {}", subject.value(), recipient);
}
}
private void notifyPeers(InternalLinkEvent event) throws IOException {
broadcastMessage(GossipLinkStoreMessageSubjects.LINK_UPDATE, event);
}
......@@ -456,6 +500,125 @@ public class GossipLinkStore
broadcastMessage(GossipLinkStoreMessageSubjects.LINK_REMOVED, event);
}
private void notifyPeer(NodeId peer, InternalLinkEvent event) {
unicastMessage(peer, GossipLinkStoreMessageSubjects.LINK_UPDATE, event);
}
private void notifyPeer(NodeId peer, InternalLinkRemovedEvent event) {
unicastMessage(peer, GossipLinkStoreMessageSubjects.LINK_REMOVED, event);
}
private final class SendAdvertisementTask implements Runnable {
@Override
public void run() {
if (Thread.currentThread().isInterrupted()) {
log.info("Interrupted, quitting");
return;
}
try {
final NodeId self = clusterService.getLocalNode().id();
Set<ControllerNode> nodes = clusterService.getNodes();
ImmutableList<NodeId> nodeIds = FluentIterable.from(nodes)
.transform(toNodeId())
.toList();
if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
log.info("No other peers in the cluster.");
return;
}
NodeId peer;
do {
int idx = RandomUtils.nextInt(0, nodeIds.size());
peer = nodeIds.get(idx);
} while (peer.equals(self));
LinkAntiEntropyAdvertisement ad = createAdvertisement();
if (Thread.currentThread().isInterrupted()) {
log.info("Interrupted, quitting");
return;
}
try {
unicastMessage(peer, GossipLinkStoreMessageSubjects.LINK_ANTI_ENTROPY_ADVERTISEMENT, ad);
} catch (Exception e) {
log.error("Failed to send anti-entropy advertisement", e);
return;
}
} catch (Exception e) {
// catch all Exception to avoid Scheduled task being suppressed.
log.error("Exception thrown while sending advertisement", e);
}
}
}
private LinkAntiEntropyAdvertisement createAdvertisement() {
final NodeId self = clusterService.getLocalNode().id();
Map<LinkFragmentId, Timestamp> linkTimestamps = new HashMap<>(linkDescs.size());
Map<LinkKey, Timestamp> linkTombstones = new HashMap<>(removedLinks.size());
for (Entry<LinkKey, ConcurrentMap<ProviderId, Timestamped<LinkDescription>>>
provs : linkDescs.entrySet()) {
final LinkKey linkKey = provs.getKey();
final ConcurrentMap<ProviderId, Timestamped<LinkDescription>> linkDesc = provs.getValue();
synchronized (linkDesc) {
for (Map.Entry<ProviderId, Timestamped<LinkDescription>> e : linkDesc.entrySet()) {
linkTimestamps.put(new LinkFragmentId(linkKey, e.getKey()), e.getValue().timestamp());
}
}
}
linkTombstones.putAll(removedLinks);
return new LinkAntiEntropyAdvertisement(self, linkTimestamps, linkTombstones);
}
private void handleAntiEntropyAdvertisement(LinkAntiEntropyAdvertisement advertisement) {
NodeId peer = advertisement.sender();
Map<LinkFragmentId, Timestamp> linkTimestamps = advertisement.linkTimestamps();
Map<LinkKey, Timestamp> linkTombstones = advertisement.linkTombstones();
for (Map.Entry<LinkFragmentId, Timestamp> entry : linkTimestamps.entrySet()) {
LinkFragmentId linkFragmentId = entry.getKey();
Timestamp peerTimestamp = entry.getValue();
LinkKey key = linkFragmentId.linkKey();
ProviderId providerId = linkFragmentId.providerId();
Timestamped<LinkDescription> linkDescription = getLinkDescription(key, providerId);
if (linkDescription.isNewer(peerTimestamp)) {
// I have more recent link description. update peer.
notifyPeer(peer, new InternalLinkEvent(providerId, linkDescription));
}
// else TODO: Peer has more recent link description. request it.
Timestamp linkRemovedTimestamp = removedLinks.get(key);
if (linkRemovedTimestamp != null && linkRemovedTimestamp.compareTo(peerTimestamp) > 0) {
// peer has a zombie link. update peer.
notifyPeer(peer, new InternalLinkRemovedEvent(key, linkRemovedTimestamp));
}
}
for (Map.Entry<LinkKey, Timestamp> entry : linkTombstones.entrySet()) {
LinkKey key = entry.getKey();
Timestamp peerTimestamp = entry.getValue();
ProviderId primaryProviderId = pickPrimaryProviderId(getLinkDescriptions(key));
if (primaryProviderId != null) {
if (!getLinkDescription(key, primaryProviderId).isNewer(peerTimestamp)) {
notifyDelegateIfNotNull(removeLinkInternal(key, peerTimestamp));
}
}
}
}
private class InternalLinkEventListener implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
......@@ -483,4 +646,14 @@ public class GossipLinkStore
notifyDelegateIfNotNull(removeLinkInternal(linkKey, timestamp));
}
}
private final class InternalLinkAntiEntropyAdvertisementListener implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
log.info("Received Link Anti-Entropy advertisement from peer: {}", message.sender());
LinkAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
handleAntiEntropyAdvertisement(advertisement);
}
}
}
......
......@@ -9,6 +9,10 @@ public final class GossipLinkStoreMessageSubjects {
private GossipLinkStoreMessageSubjects() {}
public static final MessageSubject LINK_UPDATE = new MessageSubject("peer-link-update");
public static final MessageSubject LINK_REMOVED = new MessageSubject("peer-link-removed");
public static final MessageSubject LINK_UPDATE =
new MessageSubject("peer-link-update");
public static final MessageSubject LINK_REMOVED =
new MessageSubject("peer-link-removed");
public static final MessageSubject LINK_ANTI_ENTROPY_ADVERTISEMENT =
new MessageSubject("link-enti-entropy-advertisement");
}
......
package org.onlab.onos.store.link.impl;
import static com.google.common.base.Preconditions.checkNotNull;
import java.util.Map;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.net.LinkKey;
import org.onlab.onos.store.Timestamp;
/**
* Link AE Advertisement message.
*/
public class LinkAntiEntropyAdvertisement {
private final NodeId sender;
private final Map<LinkFragmentId, Timestamp> linkTimestamps;
private final Map<LinkKey, Timestamp> linkTombstones;
public LinkAntiEntropyAdvertisement(NodeId sender,
Map<LinkFragmentId, Timestamp> linkTimestamps,
Map<LinkKey, Timestamp> linkTombstones) {
this.sender = checkNotNull(sender);
this.linkTimestamps = checkNotNull(linkTimestamps);
this.linkTombstones = checkNotNull(linkTombstones);
}
public NodeId sender() {
return sender;
}
public Map<LinkFragmentId, Timestamp> linkTimestamps() {
return linkTimestamps;
}
public Map<LinkKey, Timestamp> linkTombstones() {
return linkTombstones;
}
// For serializer
@SuppressWarnings("unused")
private LinkAntiEntropyAdvertisement() {
this.sender = null;
this.linkTimestamps = null;
this.linkTombstones = null;
}
}
package org.onlab.onos.store.link.impl;
import java.util.Objects;
import org.onlab.onos.net.LinkKey;
import org.onlab.onos.net.provider.ProviderId;
import com.google.common.base.MoreObjects;
/**
* Identifier for LinkDescription from a Provider.
*/
public final class LinkFragmentId {
public final ProviderId providerId;
public final LinkKey linkKey;
public LinkFragmentId(LinkKey linkKey, ProviderId providerId) {
this.providerId = providerId;
this.linkKey = linkKey;
}
public LinkKey linkKey() {
return linkKey;
}
public ProviderId providerId() {
return providerId;
}
@Override
public int hashCode() {
return Objects.hash(providerId, linkKey);
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (!(obj instanceof LinkFragmentId)) {
return false;
}
LinkFragmentId that = (LinkFragmentId) obj;
return Objects.equals(this.linkKey, that.linkKey) &&
Objects.equals(this.providerId, that.providerId);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("providerId", providerId)
.add("linkKey", linkKey)
.toString();
}
// for serializer
@SuppressWarnings("unused")
private LinkFragmentId() {
this.providerId = null;
this.linkKey = null;
}
}