Madan Jampani

ONOS-2322: Support for periodic purging of ECMap tombstones

Change-Id: I6fe5475a472c383c4a51bd61446fba8f1dba1d37
...@@ -17,10 +17,10 @@ package org.onosproject.store.primitives.impl; ...@@ -17,10 +17,10 @@ package org.onosproject.store.primitives.impl;
17 17
18 import com.google.common.base.MoreObjects; 18 import com.google.common.base.MoreObjects;
19 import com.google.common.collect.ImmutableMap; 19 import com.google.common.collect.ImmutableMap;
20 -
21 import org.onosproject.cluster.NodeId; 20 import org.onosproject.cluster.NodeId;
22 21
23 import java.util.Map; 22 import java.util.Map;
23 +
24 import static com.google.common.base.Preconditions.checkNotNull; 24 import static com.google.common.base.Preconditions.checkNotNull;
25 25
26 /** 26 /**
...@@ -28,6 +28,7 @@ import static com.google.common.base.Preconditions.checkNotNull; ...@@ -28,6 +28,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
28 */ 28 */
29 public class AntiEntropyAdvertisement<K> { 29 public class AntiEntropyAdvertisement<K> {
30 30
31 + private final long creationTime;
31 private final NodeId sender; 32 private final NodeId sender;
32 private final Map<K, MapValue.Digest> digest; 33 private final Map<K, MapValue.Digest> digest;
33 34
...@@ -39,11 +40,21 @@ public class AntiEntropyAdvertisement<K> { ...@@ -39,11 +40,21 @@ public class AntiEntropyAdvertisement<K> {
39 */ 40 */
40 public AntiEntropyAdvertisement(NodeId sender, 41 public AntiEntropyAdvertisement(NodeId sender,
41 Map<K, MapValue.Digest> digest) { 42 Map<K, MapValue.Digest> digest) {
43 + this.creationTime = System.currentTimeMillis();
42 this.sender = checkNotNull(sender); 44 this.sender = checkNotNull(sender);
43 this.digest = ImmutableMap.copyOf(checkNotNull(digest)); 45 this.digest = ImmutableMap.copyOf(checkNotNull(digest));
44 } 46 }
45 47
46 /** 48 /**
49 + * Returns the ad creation time.
50 + *
51 + * @return ad creation time
52 + */
53 + public long creationTime() {
54 + return creationTime;
55 + }
56 +
57 + /**
47 * Returns the sender's node ID. 58 * Returns the sender's node ID.
48 * 59 *
49 * @return the sender's node ID 60 * @return the sender's node ID
......
1 +/*
2 + * Copyright 2016-present Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +package org.onosproject.store.primitives.impl;
17 +
18 +/**
19 + * Status of anti-entropy exchange, returned by the receiver.
20 + *
21 + */
22 +public enum AntiEntropyResponse {
23 + /**
24 + * Signifies a successfully processed anti-entropy message.
25 + */
26 + PROCESSED,
27 +
28 + /**
29 + * Signifies a unexpected failure during anti-entropy message processing.
30 + */
31 + FAILED,
32 +
33 + /**
34 + * Signifies a ignored anti-entropy message, potentially due to the receiver operating under high load.
35 + */
36 + IGNORED
37 +}
...\ No newline at end of file ...\ No newline at end of file
...@@ -102,6 +102,9 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -102,6 +102,9 @@ public class EventuallyConsistentMapImpl<K, V>
102 private final ExecutorService communicationExecutor; 102 private final ExecutorService communicationExecutor;
103 private final Map<NodeId, EventAccumulator> senderPending; 103 private final Map<NodeId, EventAccumulator> senderPending;
104 104
105 + private long previousTombstonePurgeTime;
106 + private final Map<NodeId, Long> antiEntropyTimes = Maps.newConcurrentMap();
107 +
105 private final String mapName; 108 private final String mapName;
106 109
107 private volatile boolean destroyed = false; 110 private volatile boolean destroyed = false;
...@@ -250,8 +253,15 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -250,8 +253,15 @@ public class EventuallyConsistentMapImpl<K, V>
250 clusterCommunicator.addSubscriber(antiEntropyAdvertisementSubject, 253 clusterCommunicator.addSubscriber(antiEntropyAdvertisementSubject,
251 serializer::decode, 254 serializer::decode,
252 this::handleAntiEntropyAdvertisement, 255 this::handleAntiEntropyAdvertisement,
256 + serializer::encode,
253 this.backgroundExecutor); 257 this.backgroundExecutor);
254 258
259 + previousTombstonePurgeTime = 0;
260 + this.backgroundExecutor.scheduleWithFixedDelay(this::purgeTombstones,
261 + initialDelaySec,
262 + antiEntropyPeriod,
263 + TimeUnit.SECONDS);
264 +
255 this.tombstonesDisabled = tombstonesDisabled; 265 this.tombstonesDisabled = tombstonesDisabled;
256 this.lightweightAntiEntropy = !convergeFaster; 266 this.lightweightAntiEntropy = !convergeFaster;
257 } 267 }
...@@ -267,6 +277,7 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -267,6 +277,7 @@ public class EventuallyConsistentMapImpl<K, V>
267 .register(LogicalTimestamp.class) 277 .register(LogicalTimestamp.class)
268 .register(WallClockTimestamp.class) 278 .register(WallClockTimestamp.class)
269 .register(AntiEntropyAdvertisement.class) 279 .register(AntiEntropyAdvertisement.class)
280 + .register(AntiEntropyResponse.class)
270 .register(UpdateEntry.class) 281 .register(UpdateEntry.class)
271 .register(MapValue.class) 282 .register(MapValue.class)
272 .register(MapValue.Digest.class) 283 .register(MapValue.Digest.class)
...@@ -563,13 +574,17 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -563,13 +574,17 @@ public class EventuallyConsistentMapImpl<K, V>
563 } 574 }
564 575
565 private void sendAdvertisementToPeer(NodeId peer) { 576 private void sendAdvertisementToPeer(NodeId peer) {
566 - clusterCommunicator.unicast(createAdvertisement(), 577 + AntiEntropyAdvertisement<K> ad = createAdvertisement();
578 + clusterCommunicator.sendAndReceive(ad,
567 antiEntropyAdvertisementSubject, 579 antiEntropyAdvertisementSubject,
568 serializer::encode, 580 serializer::encode,
581 + serializer::decode,
569 peer) 582 peer)
570 .whenComplete((result, error) -> { 583 .whenComplete((result, error) -> {
571 if (error != null) { 584 if (error != null) {
572 log.debug("Failed to send anti-entropy advertisement to {}", peer, error); 585 log.debug("Failed to send anti-entropy advertisement to {}", peer, error);
586 + } else if (result == AntiEntropyResponse.PROCESSED) {
587 + antiEntropyTimes.put(peer, ad.creationTime());
573 } 588 }
574 }); 589 });
575 } 590 }
...@@ -579,9 +594,9 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -579,9 +594,9 @@ public class EventuallyConsistentMapImpl<K, V>
579 ImmutableMap.copyOf(Maps.transformValues(items, MapValue::digest))); 594 ImmutableMap.copyOf(Maps.transformValues(items, MapValue::digest)));
580 } 595 }
581 596
582 - private void handleAntiEntropyAdvertisement(AntiEntropyAdvertisement<K> ad) { 597 + private AntiEntropyResponse handleAntiEntropyAdvertisement(AntiEntropyAdvertisement<K> ad) {
583 if (destroyed || underHighLoad()) { 598 if (destroyed || underHighLoad()) {
584 - return; 599 + return AntiEntropyResponse.IGNORED;
585 } 600 }
586 try { 601 try {
587 if (log.isTraceEnabled()) { 602 if (log.isTraceEnabled()) {
...@@ -600,7 +615,9 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -600,7 +615,9 @@ public class EventuallyConsistentMapImpl<K, V>
600 } 615 }
601 } catch (Exception e) { 616 } catch (Exception e) {
602 log.warn("Error handling anti-entropy advertisement", e); 617 log.warn("Error handling anti-entropy advertisement", e);
618 + return AntiEntropyResponse.FAILED;
603 } 619 }
620 + return AntiEntropyResponse.PROCESSED;
604 } 621 }
605 622
606 /** 623 /**
...@@ -634,13 +651,37 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -634,13 +651,37 @@ public class EventuallyConsistentMapImpl<K, V>
634 return externalEvents; 651 return externalEvents;
635 } 652 }
636 653
654 + private void purgeTombstones() {
655 + /*
656 + * In order to mitigate the resource exhausation that can ensue due to an ever-growing set
657 + * of tombstones we employ the following heuristic to purge old tombstones periodically.
658 + * First, we keep track of the time (local system time) when we were able to have a successful
659 + * AE exchange with each peer. The smallest (or oldest) such time across *all* peers is regarded
660 + * as the time before which all tombstones are considered safe to purge.
661 + */
662 + if (tombstonesDisabled || antiEntropyTimes.size() != clusterService.getNodes().size() - 1) {
663 + return;
664 + }
665 + long currentSafeTombstonePurgeTime = antiEntropyTimes.values().stream().reduce(Math::min).orElse(0L);
666 + if (currentSafeTombstonePurgeTime == previousTombstonePurgeTime) {
667 + return;
668 + }
669 + List<Map.Entry<K, MapValue<V>>> tombStonesToDelete = items.entrySet()
670 + .stream()
671 + .filter(e -> e.getValue().isTombstone())
672 + .filter(e -> e.getValue().creationTime() <= currentSafeTombstonePurgeTime)
673 + .collect(Collectors.toList());
674 + previousTombstonePurgeTime = currentSafeTombstonePurgeTime;
675 + tombStonesToDelete.forEach(entry -> items.remove(entry.getKey(), entry.getValue()));
676 + }
677 +
637 private void processUpdates(Collection<UpdateEntry<K, V>> updates) { 678 private void processUpdates(Collection<UpdateEntry<K, V>> updates) {
638 if (destroyed) { 679 if (destroyed) {
639 return; 680 return;
640 } 681 }
641 updates.forEach(update -> { 682 updates.forEach(update -> {
642 final K key = update.key(); 683 final K key = update.key();
643 - final MapValue<V> value = update.value(); 684 + final MapValue<V> value = update.value() == null ? null : update.value().copy();
644 if (value == null || value.isTombstone()) { 685 if (value == null || value.isTombstone()) {
645 MapValue<V> previousValue = removeInternal(key, Optional.empty(), Optional.ofNullable(value)); 686 MapValue<V> previousValue = removeInternal(key, Optional.empty(), Optional.ofNullable(value));
646 if (previousValue != null && previousValue.isAlive()) { 687 if (previousValue != null && previousValue.isAlive()) {
......
...@@ -30,6 +30,7 @@ import static com.google.common.base.Preconditions.checkNotNull; ...@@ -30,6 +30,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
30 public class MapValue<V> implements Comparable<MapValue<V>> { 30 public class MapValue<V> implements Comparable<MapValue<V>> {
31 private final Timestamp timestamp; 31 private final Timestamp timestamp;
32 private final V value; 32 private final V value;
33 + private long creationTime;
33 34
34 /** 35 /**
35 * Creates a tombstone value with the specified timestamp. 36 * Creates a tombstone value with the specified timestamp.
...@@ -39,12 +40,35 @@ public class MapValue<V> implements Comparable<MapValue<V>> { ...@@ -39,12 +40,35 @@ public class MapValue<V> implements Comparable<MapValue<V>> {
39 * @param <U> value type 40 * @param <U> value type
40 */ 41 */
41 public static <U> MapValue<U> tombstone(Timestamp timestamp) { 42 public static <U> MapValue<U> tombstone(Timestamp timestamp) {
42 - return new MapValue<>(null, timestamp); 43 + return new MapValue<>(null, timestamp, System.currentTimeMillis());
43 } 44 }
44 45
45 public MapValue(V value, Timestamp timestamp) { 46 public MapValue(V value, Timestamp timestamp) {
47 + this(value, timestamp, System.currentTimeMillis());
48 + }
49 +
50 + /**
51 + * Constructor.
52 + *
53 + * @param value value
54 + * @param timestamp value timestamp.
55 + * @param creationTime the system time (on local instance) of construction
56 + */
57 + public MapValue(V value, Timestamp timestamp, long creationTime) {
46 this.value = value; 58 this.value = value;
47 this.timestamp = checkNotNull(timestamp, "Timestamp cannot be null"); 59 this.timestamp = checkNotNull(timestamp, "Timestamp cannot be null");
60 + this.creationTime = creationTime;
61 + }
62 +
63 + /**
64 + * Creates a copy of MapValue.
65 + * <p>
66 + * The copy will have an updated creation time corresponding to when the copy was constructed.
67 + *
68 + * @return MapValue copy
69 + */
70 + public MapValue<V> copy() {
71 + return new MapValue<>(this.value, this.timestamp, System.currentTimeMillis());
48 } 72 }
49 73
50 public boolean isTombstone() { 74 public boolean isTombstone() {
...@@ -63,6 +87,10 @@ public class MapValue<V> implements Comparable<MapValue<V>> { ...@@ -63,6 +87,10 @@ public class MapValue<V> implements Comparable<MapValue<V>> {
63 return value; 87 return value;
64 } 88 }
65 89
90 + public long creationTime() {
91 + return creationTime;
92 + }
93 +
66 @Override 94 @Override
67 public int compareTo(MapValue<V> o) { 95 public int compareTo(MapValue<V> o) {
68 return this.timestamp.compareTo(o.timestamp); 96 return this.timestamp.compareTo(o.timestamp);
......
...@@ -103,7 +103,7 @@ public class EventuallyConsistentMapImplTest { ...@@ -103,7 +103,7 @@ public class EventuallyConsistentMapImplTest {
103 new DefaultControllerNode(new NodeId("local"), IpAddress.valueOf(1)); 103 new DefaultControllerNode(new NodeId("local"), IpAddress.valueOf(1));
104 104
105 private Consumer<Collection<UpdateEntry<String, String>>> updateHandler; 105 private Consumer<Collection<UpdateEntry<String, String>>> updateHandler;
106 - private Consumer<AntiEntropyAdvertisement<String>> antiEntropyHandler; 106 + private Function<AntiEntropyAdvertisement<String>, AntiEntropyResponse> antiEntropyHandler;
107 107
108 /* 108 /*
109 * Serialization is a bit tricky here. We need to serialize in the tests 109 * Serialization is a bit tricky here. We need to serialize in the tests
...@@ -144,9 +144,15 @@ public class EventuallyConsistentMapImplTest { ...@@ -144,9 +144,15 @@ public class EventuallyConsistentMapImplTest {
144 // delegate to our ClusterCommunicationService implementation. This 144 // delegate to our ClusterCommunicationService implementation. This
145 // allows us to get a reference to the map's internal cluster message 145 // allows us to get a reference to the map's internal cluster message
146 // handlers so we can induce events coming in from a peer. 146 // handlers so we can induce events coming in from a peer.
147 - clusterCommunicator.<String>addSubscriber(anyObject(MessageSubject.class), 147 + clusterCommunicator.<Object>addSubscriber(anyObject(MessageSubject.class),
148 anyObject(Function.class), anyObject(Consumer.class), anyObject(Executor.class)); 148 anyObject(Function.class), anyObject(Consumer.class), anyObject(Executor.class));
149 - expectLastCall().andDelegateTo(new TestClusterCommunicationService()).times(2); 149 + expectLastCall().andDelegateTo(new TestClusterCommunicationService()).times(1);
150 + clusterCommunicator.<Object, Object>addSubscriber(anyObject(MessageSubject.class),
151 + anyObject(Function.class),
152 + anyObject(Function.class),
153 + anyObject(Function.class),
154 + anyObject(Executor.class));
155 + expectLastCall().andDelegateTo(new TestClusterCommunicationService()).times(1);
150 156
151 replay(clusterCommunicator); 157 replay(clusterCommunicator);
152 158
...@@ -798,8 +804,16 @@ public class EventuallyConsistentMapImplTest { ...@@ -798,8 +804,16 @@ public class EventuallyConsistentMapImplTest {
798 Executor executor) { 804 Executor executor) {
799 if (subject.equals(UPDATE_MESSAGE_SUBJECT)) { 805 if (subject.equals(UPDATE_MESSAGE_SUBJECT)) {
800 updateHandler = (Consumer<Collection<UpdateEntry<String, String>>>) handler; 806 updateHandler = (Consumer<Collection<UpdateEntry<String, String>>>) handler;
801 - } else if (subject.equals(ANTI_ENTROPY_MESSAGE_SUBJECT)) { 807 + } else {
802 - antiEntropyHandler = (Consumer<AntiEntropyAdvertisement<String>>) handler; 808 + throw new RuntimeException("Unexpected message subject " + subject.toString());
809 + }
810 + }
811 +
812 + @Override
813 + public <M, R> void addSubscriber(MessageSubject subject,
814 + Function<byte[], M> decoder, Function<M, R> handler, Function<R, byte[]> encoder, Executor executor) {
815 + if (subject.equals(ANTI_ENTROPY_MESSAGE_SUBJECT)) {
816 + antiEntropyHandler = (Function<AntiEntropyAdvertisement<String>, AntiEntropyResponse>) handler;
803 } else { 817 } else {
804 throw new RuntimeException("Unexpected message subject " + subject.toString()); 818 throw new RuntimeException("Unexpected message subject " + subject.toString());
805 } 819 }
......