Madan Jampani

ONOS-1965: Evict inactive nodes from candidates map + Rely on cluster events to trigger stale lock purge

Change-Id: Ib7cfea397f98d6271beb78e3b88041bb84550506
...@@ -13,6 +13,9 @@ import org.apache.felix.scr.annotations.Reference; ...@@ -13,6 +13,9 @@ import org.apache.felix.scr.annotations.Reference;
13 import org.apache.felix.scr.annotations.ReferenceCardinality; 13 import org.apache.felix.scr.annotations.ReferenceCardinality;
14 import org.apache.felix.scr.annotations.Service; 14 import org.apache.felix.scr.annotations.Service;
15 import org.onlab.util.KryoNamespace; 15 import org.onlab.util.KryoNamespace;
16 +import org.onosproject.cluster.ClusterEvent;
17 +import org.onosproject.cluster.ClusterEvent.Type;
18 +import org.onosproject.cluster.ClusterEventListener;
16 import org.onosproject.cluster.ClusterService; 19 import org.onosproject.cluster.ClusterService;
17 import org.onosproject.cluster.Leadership; 20 import org.onosproject.cluster.Leadership;
18 import org.onosproject.cluster.LeadershipEvent; 21 import org.onosproject.cluster.LeadershipEvent;
...@@ -32,6 +35,7 @@ import org.onosproject.store.service.Versioned; ...@@ -32,6 +35,7 @@ import org.onosproject.store.service.Versioned;
32 import org.slf4j.Logger; 35 import org.slf4j.Logger;
33 36
34 import java.util.ArrayList; 37 import java.util.ArrayList;
38 +import java.util.Collections;
35 import java.util.Map; 39 import java.util.Map;
36 import java.util.Map.Entry; 40 import java.util.Map.Entry;
37 import java.util.Objects; 41 import java.util.Objects;
...@@ -55,7 +59,7 @@ import static org.onosproject.cluster.ControllerNode.State.INACTIVE; ...@@ -55,7 +59,7 @@ import static org.onosproject.cluster.ControllerNode.State.INACTIVE;
55 /** 59 /**
56 * Distributed Lock Manager implemented on top of ConsistentMap. 60 * Distributed Lock Manager implemented on top of ConsistentMap.
57 * <p> 61 * <p>
58 - * This implementation makes use of cluster manager's failure 62 + * This implementation makes use of ClusterService's failure
59 * detection capabilities to detect and purge stale locks. 63 * detection capabilities to detect and purge stale locks.
60 * TODO: Ensure lock safety and liveness. 64 * TODO: Ensure lock safety and liveness.
61 */ 65 */
...@@ -81,27 +85,28 @@ public class DistributedLeadershipManager implements LeadershipService { ...@@ -81,27 +85,28 @@ public class DistributedLeadershipManager implements LeadershipService {
81 private final Logger log = getLogger(getClass()); 85 private final Logger log = getLogger(getClass());
82 private ExecutorService messageHandlingExecutor; 86 private ExecutorService messageHandlingExecutor;
83 private ScheduledExecutorService retryLeaderLockExecutor; 87 private ScheduledExecutorService retryLeaderLockExecutor;
84 - private ScheduledExecutorService deadLockDetectionExecutor; 88 + private ScheduledExecutorService staleLeadershipPurgeExecutor;
85 private ScheduledExecutorService leadershipStatusBroadcaster; 89 private ScheduledExecutorService leadershipStatusBroadcaster;
86 90
87 private ConsistentMap<String, NodeId> leaderMap; 91 private ConsistentMap<String, NodeId> leaderMap;
88 private ConsistentMap<String, List<NodeId>> candidateMap; 92 private ConsistentMap<String, List<NodeId>> candidateMap;
89 93
90 - private ListenerRegistry<LeadershipEvent, LeadershipEventListener> 94 + private ListenerRegistry<LeadershipEvent, LeadershipEventListener> listenerRegistry;
91 - listenerRegistry;
92 private final Map<String, Leadership> leaderBoard = Maps.newConcurrentMap(); 95 private final Map<String, Leadership> leaderBoard = Maps.newConcurrentMap();
93 private final Map<String, Leadership> candidateBoard = Maps.newConcurrentMap(); 96 private final Map<String, Leadership> candidateBoard = Maps.newConcurrentMap();
94 - private NodeId localNodeId; 97 + private final ClusterEventListener clusterEventListener = new InternalClusterEventListener();
95 98
99 + private NodeId localNodeId;
96 private Set<String> activeTopics = Sets.newConcurrentHashSet(); 100 private Set<String> activeTopics = Sets.newConcurrentHashSet();
97 101
98 private static final int ELECTION_JOIN_ATTEMPT_INTERVAL_SEC = 2; 102 private static final int ELECTION_JOIN_ATTEMPT_INTERVAL_SEC = 2;
99 private static final int DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC = 2; 103 private static final int DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC = 2;
100 - private static final int DEADLOCK_DETECTION_INTERVAL_SEC = 2;
101 private static final int LEADERSHIP_STATUS_UPDATE_INTERVAL_SEC = 2; 104 private static final int LEADERSHIP_STATUS_UPDATE_INTERVAL_SEC = 2;
102 - 105 + private static final int DELAY_BETWEEN_STALE_LEADERSHIP_PURGE_ATTEMPTS_SEC = 2;
103 private static final int LEADER_CANDIDATE_POS = 0; 106 private static final int LEADER_CANDIDATE_POS = 0;
104 107
108 + private final AtomicBoolean staleLeadershipPurgeScheduled = new AtomicBoolean(false);
109 +
105 private static final Serializer SERIALIZER = Serializer.using( 110 private static final Serializer SERIALIZER = Serializer.using(
106 new KryoNamespace.Builder().register(KryoNamespaces.API).build()); 111 new KryoNamespace.Builder().register(KryoNamespaces.API).build());
107 112
...@@ -122,8 +127,8 @@ public class DistributedLeadershipManager implements LeadershipService { ...@@ -122,8 +127,8 @@ public class DistributedLeadershipManager implements LeadershipService {
122 groupedThreads("onos/store/leadership", "message-handler")); 127 groupedThreads("onos/store/leadership", "message-handler"));
123 retryLeaderLockExecutor = Executors.newScheduledThreadPool( 128 retryLeaderLockExecutor = Executors.newScheduledThreadPool(
124 4, groupedThreads("onos/store/leadership", "election-thread-%d")); 129 4, groupedThreads("onos/store/leadership", "election-thread-%d"));
125 - deadLockDetectionExecutor = Executors.newSingleThreadScheduledExecutor( 130 + staleLeadershipPurgeExecutor = Executors.newSingleThreadScheduledExecutor(
126 - groupedThreads("onos/store/leadership", "dead-lock-detector")); 131 + groupedThreads("onos/store/leadership", "stale-leadership-evictor"));
127 leadershipStatusBroadcaster = Executors.newSingleThreadScheduledExecutor( 132 leadershipStatusBroadcaster = Executors.newSingleThreadScheduledExecutor(
128 groupedThreads("onos/store/leadership", "peer-updater")); 133 groupedThreads("onos/store/leadership", "peer-updater"));
129 clusterCommunicator.addSubscriber( 134 clusterCommunicator.addSubscriber(
...@@ -132,8 +137,8 @@ public class DistributedLeadershipManager implements LeadershipService { ...@@ -132,8 +137,8 @@ public class DistributedLeadershipManager implements LeadershipService {
132 this::onLeadershipEvent, 137 this::onLeadershipEvent,
133 messageHandlingExecutor); 138 messageHandlingExecutor);
134 139
135 - deadLockDetectionExecutor.scheduleWithFixedDelay( 140 + clusterService.addListener(clusterEventListener);
136 - this::purgeStaleLocks, 0, DEADLOCK_DETECTION_INTERVAL_SEC, TimeUnit.SECONDS); 141 +
137 leadershipStatusBroadcaster.scheduleWithFixedDelay( 142 leadershipStatusBroadcaster.scheduleWithFixedDelay(
138 this::sendLeadershipStatus, 0, LEADERSHIP_STATUS_UPDATE_INTERVAL_SEC, TimeUnit.SECONDS); 143 this::sendLeadershipStatus, 0, LEADERSHIP_STATUS_UPDATE_INTERVAL_SEC, TimeUnit.SECONDS);
139 144
...@@ -151,12 +156,13 @@ public class DistributedLeadershipManager implements LeadershipService { ...@@ -151,12 +156,13 @@ public class DistributedLeadershipManager implements LeadershipService {
151 } 156 }
152 }); 157 });
153 158
159 + clusterService.removeListener(clusterEventListener);
154 eventDispatcher.removeSink(LeadershipEvent.class); 160 eventDispatcher.removeSink(LeadershipEvent.class);
155 clusterCommunicator.removeSubscriber(LEADERSHIP_EVENT_MESSAGE_SUBJECT); 161 clusterCommunicator.removeSubscriber(LEADERSHIP_EVENT_MESSAGE_SUBJECT);
156 162
157 messageHandlingExecutor.shutdown(); 163 messageHandlingExecutor.shutdown();
158 retryLeaderLockExecutor.shutdown(); 164 retryLeaderLockExecutor.shutdown();
159 - deadLockDetectionExecutor.shutdown(); 165 + staleLeadershipPurgeExecutor.shutdown();
160 leadershipStatusBroadcaster.shutdown(); 166 leadershipStatusBroadcaster.shutdown();
161 167
162 log.info("Stopped"); 168 log.info("Stopped");
...@@ -508,12 +514,25 @@ public class DistributedLeadershipManager implements LeadershipService { ...@@ -508,12 +514,25 @@ public class DistributedLeadershipManager implements LeadershipService {
508 TimeUnit.SECONDS); 514 TimeUnit.SECONDS);
509 } 515 }
510 516
511 - private void purgeStaleLocks() { 517 + private void scheduleStaleLeadershipPurge(int afterDelaySec) {
518 + if (staleLeadershipPurgeScheduled.compareAndSet(false, true)) {
519 + staleLeadershipPurgeExecutor.schedule(
520 + this::purgeStaleLeadership,
521 + afterDelaySec,
522 + TimeUnit.SECONDS);
523 + }
524 + }
525 +
526 + /**
527 + * Purges locks held by inactive nodes and evicts inactive nodes from candidacy.
528 + */
529 + private void purgeStaleLeadership() {
530 + AtomicBoolean rerunPurge = new AtomicBoolean(false);
512 try { 531 try {
532 + staleLeadershipPurgeScheduled.set(false);
513 leaderMap.entrySet() 533 leaderMap.entrySet()
514 .stream() 534 .stream()
515 .filter(e -> clusterService.getState(e.getValue().value()) == INACTIVE) 535 .filter(e -> clusterService.getState(e.getValue().value()) == INACTIVE)
516 - .filter(e -> activeTopics.contains(e.getKey()))
517 .forEach(entry -> { 536 .forEach(entry -> {
518 String path = entry.getKey(); 537 String path = entry.getKey();
519 NodeId nodeId = entry.getValue().value(); 538 NodeId nodeId = entry.getValue().value();
...@@ -528,10 +547,52 @@ public class DistributedLeadershipManager implements LeadershipService { ...@@ -528,10 +547,52 @@ public class DistributedLeadershipManager implements LeadershipService {
528 } 547 }
529 } catch (Exception e) { 548 } catch (Exception e) {
530 log.warn("Failed to purge stale lock held by {} for {}", nodeId, path, e); 549 log.warn("Failed to purge stale lock held by {} for {}", nodeId, path, e);
550 + rerunPurge.set(true);
551 + }
552 + });
553 +
554 + candidateMap.entrySet()
555 + .forEach(entry -> {
556 + String path = entry.getKey();
557 + Versioned<List<NodeId>> candidates = entry.getValue();
558 + List<NodeId> candidatesList = candidates != null
559 + ? candidates.value() : Collections.emptyList();
560 + List<NodeId> activeCandidatesList =
561 + candidatesList.stream()
562 + .filter(n -> clusterService.getState(n) == ACTIVE)
563 + .filter(n -> !localNodeId.equals(n) || activeTopics.contains(path))
564 + .collect(Collectors.toList());
565 + if (activeCandidatesList.size() < candidatesList.size()) {
566 + Set<NodeId> removedCandidates =
567 + Sets.difference(Sets.newHashSet(candidatesList),
568 + Sets.newHashSet(activeCandidatesList));
569 + try {
570 + if (candidateMap.replace(path, entry.getValue().version(), activeCandidatesList)) {
571 + log.info("Evicted inactive candidates {} from "
572 + + "candidate list for {}", removedCandidates, path);
573 + Versioned<List<NodeId>> updatedCandidates = candidateMap.get(path);
574 + publish(new LeadershipEvent(
575 + LeadershipEvent.Type.CANDIDATES_CHANGED,
576 + new Leadership(path,
577 + updatedCandidates.value(),
578 + updatedCandidates.version(),
579 + updatedCandidates.creationTime())));
580 + }
581 + } catch (Exception e) {
582 + log.warn("Failed to evict inactive candidates {} from "
583 + + "candidate list for {}", removedCandidates, path, e);
584 + rerunPurge.set(true);
585 + }
531 } 586 }
532 }); 587 });
533 } catch (Exception e) { 588 } catch (Exception e) {
534 - log.debug("Failed cleaning up stale locks", e); 589 + log.warn("Failure purging state leadership.", e);
590 + rerunPurge.set(true);
591 + }
592 +
593 + if (rerunPurge.get()) {
594 + log.info("Rescheduling stale leadership purge due to errors encountered in previous run");
595 + scheduleStaleLeadershipPurge(DELAY_BETWEEN_STALE_LEADERSHIP_PURGE_ATTEMPTS_SEC);
535 } 596 }
536 } 597 }
537 598
...@@ -555,4 +616,14 @@ public class DistributedLeadershipManager implements LeadershipService { ...@@ -555,4 +616,14 @@ public class DistributedLeadershipManager implements LeadershipService {
555 log.debug("Failed to send leadership updates", e); 616 log.debug("Failed to send leadership updates", e);
556 } 617 }
557 } 618 }
619 +
620 + private class InternalClusterEventListener implements ClusterEventListener {
621 +
622 + @Override
623 + public void event(ClusterEvent event) {
624 + if (event.type() == Type.INSTANCE_DEACTIVATED || event.type() == Type.INSTANCE_REMOVED) {
625 + scheduleStaleLeadershipPurge(0);
626 + }
627 + }
628 + }
558 } 629 }
......