Madan Jampani
Committed by Gerrit Code Review

ONOS-2068 Refresh leaderboard from source every 2s

Change-Id: I99a6bc6a7bada6147abcab005d86d74204704c21
...@@ -89,7 +89,7 @@ public class DistributedLeadershipManager implements LeadershipService { ...@@ -89,7 +89,7 @@ public class DistributedLeadershipManager implements LeadershipService {
89 private ScheduledExecutorService electionRunner; 89 private ScheduledExecutorService electionRunner;
90 private ScheduledExecutorService lockExecutor; 90 private ScheduledExecutorService lockExecutor;
91 private ScheduledExecutorService staleLeadershipPurgeExecutor; 91 private ScheduledExecutorService staleLeadershipPurgeExecutor;
92 - private ScheduledExecutorService leadershipStatusBroadcaster; 92 + private ScheduledExecutorService leadershipRefresher;
93 93
94 private ConsistentMap<String, NodeId> leaderMap; 94 private ConsistentMap<String, NodeId> leaderMap;
95 private ConsistentMap<String, List<NodeId>> candidateMap; 95 private ConsistentMap<String, List<NodeId>> candidateMap;
...@@ -106,7 +106,7 @@ public class DistributedLeadershipManager implements LeadershipService { ...@@ -106,7 +106,7 @@ public class DistributedLeadershipManager implements LeadershipService {
106 // The actual delay is randomly chosen between the interval [0, WAIT_BEFORE_RETRY_MILLIS) 106 // The actual delay is randomly chosen between the interval [0, WAIT_BEFORE_RETRY_MILLIS)
107 private static final int WAIT_BEFORE_RETRY_MILLIS = 150; 107 private static final int WAIT_BEFORE_RETRY_MILLIS = 150;
108 private static final int DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC = 2; 108 private static final int DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC = 2;
109 - private static final int LEADERSHIP_STATUS_UPDATE_INTERVAL_SEC = 2; 109 + private static final int LEADERSHIP_REFRESH_INTERVAL_SEC = 2;
110 private static final int DELAY_BETWEEN_STALE_LEADERSHIP_PURGE_ATTEMPTS_SEC = 2; 110 private static final int DELAY_BETWEEN_STALE_LEADERSHIP_PURGE_ATTEMPTS_SEC = 2;
111 111
112 private final AtomicBoolean staleLeadershipPurgeScheduled = new AtomicBoolean(false); 112 private final AtomicBoolean staleLeadershipPurgeScheduled = new AtomicBoolean(false);
...@@ -135,8 +135,8 @@ public class DistributedLeadershipManager implements LeadershipService { ...@@ -135,8 +135,8 @@ public class DistributedLeadershipManager implements LeadershipService {
135 4, groupedThreads("onos/store/leadership", "election-thread-%d")); 135 4, groupedThreads("onos/store/leadership", "election-thread-%d"));
136 staleLeadershipPurgeExecutor = Executors.newSingleThreadScheduledExecutor( 136 staleLeadershipPurgeExecutor = Executors.newSingleThreadScheduledExecutor(
137 groupedThreads("onos/store/leadership", "stale-leadership-evictor")); 137 groupedThreads("onos/store/leadership", "stale-leadership-evictor"));
138 - leadershipStatusBroadcaster = Executors.newSingleThreadScheduledExecutor( 138 + leadershipRefresher = Executors.newSingleThreadScheduledExecutor(
139 - groupedThreads("onos/store/leadership", "peer-updater")); 139 + groupedThreads("onos/store/leadership", "refresh-thread"));
140 clusterCommunicator.addSubscriber( 140 clusterCommunicator.addSubscriber(
141 LEADERSHIP_EVENT_MESSAGE_SUBJECT, 141 LEADERSHIP_EVENT_MESSAGE_SUBJECT,
142 SERIALIZER::decode, 142 SERIALIZER::decode,
...@@ -148,8 +148,8 @@ public class DistributedLeadershipManager implements LeadershipService { ...@@ -148,8 +148,8 @@ public class DistributedLeadershipManager implements LeadershipService {
148 electionRunner.scheduleWithFixedDelay( 148 electionRunner.scheduleWithFixedDelay(
149 this::electLeaders, 0, DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC, TimeUnit.SECONDS); 149 this::electLeaders, 0, DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC, TimeUnit.SECONDS);
150 150
151 - leadershipStatusBroadcaster.scheduleWithFixedDelay( 151 + leadershipRefresher.scheduleWithFixedDelay(
152 - this::sendLeadershipStatus, 0, LEADERSHIP_STATUS_UPDATE_INTERVAL_SEC, TimeUnit.SECONDS); 152 + this::refreshLeaderBoard, 0, LEADERSHIP_REFRESH_INTERVAL_SEC, TimeUnit.SECONDS);
153 153
154 listenerRegistry = new ListenerRegistry<>(); 154 listenerRegistry = new ListenerRegistry<>();
155 eventDispatcher.addSink(LeadershipEvent.class, listenerRegistry); 155 eventDispatcher.addSink(LeadershipEvent.class, listenerRegistry);
...@@ -173,7 +173,7 @@ public class DistributedLeadershipManager implements LeadershipService { ...@@ -173,7 +173,7 @@ public class DistributedLeadershipManager implements LeadershipService {
173 messageHandlingExecutor.shutdown(); 173 messageHandlingExecutor.shutdown();
174 lockExecutor.shutdown(); 174 lockExecutor.shutdown();
175 staleLeadershipPurgeExecutor.shutdown(); 175 staleLeadershipPurgeExecutor.shutdown();
176 - leadershipStatusBroadcaster.shutdown(); 176 + leadershipRefresher.shutdown();
177 177
178 log.info("Stopped"); 178 log.info("Stopped");
179 } 179 }
...@@ -458,6 +458,7 @@ public class DistributedLeadershipManager implements LeadershipService { ...@@ -458,6 +458,7 @@ public class DistributedLeadershipManager implements LeadershipService {
458 leaderBoard.compute(topic, (k, currentLeadership) -> { 458 leaderBoard.compute(topic, (k, currentLeadership) -> {
459 if (currentLeadership == null || currentLeadership.epoch() <= leadershipUpdate.epoch()) { 459 if (currentLeadership == null || currentLeadership.epoch() <= leadershipUpdate.epoch()) {
460 updateAccepted.set(true); 460 updateAccepted.set(true);
461 + // FIXME: Removing entries from leaderboard is not safe and should be visited.
461 return null; 462 return null;
462 } 463 }
463 return currentLeadership; 464 return currentLeadership;
...@@ -579,18 +580,36 @@ public class DistributedLeadershipManager implements LeadershipService { ...@@ -579,18 +580,36 @@ public class DistributedLeadershipManager implements LeadershipService {
579 } 580 }
580 } 581 }
581 582
582 - private void sendLeadershipStatus() { 583 + private void refreshLeaderBoard() {
583 try { 584 try {
584 - leaderBoard.forEach((path, leadership) -> { 585 + Map<String, Leadership> newLeaderBoard = Maps.newHashMap();
585 - if (leadership.leader().equals(localNodeId)) { 586 + leaderMap.entrySet().forEach(entry -> {
586 - LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, leadership); 587 + String path = entry.getKey();
587 - clusterCommunicator.broadcast(event, 588 + Versioned<NodeId> leader = entry.getValue();
588 - LEADERSHIP_EVENT_MESSAGE_SUBJECT, 589 + Leadership leadership = new Leadership(path,
589 - SERIALIZER::encode); 590 + leader.value(),
590 - } 591 + leader.version(),
592 + leader.creationTime());
593 + newLeaderBoard.put(path, leadership);
594 + });
595 +
596 + // first take snapshot of current leader board.
597 + Map<String, Leadership> currentLeaderBoard = ImmutableMap.copyOf(leaderBoard);
598 +
599 + // evict stale leaders
600 + Maps.difference(currentLeaderBoard, newLeaderBoard).entriesOnlyOnLeft().forEach((path, leadership) -> {
601 + log.debug("Evicting {} from leaderboard. It is no longer active leader.", leadership);
602 + onLeadershipEvent(new LeadershipEvent(LeadershipEvent.Type.LEADER_BOOTED, leadership));
603 + });
604 +
605 + // add missing leaders
606 + Maps.difference(currentLeaderBoard, newLeaderBoard).entriesDiffering().forEach((path, difference) -> {
607 + Leadership leadership = difference.rightValue();
608 + log.debug("Adding {} to leaderboard. It is now the active leader.", leadership);
609 + onLeadershipEvent(new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, leadership));
591 }); 610 });
592 } catch (Exception e) { 611 } catch (Exception e) {
593 - log.debug("Failed to send leadership updates", e); 612 + log.debug("Failed to refresh leader board", e);
594 } 613 }
595 } 614 }
596 615
......