Madan Jampani
Committed by Gerrit Code Review

A couple of fixes:

1. Retry the leadership lock after a successful stepdown.
2. setStandby should adjust the candidates list to ensure another node steps up to become the master.

Change-Id: I8dc5da82c9b8b9e99d4118ec33a63037543927f0
...@@ -12,7 +12,6 @@ import org.apache.felix.scr.annotations.Deactivate; ...@@ -12,7 +12,6 @@ import org.apache.felix.scr.annotations.Deactivate;
12 import org.apache.felix.scr.annotations.Reference; 12 import org.apache.felix.scr.annotations.Reference;
13 import org.apache.felix.scr.annotations.ReferenceCardinality; 13 import org.apache.felix.scr.annotations.ReferenceCardinality;
14 import org.apache.felix.scr.annotations.Service; 14 import org.apache.felix.scr.annotations.Service;
15 -import org.apache.commons.lang3.mutable.MutableBoolean;
16 import org.onlab.util.KryoNamespace; 15 import org.onlab.util.KryoNamespace;
17 import org.onosproject.cluster.ClusterService; 16 import org.onosproject.cluster.ClusterService;
18 import org.onosproject.cluster.Leadership; 17 import org.onosproject.cluster.Leadership;
...@@ -23,8 +22,6 @@ import org.onosproject.cluster.NodeId; ...@@ -23,8 +22,6 @@ import org.onosproject.cluster.NodeId;
23 import org.onosproject.event.AbstractListenerRegistry; 22 import org.onosproject.event.AbstractListenerRegistry;
24 import org.onosproject.event.EventDeliveryService; 23 import org.onosproject.event.EventDeliveryService;
25 import org.onosproject.store.cluster.messaging.ClusterCommunicationService; 24 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
26 -import org.onosproject.store.cluster.messaging.ClusterMessage;
27 -import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
28 import org.onosproject.store.cluster.messaging.MessageSubject; 25 import org.onosproject.store.cluster.messaging.MessageSubject;
29 import org.onosproject.store.serializers.KryoNamespaces; 26 import org.onosproject.store.serializers.KryoNamespaces;
30 import org.onosproject.store.service.ConsistentMap; 27 import org.onosproject.store.service.ConsistentMap;
...@@ -44,6 +41,7 @@ import java.util.concurrent.ExecutorService; ...@@ -44,6 +41,7 @@ import java.util.concurrent.ExecutorService;
44 import java.util.concurrent.Executors; 41 import java.util.concurrent.Executors;
45 import java.util.concurrent.ScheduledExecutorService; 42 import java.util.concurrent.ScheduledExecutorService;
46 import java.util.concurrent.TimeUnit; 43 import java.util.concurrent.TimeUnit;
44 +import java.util.concurrent.atomic.AtomicBoolean;
47 import java.util.stream.Collectors; 45 import java.util.stream.Collectors;
48 46
49 import static com.google.common.base.Preconditions.checkArgument; 47 import static com.google.common.base.Preconditions.checkArgument;
...@@ -128,7 +126,8 @@ public class DistributedLeadershipManager implements LeadershipService { ...@@ -128,7 +126,8 @@ public class DistributedLeadershipManager implements LeadershipService {
128 groupedThreads("onos/store/leadership", "peer-updater")); 126 groupedThreads("onos/store/leadership", "peer-updater"));
129 clusterCommunicator.addSubscriber( 127 clusterCommunicator.addSubscriber(
130 LEADERSHIP_EVENT_MESSAGE_SUBJECT, 128 LEADERSHIP_EVENT_MESSAGE_SUBJECT,
131 - new InternalLeadershipEventListener(), 129 + SERIALIZER::decode,
130 + this::onLeadershipEvent,
132 messageHandlingExecutor); 131 messageHandlingExecutor);
133 132
134 deadLockDetectionExecutor.scheduleWithFixedDelay( 133 deadLockDetectionExecutor.scheduleWithFixedDelay(
...@@ -139,7 +138,7 @@ public class DistributedLeadershipManager implements LeadershipService { ...@@ -139,7 +138,7 @@ public class DistributedLeadershipManager implements LeadershipService {
139 listenerRegistry = new AbstractListenerRegistry<>(); 138 listenerRegistry = new AbstractListenerRegistry<>();
140 eventDispatcher.addSink(LeadershipEvent.class, listenerRegistry); 139 eventDispatcher.addSink(LeadershipEvent.class, listenerRegistry);
141 140
142 - log.info("Started."); 141 + log.info("Started");
143 } 142 }
144 143
145 @Deactivate 144 @Deactivate
...@@ -158,7 +157,7 @@ public class DistributedLeadershipManager implements LeadershipService { ...@@ -158,7 +157,7 @@ public class DistributedLeadershipManager implements LeadershipService {
158 deadLockDetectionExecutor.shutdown(); 157 deadLockDetectionExecutor.shutdown();
159 leadershipStatusBroadcaster.shutdown(); 158 leadershipStatusBroadcaster.shutdown();
160 159
161 - log.info("Stopped."); 160 + log.info("Stopped");
162 } 161 }
163 162
164 @Override 163 @Override
...@@ -210,8 +209,12 @@ public class DistributedLeadershipManager implements LeadershipService { ...@@ -210,8 +209,12 @@ public class DistributedLeadershipManager implements LeadershipService {
210 candidateList.add(localNodeId); 209 candidateList.add(localNodeId);
211 if (candidateMap.replace(path, candidates.version(), candidateList)) { 210 if (candidateMap.replace(path, candidates.version(), candidateList)) {
212 Versioned<List<NodeId>> newCandidates = candidateMap.get(path); 211 Versioned<List<NodeId>> newCandidates = candidateMap.get(path);
213 - notifyCandidateUpdated( 212 + publish(new LeadershipEvent(
214 - path, candidateList, newCandidates.version(), newCandidates.creationTime()); 213 + LeadershipEvent.Type.CANDIDATES_CHANGED,
214 + new Leadership(path,
215 + newCandidates.value(),
216 + newCandidates.version(),
217 + newCandidates.creationTime())));
215 } else { 218 } else {
216 rerunForLeadership(path); 219 rerunForLeadership(path);
217 return; 220 return;
...@@ -221,7 +224,12 @@ public class DistributedLeadershipManager implements LeadershipService { ...@@ -221,7 +224,12 @@ public class DistributedLeadershipManager implements LeadershipService {
221 List<NodeId> candidateList = ImmutableList.of(localNodeId); 224 List<NodeId> candidateList = ImmutableList.of(localNodeId);
222 if ((candidateMap.putIfAbsent(path, candidateList) == null)) { 225 if ((candidateMap.putIfAbsent(path, candidateList) == null)) {
223 Versioned<List<NodeId>> newCandidates = candidateMap.get(path); 226 Versioned<List<NodeId>> newCandidates = candidateMap.get(path);
224 - notifyCandidateUpdated(path, candidateList, newCandidates.version(), newCandidates.creationTime()); 227 + publish(new LeadershipEvent(
228 + LeadershipEvent.Type.CANDIDATES_CHANGED,
229 + new Leadership(path,
230 + newCandidates.value(),
231 + newCandidates.version(),
232 + newCandidates.creationTime())));
225 } else { 233 } else {
226 rerunForLeadership(path); 234 rerunForLeadership(path);
227 return; 235 return;
...@@ -245,7 +253,12 @@ public class DistributedLeadershipManager implements LeadershipService { ...@@ -245,7 +253,12 @@ public class DistributedLeadershipManager implements LeadershipService {
245 if (leader != null && Objects.equals(leader.value(), localNodeId)) { 253 if (leader != null && Objects.equals(leader.value(), localNodeId)) {
246 if (leaderMap.remove(path, leader.version())) { 254 if (leaderMap.remove(path, leader.version())) {
247 log.info("Gave up leadership for {}", path); 255 log.info("Gave up leadership for {}", path);
248 - notifyRemovedLeader(path, localNodeId, leader.version(), leader.creationTime()); 256 + publish(new LeadershipEvent(
257 + LeadershipEvent.Type.LEADER_BOOTED,
258 + new Leadership(path,
259 + localNodeId,
260 + leader.version(),
261 + leader.creationTime())));
249 } 262 }
250 } 263 }
251 // else we are not the current leader, can still be a candidate. 264 // else we are not the current leader, can still be a candidate.
...@@ -258,7 +271,12 @@ public class DistributedLeadershipManager implements LeadershipService { ...@@ -258,7 +271,12 @@ public class DistributedLeadershipManager implements LeadershipService {
258 } 271 }
259 if (candidateMap.replace(path, candidates.version(), candidateList)) { 272 if (candidateMap.replace(path, candidates.version(), candidateList)) {
260 Versioned<List<NodeId>> newCandidates = candidateMap.get(path); 273 Versioned<List<NodeId>> newCandidates = candidateMap.get(path);
261 - notifyCandidateRemoved(path, candidates.version(), candidates.creationTime(), newCandidates); 274 + publish(new LeadershipEvent(
275 + LeadershipEvent.Type.CANDIDATES_CHANGED,
276 + new Leadership(path,
277 + newCandidates.value(),
278 + newCandidates.version(),
279 + newCandidates.creationTime())));
262 } else { 280 } else {
263 log.warn("Failed to withdraw from candidates list. Will retry"); 281 log.warn("Failed to withdraw from candidates list. Will retry");
264 retryWithdraw(path); 282 retryWithdraw(path);
...@@ -279,8 +297,14 @@ public class DistributedLeadershipManager implements LeadershipService { ...@@ -279,8 +297,14 @@ public class DistributedLeadershipManager implements LeadershipService {
279 Versioned<NodeId> leader = leaderMap.get(path); 297 Versioned<NodeId> leader = leaderMap.get(path);
280 if (leader != null && Objects.equals(leader.value(), localNodeId)) { 298 if (leader != null && Objects.equals(leader.value(), localNodeId)) {
281 if (leaderMap.remove(path, leader.version())) { 299 if (leaderMap.remove(path, leader.version())) {
282 - log.info("Gave up leadership for {}", path); 300 + log.info("Stepped down from leadership for {}", path);
283 - notifyRemovedLeader(path, localNodeId, leader.version(), leader.creationTime()); 301 + publish(new LeadershipEvent(
302 + LeadershipEvent.Type.LEADER_BOOTED,
303 + new Leadership(path,
304 + localNodeId,
305 + leader.version(),
306 + leader.creationTime())));
307 + retryLock(path);
284 return true; 308 return true;
285 } 309 }
286 } 310 }
...@@ -306,30 +330,35 @@ public class DistributedLeadershipManager implements LeadershipService { ...@@ -306,30 +330,35 @@ public class DistributedLeadershipManager implements LeadershipService {
306 if (candidates == null || !candidates.value().contains(nodeId)) { 330 if (candidates == null || !candidates.value().contains(nodeId)) {
307 return false; 331 return false;
308 } 332 }
309 - if (nodeId.equals(candidates.value().get(0))) { 333 + List<NodeId> currentRoster = candidates.value();
334 + if (nodeId.equals(currentRoster.get(LEADER_CANDIDATE_POS))) {
310 return true; 335 return true;
311 } 336 }
312 - List<NodeId> currentRoster = candidates.value();
313 List<NodeId> newRoster = new ArrayList<>(currentRoster.size()); 337 List<NodeId> newRoster = new ArrayList<>(currentRoster.size());
314 newRoster.add(nodeId); 338 newRoster.add(nodeId);
315 currentRoster.stream().filter(id -> !nodeId.equals(id)).forEach(newRoster::add); 339 currentRoster.stream().filter(id -> !nodeId.equals(id)).forEach(newRoster::add);
316 boolean updated = candidateMap.replace(path, candidates.version(), newRoster); 340 boolean updated = candidateMap.replace(path, candidates.version(), newRoster);
317 if (updated) { 341 if (updated) {
318 Versioned<List<NodeId>> newCandidates = candidateMap.get(path); 342 Versioned<List<NodeId>> newCandidates = candidateMap.get(path);
319 - notifyCandidateUpdated( 343 + publish(new LeadershipEvent(
320 - path, newCandidates.value(), newCandidates.version(), newCandidates.creationTime()); 344 + LeadershipEvent.Type.CANDIDATES_CHANGED,
345 + new Leadership(path,
346 + newCandidates.value(),
347 + newCandidates.version(),
348 + newCandidates.creationTime())));
321 } 349 }
322 return updated; 350 return updated;
323 } 351 }
324 352
325 private void tryLeaderLock(String path) { 353 private void tryLeaderLock(String path) {
326 - if (!activeTopics.contains(path)) { 354 + if (!activeTopics.contains(path) || Objects.equals(localNodeId, getLeader(path))) {
327 return; 355 return;
328 } 356 }
329 try { 357 try {
330 Versioned<List<NodeId>> candidates = candidateMap.get(path); 358 Versioned<List<NodeId>> candidates = candidateMap.get(path);
331 if (candidates != null) { 359 if (candidates != null) {
332 - List<NodeId> activeNodes = candidates.value().stream() 360 + List<NodeId> activeNodes = candidates.value()
361 + .stream()
333 .filter(n -> clusterService.getState(n) == ACTIVE) 362 .filter(n -> clusterService.getState(n) == ACTIVE)
334 .collect(Collectors.toList()); 363 .collect(Collectors.toList());
335 if (localNodeId.equals(activeNodes.get(LEADER_CANDIDATE_POS))) { 364 if (localNodeId.equals(activeNodes.get(LEADER_CANDIDATE_POS))) {
...@@ -353,8 +382,12 @@ public class DistributedLeadershipManager implements LeadershipService { ...@@ -353,8 +382,12 @@ public class DistributedLeadershipManager implements LeadershipService {
353 if (localNodeId.equals(currentLeader.value())) { 382 if (localNodeId.equals(currentLeader.value())) {
354 log.info("Already has leadership for {}", path); 383 log.info("Already has leadership for {}", path);
355 // FIXME: candidates can get out of sync. 384 // FIXME: candidates can get out of sync.
356 - notifyNewLeader( 385 + publish(new LeadershipEvent(
357 - path, localNodeId, candidates, currentLeader.version(), currentLeader.creationTime()); 386 + LeadershipEvent.Type.LEADER_ELECTED,
387 + new Leadership(path,
388 + localNodeId,
389 + currentLeader.version(),
390 + currentLeader.creationTime())));
358 } else { 391 } else {
359 // someone else has leadership. will retry after sometime. 392 // someone else has leadership. will retry after sometime.
360 retryLock(path); 393 retryLock(path);
...@@ -365,7 +398,12 @@ public class DistributedLeadershipManager implements LeadershipService { ...@@ -365,7 +398,12 @@ public class DistributedLeadershipManager implements LeadershipService {
365 // do a get again to get the version (epoch) 398 // do a get again to get the version (epoch)
366 Versioned<NodeId> newLeader = leaderMap.get(path); 399 Versioned<NodeId> newLeader = leaderMap.get(path);
367 // FIXME: candidates can get out of sync 400 // FIXME: candidates can get out of sync
368 - notifyNewLeader(path, localNodeId, candidates, newLeader.version(), newLeader.creationTime()); 401 + publish(new LeadershipEvent(
402 + LeadershipEvent.Type.LEADER_ELECTED,
403 + new Leadership(path,
404 + newLeader.value(),
405 + newLeader.version(),
406 + newLeader.creationTime())));
369 } else { 407 } else {
370 // someone beat us to it. 408 // someone beat us to it.
371 retryLock(path); 409 retryLock(path);
...@@ -377,100 +415,12 @@ public class DistributedLeadershipManager implements LeadershipService { ...@@ -377,100 +415,12 @@ public class DistributedLeadershipManager implements LeadershipService {
377 } 415 }
378 } 416 }
379 417
380 - private void notifyCandidateUpdated( 418 + private void publish(LeadershipEvent event) {
381 - String path, List<NodeId> candidates, long epoch, long electedTime) { 419 + onLeadershipEvent(event);
382 - Leadership newInfo = new Leadership(path, candidates, epoch, electedTime); 420 + clusterCommunicator.broadcast(event, LEADERSHIP_EVENT_MESSAGE_SUBJECT, SERIALIZER::encode);
383 - final MutableBoolean updated = new MutableBoolean(false);
384 - candidateBoard.compute(path, (k, current) -> {
385 - if (current == null || current.epoch() < newInfo.epoch()) {
386 - log.debug("updating candidateboard with {}", newInfo);
387 - updated.setTrue();
388 - return newInfo;
389 } 421 }
390 - return current;
391 - });
392 - // maybe rethink types of candidates events
393 - if (updated.booleanValue()) {
394 - LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.CANDIDATES_CHANGED, newInfo);
395 - notifyPeers(event);
396 - }
397 - }
398 -
399 - private void notifyCandidateRemoved(
400 - String path, long oldEpoch, long oldTime, Versioned<List<NodeId>> candidates) {
401 - Leadership newInfo = (candidates == null)
402 - ? new Leadership(path, ImmutableList.of(), oldEpoch, oldTime)
403 - : new Leadership(path, candidates.value(), candidates.version(), candidates.creationTime());
404 - final MutableBoolean updated = new MutableBoolean(false);
405 -
406 - candidateBoard.compute(path, (k, current) -> {
407 - if (current != null && current.epoch() < newInfo.epoch()) {
408 - updated.setTrue();
409 - return newInfo;
410 - }
411 - return current;
412 - });
413 - // maybe rethink types of candidates events
414 - if (updated.booleanValue()) {
415 - log.debug("updated candidateboard with removal: {}", newInfo);
416 - LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.CANDIDATES_CHANGED, newInfo);
417 - notifyPeers(event);
418 - }
419 - }
420 -
421 - private void notifyNewLeader(String path, NodeId leader,
422 - List<NodeId> candidates, long epoch, long electedTime) {
423 - Leadership newLeadership = new Leadership(path, leader, candidates, epoch, electedTime);
424 - final MutableBoolean updatedLeader = new MutableBoolean(false);
425 - log.debug("candidates for new Leadership {}", candidates);
426 - leaderBoard.compute(path, (k, currentLeader) -> {
427 - if (currentLeader == null || currentLeader.epoch() < epoch) {
428 - log.debug("updating leaderboard with new {}", newLeadership);
429 - updatedLeader.setTrue();
430 - return newLeadership;
431 - }
432 - return currentLeader;
433 - });
434 -
435 - if (updatedLeader.booleanValue()) {
436 - LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, newLeadership);
437 - notifyPeers(event);
438 - }
439 - }
440 -
441 - private void notifyPeers(LeadershipEvent event) {
442 - eventDispatcher.post(event);
443 - clusterCommunicator.broadcast(event,
444 - LEADERSHIP_EVENT_MESSAGE_SUBJECT,
445 - SERIALIZER::encode);
446 - }
447 -
448 - private void notifyRemovedLeader(String path, NodeId leader, long epoch, long electedTime) {
449 - Versioned<List<NodeId>> candidates = candidateMap.get(path);
450 - Leadership oldLeadership = new Leadership(
451 - path, leader, candidates.value(), epoch, electedTime);
452 - final MutableBoolean updatedLeader = new MutableBoolean(false);
453 - leaderBoard.compute(path, (k, currentLeader) -> {
454 - if (currentLeader != null && currentLeader.epoch() == oldLeadership.epoch()) {
455 - updatedLeader.setTrue();
456 - return null;
457 - }
458 - return currentLeader;
459 - });
460 -
461 - if (updatedLeader.booleanValue()) {
462 - LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.LEADER_BOOTED, oldLeadership);
463 - notifyPeers(event);
464 - }
465 - }
466 -
467 - private class InternalLeadershipEventListener implements ClusterMessageHandler {
468 -
469 - @Override
470 - public void handle(ClusterMessage message) {
471 - LeadershipEvent leadershipEvent =
472 - SERIALIZER.decode(message.payload());
473 422
423 + private void onLeadershipEvent(LeadershipEvent leadershipEvent) {
474 log.trace("Leadership Event: time = {} type = {} event = {}", 424 log.trace("Leadership Event: time = {} type = {} event = {}",
475 leadershipEvent.time(), leadershipEvent.type(), 425 leadershipEvent.time(), leadershipEvent.type(),
476 leadershipEvent); 426 leadershipEvent);
...@@ -479,19 +429,19 @@ public class DistributedLeadershipManager implements LeadershipService { ...@@ -479,19 +429,19 @@ public class DistributedLeadershipManager implements LeadershipService {
479 LeadershipEvent.Type eventType = leadershipEvent.type(); 429 LeadershipEvent.Type eventType = leadershipEvent.type();
480 String topic = leadershipUpdate.topic(); 430 String topic = leadershipUpdate.topic();
481 431
482 - MutableBoolean updateAccepted = new MutableBoolean(false); 432 + AtomicBoolean updateAccepted = new AtomicBoolean(false);
483 if (eventType.equals(LeadershipEvent.Type.LEADER_ELECTED)) { 433 if (eventType.equals(LeadershipEvent.Type.LEADER_ELECTED)) {
484 leaderBoard.compute(topic, (k, currentLeadership) -> { 434 leaderBoard.compute(topic, (k, currentLeadership) -> {
485 if (currentLeadership == null || currentLeadership.epoch() < leadershipUpdate.epoch()) { 435 if (currentLeadership == null || currentLeadership.epoch() < leadershipUpdate.epoch()) {
486 - updateAccepted.setTrue(); 436 + updateAccepted.set(true);
487 return leadershipUpdate; 437 return leadershipUpdate;
488 } 438 }
489 return currentLeadership; 439 return currentLeadership;
490 }); 440 });
491 } else if (eventType.equals(LeadershipEvent.Type.LEADER_BOOTED)) { 441 } else if (eventType.equals(LeadershipEvent.Type.LEADER_BOOTED)) {
492 leaderBoard.compute(topic, (k, currentLeadership) -> { 442 leaderBoard.compute(topic, (k, currentLeadership) -> {
493 - if (currentLeadership == null || currentLeadership.epoch() == leadershipUpdate.epoch()) { 443 + if (currentLeadership == null || currentLeadership.epoch() <= leadershipUpdate.epoch()) {
494 - updateAccepted.setTrue(); 444 + updateAccepted.set(true);
495 return null; 445 return null;
496 } 446 }
497 return currentLeadership; 447 return currentLeadership;
...@@ -499,7 +449,7 @@ public class DistributedLeadershipManager implements LeadershipService { ...@@ -499,7 +449,7 @@ public class DistributedLeadershipManager implements LeadershipService {
499 } else if (eventType.equals(LeadershipEvent.Type.CANDIDATES_CHANGED)) { 449 } else if (eventType.equals(LeadershipEvent.Type.CANDIDATES_CHANGED)) {
500 candidateBoard.compute(topic, (k, currentInfo) -> { 450 candidateBoard.compute(topic, (k, currentInfo) -> {
501 if (currentInfo == null || currentInfo.epoch() < leadershipUpdate.epoch()) { 451 if (currentInfo == null || currentInfo.epoch() < leadershipUpdate.epoch()) {
502 - updateAccepted.setTrue(); 452 + updateAccepted.set(true);
503 return leadershipUpdate; 453 return leadershipUpdate;
504 } 454 }
505 return currentInfo; 455 return currentInfo;
...@@ -508,11 +458,10 @@ public class DistributedLeadershipManager implements LeadershipService { ...@@ -508,11 +458,10 @@ public class DistributedLeadershipManager implements LeadershipService {
508 throw new IllegalStateException("Unknown event type."); 458 throw new IllegalStateException("Unknown event type.");
509 } 459 }
510 460
511 - if (updateAccepted.booleanValue()) { 461 + if (updateAccepted.get()) {
512 eventDispatcher.post(leadershipEvent); 462 eventDispatcher.post(leadershipEvent);
513 } 463 }
514 } 464 }
515 - }
516 465
517 private void rerunForLeadership(String path) { 466 private void rerunForLeadership(String path) {
518 retryLeaderLockExecutor.schedule( 467 retryLeaderLockExecutor.schedule(
...@@ -549,7 +498,9 @@ public class DistributedLeadershipManager implements LeadershipService { ...@@ -549,7 +498,9 @@ public class DistributedLeadershipManager implements LeadershipService {
549 try { 498 try {
550 if (leaderMap.remove(path, epoch)) { 499 if (leaderMap.remove(path, epoch)) {
551 log.info("Purged stale lock held by {} for {}", nodeId, path); 500 log.info("Purged stale lock held by {} for {}", nodeId, path);
552 - notifyRemovedLeader(path, nodeId, epoch, creationTime); 501 + publish(new LeadershipEvent(
502 + LeadershipEvent.Type.LEADER_BOOTED,
503 + new Leadership(path, nodeId, epoch, creationTime)));
553 } 504 }
554 } catch (Exception e) { 505 } catch (Exception e) {
555 log.warn("Failed to purge stale lock held by {} for {}", nodeId, path, e); 506 log.warn("Failed to purge stale lock held by {} for {}", nodeId, path, e);
......
...@@ -53,8 +53,6 @@ import org.onosproject.net.DeviceId; ...@@ -53,8 +53,6 @@ import org.onosproject.net.DeviceId;
53 import org.onosproject.net.MastershipRole; 53 import org.onosproject.net.MastershipRole;
54 import org.onosproject.store.AbstractStore; 54 import org.onosproject.store.AbstractStore;
55 import org.onosproject.store.cluster.messaging.ClusterCommunicationService; 55 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
56 -import org.onosproject.store.cluster.messaging.ClusterMessage;
57 -import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
58 import org.onosproject.store.cluster.messaging.MessageSubject; 56 import org.onosproject.store.cluster.messaging.MessageSubject;
59 import org.onosproject.store.serializers.KryoNamespaces; 57 import org.onosproject.store.serializers.KryoNamespaces;
60 import org.onosproject.store.serializers.KryoSerializer; 58 import org.onosproject.store.serializers.KryoSerializer;
...@@ -122,11 +120,15 @@ public class ConsistentDeviceMastershipStore ...@@ -122,11 +120,15 @@ public class ConsistentDeviceMastershipStore
122 public void activate() { 120 public void activate() {
123 messageHandlingExecutor = 121 messageHandlingExecutor =
124 Executors.newSingleThreadExecutor(groupedThreads("onos/store/device/mastership", "message-handler")); 122 Executors.newSingleThreadExecutor(groupedThreads("onos/store/device/mastership", "message-handler"));
125 - clusterCommunicator.addSubscriber(ROLE_QUERY_SUBJECT, 123 + clusterCommunicator.<DeviceId, MastershipRole>addSubscriber(ROLE_QUERY_SUBJECT,
126 - new RoleQueryHandler(), 124 + SERIALIZER::decode,
125 + deviceId -> getRole(localNodeId, deviceId),
126 + SERIALIZER::encode,
127 messageHandlingExecutor); 127 messageHandlingExecutor);
128 - clusterCommunicator.addSubscriber(ROLE_RELINQUISH_SUBJECT, 128 + clusterCommunicator.<DeviceId, MastershipEvent>addSubscriber(ROLE_RELINQUISH_SUBJECT,
129 - new RoleRelinquishHandler(), 129 + SERIALIZER::decode,
130 + deviceId -> relinquishRole(localNodeId, deviceId),
131 + SERIALIZER::encode,
130 messageHandlingExecutor); 132 messageHandlingExecutor);
131 clusterCommunicator.addSubscriber(TRANSITION_FROM_MASTER_TO_STANDBY_SUBJECT, 133 clusterCommunicator.addSubscriber(TRANSITION_FROM_MASTER_TO_STANDBY_SUBJECT,
132 SERIALIZER::decode, 134 SERIALIZER::decode,
...@@ -211,8 +213,6 @@ public class ConsistentDeviceMastershipStore ...@@ -211,8 +213,6 @@ public class ConsistentDeviceMastershipStore
211 Map<NodeId, MastershipRole> roles = Maps.newHashMap(); 213 Map<NodeId, MastershipRole> roles = Maps.newHashMap();
212 clusterService 214 clusterService
213 .getNodes() 215 .getNodes()
214 - .stream()
215 - .parallel()
216 .forEach((node) -> roles.put(node.id(), getRole(node.id(), deviceId))); 216 .forEach((node) -> roles.put(node.id(), getRole(node.id(), deviceId)));
217 217
218 NodeId master = null; 218 NodeId master = null;
...@@ -282,9 +282,21 @@ public class ConsistentDeviceMastershipStore ...@@ -282,9 +282,21 @@ public class ConsistentDeviceMastershipStore
282 if (!nodeId.equals(currentMaster)) { 282 if (!nodeId.equals(currentMaster)) {
283 return null; 283 return null;
284 } 284 }
285 - // FIXME: This can become the master again unless it 285 +
286 - // is first demoted to the end of candidates list. 286 + String leadershipTopic = createDeviceMastershipTopic(deviceId);
287 - return transitionFromMasterToStandby(deviceId); 287 + List<NodeId> candidates = leadershipService.getCandidates(leadershipTopic);
288 +
289 + NodeId newMaster = candidates.stream()
290 + .filter(candidate -> !Objects.equal(nodeId, candidate))
291 + .findFirst()
292 + .orElse(null);
293 + log.info("Transitioning to role {} for {}. Next master: {}",
294 + newMaster != null ? MastershipRole.STANDBY : MastershipRole.NONE, deviceId, newMaster);
295 +
296 + if (newMaster != null) {
297 + return setMaster(newMaster, deviceId);
298 + }
299 + return relinquishRole(nodeId, deviceId);
288 } 300 }
289 301
290 @Override 302 @Override
...@@ -344,28 +356,11 @@ public class ConsistentDeviceMastershipStore ...@@ -344,28 +356,11 @@ public class ConsistentDeviceMastershipStore
344 ? new MastershipEvent(MastershipEvent.Type.MASTER_CHANGED, deviceId, getNodes(deviceId)) : null; 356 ? new MastershipEvent(MastershipEvent.Type.MASTER_CHANGED, deviceId, getNodes(deviceId)) : null;
345 } 357 }
346 358
347 - private class RoleQueryHandler implements ClusterMessageHandler {
348 - @Override
349 - public void handle(ClusterMessage message) {
350 - DeviceId deviceId = SERIALIZER.decode(message.payload());
351 - message.respond(SERIALIZER.encode(getRole(localNodeId, deviceId)));
352 - }
353 - }
354 -
355 -
356 @Override 359 @Override
357 public void relinquishAllRole(NodeId nodeId) { 360 public void relinquishAllRole(NodeId nodeId) {
358 // Noop. LeadershipService already takes care of detecting and purging deadlocks. 361 // Noop. LeadershipService already takes care of detecting and purging deadlocks.
359 } 362 }
360 363
361 - private class RoleRelinquishHandler implements ClusterMessageHandler {
362 - @Override
363 - public void handle(ClusterMessage message) {
364 - DeviceId deviceId = SERIALIZER.decode(message.payload());
365 - message.respond(SERIALIZER.encode(relinquishRole(localNodeId, deviceId)));
366 - }
367 - }
368 -
369 private class InternalDeviceMastershipEventListener implements LeadershipEventListener { 364 private class InternalDeviceMastershipEventListener implements LeadershipEventListener {
370 @Override 365 @Override
371 public void event(LeadershipEvent event) { 366 public void event(LeadershipEvent event) {
......