Jonathan Hart

Implement anti-entropy for the EventuallyConsistentMap.

ONOS-857. 

Change-Id: Ife2070142d3c165c2a0035c3011c05b426c8baa4
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.impl;
import org.onosproject.cluster.NodeId;
import org.onosproject.store.Timestamp;
import java.util.Map;
import static com.google.common.base.Preconditions.checkNotNull;
/**
 * Message a node sends to a peer during anti-entropy for an eventually
 * consistent map. It carries the sender's identity together with the
 * timestamps of the sender's live items and of its removed items.
 * <p>
 * The maps passed to the constructor are stored and returned as given; no
 * copies are made.
 *
 * @param <K> type of the map keys
 */
public class AntiEntropyAdvertisement<K> {

    private final NodeId sender;
    private final Map<K, Timestamp> timestamps;
    private final Map<K, Timestamp> tombstones;

    // For serializer
    @SuppressWarnings("unused")
    private AntiEntropyAdvertisement() {
        sender = null;
        timestamps = null;
        tombstones = null;
    }

    /**
     * Builds an advertisement from the given sender and timestamp maps.
     * All arguments must be non-null.
     *
     * @param sender the sender's node ID
     * @param timestamps map of item key to timestamp for current items
     * @param tombstones map of item key to timestamp for removed items
     */
    public AntiEntropyAdvertisement(NodeId sender,
                                    Map<K, Timestamp> timestamps,
                                    Map<K, Timestamp> tombstones) {
        // Validate in declaration order before touching any field.
        checkNotNull(sender);
        checkNotNull(timestamps);
        checkNotNull(tombstones);

        this.sender = sender;
        this.timestamps = timestamps;
        this.tombstones = tombstones;
    }

    /**
     * Gets the ID of the node that produced this advertisement.
     *
     * @return the sender's node ID
     */
    public NodeId sender() {
        return this.sender;
    }

    /**
     * Gets the timestamps of the sender's current (live) items, by key.
     *
     * @return current item timestamps
     */
    public Map<K, Timestamp> timestamps() {
        return this.timestamps;
    }

    /**
     * Gets the timestamps of the sender's removed items, by key.
     *
     * @return removed item timestamps
     */
    public Map<K, Timestamp> tombstones() {
        return this.tombstones;
    }
}
......@@ -16,8 +16,10 @@
package org.onosproject.store.impl;
import com.google.common.base.MoreObjects;
import org.apache.commons.lang3.RandomUtils;
import org.onlab.util.KryoNamespace;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.store.Timestamp;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
......@@ -32,6 +34,8 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
......@@ -40,6 +44,7 @@ import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkNotNull;
......@@ -69,8 +74,9 @@ public class EventuallyConsistentMapImpl<K, V>
private final MessageSubject updateMessageSubject;
private final MessageSubject removeMessageSubject;
private final MessageSubject antiEntropyAdvertisementSubject;
private final Set<EventuallyConsistentMapListener> listeners
private final Set<EventuallyConsistentMapListener<K, V>> listeners
= new CopyOnWriteArraySet<>();
private final ExecutorService executor;
......@@ -138,12 +144,20 @@ public class EventuallyConsistentMapImpl<K, V>
newSingleThreadScheduledExecutor(minPriority(
namedThreads("onos-ecm-" + mapName + "-bg-%d")));
// start anti-entropy thread
backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
initialDelaySec, periodSec,
TimeUnit.SECONDS);
updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update");
clusterCommunicator.addSubscriber(updateMessageSubject,
new InternalPutEventListener());
removeMessageSubject = new MessageSubject("ecm-" + mapName + "-remove");
clusterCommunicator.addSubscriber(removeMessageSubject,
new InternalRemoveEventListener());
antiEntropyAdvertisementSubject = new MessageSubject("ecm-" + mapName + "-anti-entropy");
clusterCommunicator.addSubscriber(antiEntropyAdvertisementSubject,
new InternalAntiEntropyListener());
}
private KryoSerializer createSerializer(KryoNamespace.Builder builder) {
......@@ -158,9 +172,9 @@ public class EventuallyConsistentMapImpl<K, V>
.register(ArrayList.class)
.register(InternalPutEvent.class)
.register(InternalRemoveEvent.class)
.register(AntiEntropyAdvertisement.class)
.register(HashMap.class)
.build();
// TODO anti-entropy classes
}
};
}
......@@ -360,8 +374,8 @@ public class EventuallyConsistentMapImpl<K, V>
clusterCommunicator.removeSubscriber(removeMessageSubject);
}
private void notifyListeners(EventuallyConsistentMapEvent event) {
for (EventuallyConsistentMapListener listener : listeners) {
private void notifyListeners(EventuallyConsistentMapEvent<K, V> event) {
for (EventuallyConsistentMapListener<K, V> listener : listeners) {
listener.event(event);
}
}
......@@ -418,6 +432,245 @@ public class EventuallyConsistentMapImpl<K, V>
}
}
/**
 * Background task that sends an anti-entropy advertisement describing the
 * local map state to one randomly chosen peer.
 * <p>
 * The task never lets an exception escape {@link #run()}, because an
 * uncaught exception would suppress later executions of a scheduled task.
 */
