Jon Hall
Committed by Gerrit Code Review

[ONOS-3591] Anti-Entropy speed up via push/pull interaction

Adds an UpdateRequest message. This contains a set of keys that a node
is missing updates for. The receiver will then send an UpdateEntry for
each missing key to the requester.

Change-Id: I2115f4a05833b51ae14d1191f09f083b5251f8ec
......@@ -25,6 +25,7 @@ import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.RE
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
......@@ -90,6 +91,7 @@ public class EventuallyConsistentMapImpl<K, V>
private final MessageSubject updateMessageSubject;
private final MessageSubject antiEntropyAdvertisementSubject;
private final MessageSubject updateRequestSubject;
private final Set<EventuallyConsistentMapListener<K, V>> listeners
= Sets.newCopyOnWriteArraySet();
......@@ -244,6 +246,12 @@ public class EventuallyConsistentMapImpl<K, V>
serializer::encode,
this.backgroundExecutor);
updateRequestSubject = new MessageSubject("ecm-" + mapName + "-update-request");
clusterCommunicator.addSubscriber(updateRequestSubject,
serializer::decode,
this::handleUpdateRequests,
this.backgroundExecutor);
if (!tombstonesDisabled) {
previousTombstonePurgeTime = 0;
this.backgroundExecutor.scheduleWithFixedDelay(this::purgeTombstones,
......@@ -513,6 +521,7 @@ public class EventuallyConsistentMapImpl<K, V>
listeners.clear();
clusterCommunicator.removeSubscriber(updateMessageSubject);
clusterCommunicator.removeSubscriber(updateRequestSubject);
clusterCommunicator.removeSubscriber(antiEntropyAdvertisementSubject);
return CompletableFuture.completedFuture(null);
}
......@@ -579,6 +588,19 @@ public class EventuallyConsistentMapImpl<K, V>
});
}
private void sendUpdateRequestToPeer(NodeId peer, Set<K> keys) {
UpdateRequest<K> request = new UpdateRequest<>(localNodeId, keys);
clusterCommunicator.unicast(request,
updateRequestSubject,
serializer::encode,
peer)
.whenComplete((result, error) -> {
if (error != null) {
log.debug("Failed to send update request to {}", peer, error);
}
});
}
private AntiEntropyAdvertisement<K> createAdvertisement() {
return new AntiEntropyAdvertisement<>(localNodeId,
ImmutableMap.copyOf(Maps.transformValues(items, MapValue::digest)));
......@@ -591,18 +613,9 @@ public class EventuallyConsistentMapImpl<K, V>
try {
if (log.isTraceEnabled()) {
log.trace("Received anti-entropy advertisement from {} for {} with {} entries in it",
mapName, ad.sender(), ad.digest().size());
ad.sender(), mapName, ad.digest().size());
}
antiEntropyCheckLocalItems(ad).forEach(this::notifyListeners);
if (!lightweightAntiEntropy) {
// if remote ad has any entries that the local copy is missing, actively sync
// TODO: Missing keys is not the way local copy can be behind.
if (Sets.difference(ad.digest().keySet(), items.keySet()).size() > 0) {
// TODO: Send ad for missing keys and for entries that are stale
sendAdvertisementToPeer(ad.sender());
}
}
} catch (Exception e) {
log.warn("Error handling anti-entropy advertisement", e);
return AntiEntropyResponse.FAILED;
......@@ -620,15 +633,20 @@ public class EventuallyConsistentMapImpl<K, V>
AntiEntropyAdvertisement<K> ad) {
final List<EventuallyConsistentMapEvent<K, V>> externalEvents = Lists.newLinkedList();
final NodeId sender = ad.sender();
final List<NodeId> peers = ImmutableList.of(sender);
Set<K> staleOrMissing = new HashSet<>();
Set<K> locallyUnknown = new HashSet<>(ad.digest().keySet());
items.forEach((key, localValue) -> {
locallyUnknown.remove(key);
MapValue.Digest remoteValueDigest = ad.digest().get(key);
if (remoteValueDigest == null || localValue.isNewerThan(remoteValueDigest.timestamp())) {
// local value is more recent, push to sender
queueUpdate(new UpdateEntry<>(key, localValue), ImmutableList.of(sender));
}
if (remoteValueDigest != null
queueUpdate(new UpdateEntry<>(key, localValue), peers);
} else if (remoteValueDigest != null
&& remoteValueDigest.isNewerThan(localValue.digest())
&& remoteValueDigest.isTombstone()) {
// remote value is more recent and a tombstone: update local value
MapValue<V> tombstone = MapValue.tombstone(remoteValueDigest.timestamp());
MapValue<V> previousValue = removeInternal(key,
Optional.empty(),
......@@ -636,14 +654,31 @@ public class EventuallyConsistentMapImpl<K, V>
if (previousValue != null && previousValue.isAlive()) {
externalEvents.add(new EventuallyConsistentMapEvent<>(mapName, REMOVE, key, previousValue.get()));
}
} else if (remoteValueDigest.isNewerThan(localValue.digest())) {
// Not a tombstone and remote is newer
staleOrMissing.add(key);
}
});
// Keys missing in local map
staleOrMissing.addAll(locallyUnknown);
// Request updates that we missed out on
sendUpdateRequestToPeer(sender, staleOrMissing);
return externalEvents;
}
private void handleUpdateRequests(UpdateRequest<K> request) {
final Set<K> keys = request.keys();
final NodeId sender = request.sender();
final List<NodeId> peers = ImmutableList.of(sender);
keys.forEach(key ->
queueUpdate(new UpdateEntry<>(key, items.get(key)), peers)
);
}
private void purgeTombstones() {
/*
* In order to mitigate the resource exhausation that can ensue due to an ever-growing set
* In order to mitigate the resource exhaustion that can ensue due to an ever-growing set
* of tombstones we employ the following heuristic to purge old tombstones periodically.
* First, we keep track of the time (local system time) when we were able to have a successful
* AE exchange with each peer. The smallest (or oldest) such time across *all* peers is regarded
......
/*
* Copyright 2016-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.impl;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableSet;
import org.onosproject.cluster.NodeId;
import java.util.Set;
import static com.google.common.base.Preconditions.checkNotNull;
/**
* Describes a request for update events in an EventuallyConsistentMap.
*/
final class UpdateRequest<K> {
private final NodeId sender;
private final Set<K> keys;
/**
* Creates a new update request.
*
* @param sender the sender's node ID
* @param keys keys requested
*/
public UpdateRequest(NodeId sender, Set<K> keys) {
this.sender = checkNotNull(sender);
this.keys = ImmutableSet.copyOf(keys);
}
/**
* Returns the sender's node ID.
*
* @return the sender's node ID
*/
public NodeId sender() {
return sender;
}
/**
* Returns the keys.
*
* @return the keys
*/
public Set<K> keys() {
return keys;
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("sender", sender)
.add("keys", keys())
.toString();
}
@SuppressWarnings("unused")
private UpdateRequest() {
this.sender = null;
this.keys = null;
}
}
......@@ -88,6 +88,8 @@ public class EventuallyConsistentMapImplTest {
= new MessageSubject("ecm-" + MAP_NAME + "-update");
private static final MessageSubject ANTI_ENTROPY_MESSAGE_SUBJECT
= new MessageSubject("ecm-" + MAP_NAME + "-anti-entropy");
private static final MessageSubject UPDATE_REQUEST_SUBJECT
= new MessageSubject("ecm-" + MAP_NAME + "-update-request");
private static final String KEY1 = "one";
private static final String KEY2 = "two";
......@@ -98,6 +100,7 @@ public class EventuallyConsistentMapImplTest {
new DefaultControllerNode(new NodeId("local"), IpAddress.valueOf(1));
private Consumer<Collection<UpdateEntry<String, String>>> updateHandler;
private Consumer<Collection<UpdateRequest<String>>> requestHandler;
private Function<AntiEntropyAdvertisement<String>, AntiEntropyResponse> antiEntropyHandler;
@Before
......@@ -123,6 +126,9 @@ public class EventuallyConsistentMapImplTest {
anyObject(Function.class),
anyObject(Executor.class));
expectLastCall().andDelegateTo(new TestClusterCommunicationService()).times(1);
clusterCommunicator.<Object>addSubscriber(anyObject(MessageSubject.class),
anyObject(Function.class), anyObject(Consumer.class), anyObject(Executor.class));
expectLastCall().andDelegateTo(new TestClusterCommunicationService()).times(1);
replay(clusterCommunicator);
......@@ -627,6 +633,7 @@ public class EventuallyConsistentMapImplTest {
@Test
public void testDestroy() throws Exception {
clusterCommunicator.removeSubscriber(UPDATE_MESSAGE_SUBJECT);
clusterCommunicator.removeSubscriber(UPDATE_REQUEST_SUBJECT);
clusterCommunicator.removeSubscriber(ANTI_ENTROPY_MESSAGE_SUBJECT);
replay(clusterCommunicator);
......@@ -774,6 +781,8 @@ public class EventuallyConsistentMapImplTest {
Executor executor) {
if (subject.equals(UPDATE_MESSAGE_SUBJECT)) {
updateHandler = (Consumer<Collection<UpdateEntry<String, String>>>) handler;
} else if (subject.equals(UPDATE_REQUEST_SUBJECT)) {
requestHandler = (Consumer<Collection<UpdateRequest<String>>>) handler;
} else {
throw new RuntimeException("Unexpected message subject " + subject.toString());
}
......