Committed by
Gerrit Code Review
Ensure tombsone purge logic works correctly after a cluster scale down
Change-Id: I94a4c234982a9e8f44af5078b3cbcee13e4b93cb
Showing
1 changed file
with
14 additions
and
9 deletions
| ... | @@ -256,11 +256,13 @@ public class EventuallyConsistentMapImpl<K, V> | ... | @@ -256,11 +256,13 @@ public class EventuallyConsistentMapImpl<K, V> |
| 256 | serializer::encode, | 256 | serializer::encode, |
| 257 | this.backgroundExecutor); | 257 | this.backgroundExecutor); |
| 258 | 258 | ||
| 259 | - previousTombstonePurgeTime = 0; | 259 | + if (!tombstonesDisabled) { |
| 260 | - this.backgroundExecutor.scheduleWithFixedDelay(this::purgeTombstones, | 260 | + previousTombstonePurgeTime = 0; |
| 261 | - initialDelaySec, | 261 | + this.backgroundExecutor.scheduleWithFixedDelay(this::purgeTombstones, |
| 262 | - antiEntropyPeriod, | 262 | + initialDelaySec, |
| 263 | - TimeUnit.SECONDS); | 263 | + antiEntropyPeriod, |
| 264 | + TimeUnit.SECONDS); | ||
| 265 | + } | ||
| 264 | 266 | ||
| 265 | this.tombstonesDisabled = tombstonesDisabled; | 267 | this.tombstonesDisabled = tombstonesDisabled; |
| 266 | this.lightweightAntiEntropy = !convergeFaster; | 268 | this.lightweightAntiEntropy = !convergeFaster; |
| ... | @@ -659,10 +661,13 @@ public class EventuallyConsistentMapImpl<K, V> | ... | @@ -659,10 +661,13 @@ public class EventuallyConsistentMapImpl<K, V> |
| 659 | * AE exchange with each peer. The smallest (or oldest) such time across *all* peers is regarded | 661 | * AE exchange with each peer. The smallest (or oldest) such time across *all* peers is regarded |
| 660 | * as the time before which all tombstones are considered safe to purge. | 662 | * as the time before which all tombstones are considered safe to purge. |
| 661 | */ | 663 | */ |
| 662 | - if (tombstonesDisabled || antiEntropyTimes.size() != clusterService.getNodes().size() - 1) { | 664 | + long currentSafeTombstonePurgeTime = clusterService.getNodes() |
| 663 | - return; | 665 | + .stream() |
| 664 | - } | 666 | + .map(ControllerNode::id) |
| 665 | - long currentSafeTombstonePurgeTime = antiEntropyTimes.values().stream().reduce(Math::min).orElse(0L); | 667 | + .filter(id -> !id.equals(localNodeId)) |
| 668 | + .map(id -> antiEntropyTimes.getOrDefault(id, 0L)) | ||
| 669 | + .reduce(Math::min) | ||
| 670 | + .orElse(0L); | ||
| 666 | if (currentSafeTombstonePurgeTime == previousTombstonePurgeTime) { | 671 | if (currentSafeTombstonePurgeTime == previousTombstonePurgeTime) { |
| 667 | return; | 672 | return; |
| 668 | } | 673 | } | ... | ... |
-
Please register or login to post a comment