Jonathan Hart
Committed by Brian O'Connor

Add a configuration option to vary how heavyweight anti-entropy is.

Change-Id: I57cea61182b3d19deb47608ffb7dd617529ae34c
......@@ -98,6 +98,7 @@ public class EventuallyConsistentMapImpl<K, V>
// TODO: Make these anti-entropy params configurable
private long initialDelaySec = 5;
private long periodSec = 5;
private boolean lightweightAntiEntropy = true;
/**
* Creates a new eventually consistent map shared amongst multiple instances.
......@@ -567,35 +568,32 @@ public class EventuallyConsistentMapImpl<K, V>
private void handleAntiEntropyAdvertisement(AntiEntropyAdvertisement<K> ad) {
List<EventuallyConsistentMapEvent<K, V>> externalEvents;
boolean sync = false;
externalEvents = antiEntropyCheckLocalItems(ad);
antiEntropyCheckLocalRemoved(ad);
externalEvents.addAll(antiEntropyCheckRemoteRemoved(ad));
// if remote ad has something unknown, actively sync
for (K key : ad.timestamps().keySet()) {
if (!items.containsKey(key)) {
sync = true;
break;
}
}
if (!lightweightAntiEntropy) {
externalEvents.addAll(antiEntropyCheckRemoteRemoved(ad));
// if remote ad has something unknown, actively sync
for (K key : ad.timestamps().keySet()) {
if (!items.containsKey(key)) {
// Send the advertisement back if this peer is out-of-sync
final NodeId sender = ad.sender();
AntiEntropyAdvertisement<K> myAd = createAdvertisement();
try {
unicastMessage(sender, antiEntropyAdvertisementSubject, myAd);
} catch (IOException e) {
log.debug(
"Failed to send reactive anti-entropy advertisement to {}",
sender);
}
// Send the advertisement back if this peer is out-of-sync
if (sync) {
final NodeId sender = ad.sender();
AntiEntropyAdvertisement<K> myAd = createAdvertisement();
try {
unicastMessage(sender, antiEntropyAdvertisementSubject, myAd);
} catch (IOException e) {
log.debug(
"Failed to send reactive anti-entropy advertisement to {}",
sender);
break;
}
}
}
externalEvents.forEach(this::notifyListeners);
}
......