private final class SendAdvertisementTask implements Runnable {
    @Override
    public void run() {
        if (Thread.currentThread().isInterrupted()) {
            log.info("Interrupted, quitting");
            return;
        }

        try {
            final NodeId self = clusterService.getLocalNode().id();

            // Candidate peers are all known nodes except ourselves. Removing
            // the local node up front means a single random draw is enough
            // and an empty candidate list is handled explicitly.
            List<NodeId> peers = clusterService.getNodes().stream()
                    .map(ControllerNode::id)
                    .filter(id -> !id.equals(self))
                    .collect(Collectors.toList());

            if (peers.isEmpty()) {
                log.trace("No other peers in the cluster.");
                return;
            }

            NodeId peer = peers.get(RandomUtils.nextInt(0, peers.size()));

            if (Thread.currentThread().isInterrupted()) {
                log.info("Interrupted, quitting");
                return;
            }

            AntiEntropyAdvertisement<K> ad = createAdvertisement();

            try {
                unicastMessage(peer, antiEntropyAdvertisementSubject, ad);
            } catch (IOException e) {
                // Best effort: the next scheduled run will try again.
                log.debug("Failed to send anti-entropy advertisement to {}", peer, e);
            }
        } catch (Exception e) {
            // Catch all exceptions to avoid scheduled task being suppressed.
            log.error("Exception thrown while sending advertisement", e);
        }
    }
}
/**
 * Builds an advertisement of the current local state: the timestamp of
 * every live item plus a copy of the local tombstone map, tagged with the
 * local node's ID.
 *
 * @return advertisement describing the local map state
 */
private AntiEntropyAdvertisement<K> createAdvertisement() {
    final NodeId localId = clusterService.getLocalNode().id();

    // Only timestamps are advertised for live items, never the values.
    Map<K, Timestamp> liveTimestamps = new HashMap<>(items.size());
    for (Map.Entry<K, Timestamped<V>> entry : items.entrySet()) {
        liveTimestamps.put(entry.getKey(), entry.getValue().timestamp());
    }

    Map<K, Timestamp> deadTimestamps = new HashMap<>(removedItems);

    return new AntiEntropyAdvertisement<>(localId, liveTimestamps, deadTimestamps);
}
/**
 * Reconciles local state with an advertisement received from a peer.
 * <p>
 * While holding this object's monitor, the three anti-entropy checks are run
 * against the advertisement. If the advertisement then still lists a live
 * key that is absent from the local live items, a single advertisement of
 * the local state is sent back to the sender. Listener notification for any
 * local removals happens after the monitor is released.
 *
 * @param ad advertisement received from a peer
 */
private void handleAntiEntropyAdvertisement(AntiEntropyAdvertisement<K> ad) {
    List<EventuallyConsistentMapEvent<K, V>> externalEvents;

    synchronized (this) {
        final NodeId sender = ad.sender();

        externalEvents = antiEntropyCheckLocalItems(ad);

        antiEntropyCheckLocalRemoved(ad);

        externalEvents.addAll(antiEntropyCheckRemoteRemoved(ad));

        // if remote ad has something unknown, actively sync
        boolean remoteHasUnknownKey = ad.timestamps().keySet().stream()
                .anyMatch(key -> !items.containsKey(key));

        if (remoteHasUnknownKey) {
            // Exactly one attempt: a failed send must not trigger another
            // full advertisement for every further unknown key while the
            // lock is held.
            AntiEntropyAdvertisement<K> myAd = createAdvertisement();
            try {
                unicastMessage(sender, antiEntropyAdvertisementSubject,
                               myAd);
            } catch (IOException e) {
                log.debug(
                        "Failed to send reactive anti-entropy advertisement to {}",
                        sender);
            }
        }
    } // synchronized (this)

    // Notify outside the lock so listeners never run while it is held.
    externalEvents.forEach(this::notifyListeners);
}
/**
 * Walks every local live item and compares it with the remote advertisement.
 * <p>
 * An item is queued for pushing to the sender when the advertisement has no
 * timestamp at all for its key, or when the local value is newer than the
 * advertised one (the live timestamp if present, otherwise the tombstone).
 * Separately, when the sender advertises a tombstone that is newer than the
 * local item, that removal is applied to the local state. All queued
 * updates go to the sender in one message.
 *
 * @param ad remote anti-entropy advertisement
 * @return list of external events relating to local operations performed
 */
// Guarded by synchronized (this)
private List<EventuallyConsistentMapEvent<K, V>> antiEntropyCheckLocalItems(
        AntiEntropyAdvertisement<K> ad) {
    final NodeId peer = ad.sender();
    final Map<K, Timestamp> remoteLive = ad.timestamps();
    final Map<K, Timestamp> remoteDead = ad.tombstones();

    final List<PutEntry<K, V>> pendingPuts = new ArrayList<>();
    final List<EventuallyConsistentMapEvent<K, V>> events = new LinkedList<>();

    for (Map.Entry<K, Timestamped<V>> entry : items.entrySet()) {
        final K key = entry.getKey();
        final Timestamped<V> local = entry.getValue();

        final Timestamp liveTs = remoteLive.get(key);
        final Timestamp deadTs = remoteDead.get(key);
        // Prefer the live timestamp; fall back to the tombstone.
        final Timestamp advertised = (liveTs != null) ? liveTs : deadTs;

        if (advertised == null || local.isNewer(advertised)) {
            // local value is more recent, push to sender
            pendingPuts.add(
                    new PutEntry<>(key, local.value(), local.timestamp()));
        }

        final boolean remoteRemoveIsNewer = deadTs != null
                && deadTs.compareTo(local.timestamp()) > 0;
        // sender has a more recent remove
        if (remoteRemoveIsNewer && removeInternal(key, deadTs)) {
            events.add(new EventuallyConsistentMapEvent<>(
                    EventuallyConsistentMapEvent.Type.REMOVE, key, null));
        }
    }

    if (pendingPuts.isEmpty()) {
        return events;
    }

    // Send all updates to the peer at once
    try {
        unicastMessage(peer, updateMessageSubject, new InternalPutEvent<>(pendingPuts));
    } catch (IOException e) {
        log.warn("Failed to send advertisement response", e);
    }

    return events;
}
/**
 * Compares the local tombstones with the live items the remote advertised.
 * Whenever a local tombstone is newer than the remote's live timestamp for
 * the same key, the remove is queued, and all queued removes are sent to the
 * sender in one message.
 *
 * @param ad remote anti-entropy advertisement
 */
// Guarded by synchronized (this)
private void antiEntropyCheckLocalRemoved(AntiEntropyAdvertisement<K> ad) {
    final NodeId peer = ad.sender();
    final Map<K, Timestamp> remoteLive = ad.timestamps();
    final List<RemoveEntry<K>> pendingRemoves = new ArrayList<>();

    for (Map.Entry<K, Timestamp> tombstone : removedItems.entrySet()) {
        final K key = tombstone.getKey();
        final Timestamp localDeadTs = tombstone.getValue();
        final Timestamp remoteLiveTs = remoteLive.get(key);

        if (remoteLiveTs == null) {
            // Remote does not advertise this key as live; nothing to fix.
            continue;
        }
        if (localDeadTs.compareTo(remoteLiveTs) > 0) {
            // sender has zombie, push remove
            pendingRemoves.add(new RemoveEntry<>(key, localDeadTs));
        }
    }

    if (pendingRemoves.isEmpty()) {
        return;
    }

    // Send all removes to the peer at once
    try {
        unicastMessage(peer, removeMessageSubject, new InternalRemoveEvent<>(pendingRemoves));
    } catch (IOException e) {
        log.warn("Failed to send advertisement response", e);
    }
}
/**
 * Applies the remote's tombstones to the local state. For each advertised
 * tombstone: if a local live item exists and the tombstone is newer, the
 * item is removed locally; otherwise, if a local tombstone exists and the
 * remote one is newer, the local tombstone timestamp is advanced to the
 * remote value.
 *
 * @param ad remote anti-entropy advertisement
 * @return list of external events relating to local operations performed
 */
// Guarded by synchronized (this)
private List<EventuallyConsistentMapEvent<K, V>>
        antiEntropyCheckRemoteRemoved(AntiEntropyAdvertisement<K> ad) {
    final List<EventuallyConsistentMapEvent<K, V>> events = new LinkedList<>();

    for (Map.Entry<K, Timestamp> tombstone : ad.tombstones().entrySet()) {
        final K key = tombstone.getKey();
        final Timestamp remoteTs = tombstone.getValue();

        final Timestamped<V> liveLocal = items.get(key);
        final Timestamp deadLocal = removedItems.get(key);

        final boolean remoteBeatsLive = liveLocal != null
                && remoteTs.compareTo(liveLocal.timestamp()) > 0;

        if (remoteBeatsLive) {
            // remove our version
            if (removeInternal(key, remoteTs)) {
                events.add(new EventuallyConsistentMapEvent<>(
                        EventuallyConsistentMapEvent.Type.REMOVE, key, null));
            }
            continue;
        }

        if (deadLocal != null && remoteTs.compareTo(deadLocal) > 0) {
            // Both sides have the item as removed, but the remote
            // timestamp is newer: adopt the newer value.
            removedItems.put(key, remoteTs);
        }
    }

    return events;
}
/**
 * Cluster message handler for anti-entropy advertisements. The payload is
 * decoded on the calling thread and the reconciliation itself is handed to
 * the background executor.
 */
private final class InternalAntiEntropyListener
        implements ClusterMessageHandler {

    @Override
    public void handle(ClusterMessage message) {
        log.trace("Received anti-entropy advertisement from peer: {}", message.sender());
        final AntiEntropyAdvertisement<K> received = serializer.decode(message.payload());
        backgroundExecutor.submit(() -> process(received));
    }

    /**
     * Runs the advertisement handling, logging rather than propagating any
     * exception it throws.
     *
     * @param received decoded advertisement from the peer
     */
    private void process(AntiEntropyAdvertisement<K> received) {
        try {
            handleAntiEntropyAdvertisement(received);
        } catch (Exception e) {
            log.warn("Exception thrown handling advertisements", e);
        }
    }
}
private final class InternalPutEventListener implements
ClusterMessageHandler {
@Override
......@@ -433,7 +686,7 @@ public class EventuallyConsistentMapImpl<K, V>
Timestamp timestamp = entry.timestamp();
if (putInternal(key, value, timestamp)) {
EventuallyConsistentMapEvent externalEvent =
EventuallyConsistentMapEvent<K, V> externalEvent =
new EventuallyConsistentMapEvent<>(
EventuallyConsistentMapEvent.Type.PUT, key,
value);
......@@ -461,7 +714,8 @@ public class EventuallyConsistentMapImpl<K, V>
Timestamp timestamp = entry.timestamp();
if (removeInternal(key, timestamp)) {
EventuallyConsistentMapEvent externalEvent = new EventuallyConsistentMapEvent<K, V>(
EventuallyConsistentMapEvent<K, V> externalEvent
= new EventuallyConsistentMapEvent<>(
EventuallyConsistentMapEvent.Type.REMOVE,
key, null);
notifyListeners(externalEvent);
......