Ayaka Koshibe
Committed by Gerrit Code Review

DistributedLeadershipManager tracks topic election candidates in addition to
leaders. Includes update to leaders CLI command to list candidates.

part of: Device Mastership store on top of LeadershipService
Reference: ONOS-76

Conflicts:
	core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java

Change-Id: I587bb9e9ad16a9c8392969dde45001181053e5e6
......@@ -17,12 +17,15 @@ package org.onosproject.cli.net;
import java.util.Comparator;
import java.util.Map;
import java.util.List;
import org.apache.karaf.shell.commands.Command;
import org.apache.karaf.shell.commands.Option;
import org.onlab.util.Tools;
import org.onosproject.cli.AbstractShellCommand;
import org.onosproject.cluster.Leadership;
import org.onosproject.cluster.LeadershipService;
import org.onosproject.cluster.NodeId;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
......@@ -36,6 +39,12 @@ import com.fasterxml.jackson.databind.node.ArrayNode;
public class LeaderCommand extends AbstractShellCommand {
// Four-column row format; presumably used for the plain leaders table (confirm against displayLeaders).
private static final String FMT = "%-20s | %-15s | %-6s | %-10s |";
// Three-column row format used by displayCandidates: topic, leader, candidate.
private static final String FMT_C = "%-20s | %-15s | %-19s |";
// CLI flag (-c / --candidates): when set and the output is not JSON, the command
// prints the candidates table via displayCandidates instead of displayLeaders.
@Option(name = "-c", aliases = "--candidates",
description = "List candidate Nodes for each topic's leadership race",
required = false, multiValued = false)
private boolean showCandidates = false;
/**
* Compares leaders, sorting by toString() output.
......@@ -75,6 +84,28 @@ public class LeaderCommand extends AbstractShellCommand {
print("--------------------------------------------------------------");
}
/**
 * Prints a table listing each topic with its leader and its leadership candidates.
 * <p>
 * Topics are ordered by {@code leadershipComparator}. The first candidate shares
 * the row with the topic and leader; every further candidate is printed on its
 * own row with blank topic and leader columns, and a blank row separates topics.
 * A topic without any known candidate is printed with a blank candidates column.
 * The supplied candidate lists are only read, never modified, so immutable
 * lists are safe to pass in.
 *
 * @param leaderBoard map of topic to its current leadership
 * @param candidates map of topic to its candidate nodes; may be null or may
 *                   lack an entry for a topic on the leader board
 */
private void displayCandidates(Map<String, Leadership> leaderBoard,
                               Map<String, List<NodeId>> candidates) {
    print("--------------------------------------------------------------");
    print(FMT_C, "Topic", "Leader", "Candidates");
    print("--------------------------------------------------------------");
    leaderBoard
            .values()
            .stream()
            .sorted(leadershipComparator)
            .forEach(l -> {
                List<NodeId> list = candidates == null ? null : candidates.get(l.topic());
                if (list == null || list.isEmpty()) {
                    // no candidates known for this topic: keep the row, leave the column blank
                    print(FMT_C, l.topic(), l.leader(), " ");
                } else {
                    print(FMT_C,
                          l.topic(),
                          l.leader(),
                          list.get(0).toString());
                    // formatting hacks to get it into a table; subList is a read-only
                    // view here, so the caller's list is left untouched
                    list.subList(1, list.size())
                            .forEach(n -> print(FMT_C, " ", " ", n));
                }
                print(FMT_C, " ", " ", " ");
            });
    print("--------------------------------------------------------------");
}
/**
* Returns JSON node representing the leaders.
*
......@@ -91,6 +122,7 @@ public class LeaderCommand extends AbstractShellCommand {
mapper.createObjectNode()
.put("topic", l.topic())
.put("leader", l.leader().toString())
.put("candidates", l.candidates().toString())
.put("epoch", l.epoch())
.put("electedTime", Tools.timeAgo(l.electedTime()))));
......@@ -106,7 +138,12 @@ public class LeaderCommand extends AbstractShellCommand {
if (outputJson()) {
print("%s", json(leaderBoard));
} else {
if (showCandidates) {
Map<String, List<NodeId>> candidates = leaderService.getCandidates();
displayCandidates(leaderBoard, candidates);
} else {
displayLeaders(leaderBoard);
}
}
}
}
......
......@@ -49,11 +49,10 @@ public class LeadershipEvent extends AbstractEvent<LeadershipEvent.Type, Leaders
LEADER_BOOTED,
/**
* Signifies that the list of candidates for leadership for a resource
* has changed. If the change in the backups list is accompanied by a
* change in the leader, the event is subsumed by the leadership change.
* Signifies that the list of candidates for leadership for a topic has
* changed.
*/
LEADER_CANDIDATES_CHANGED
CANDIDATES_CHANGED
}
/**
......
......@@ -17,6 +17,7 @@ package org.onosproject.cluster;
import java.util.Map;
import java.util.Set;
import java.util.List;
/**
* Service for leader election.
......@@ -67,6 +68,19 @@ public interface LeadershipService {
Map<String, Leadership> getLeaderBoard();
/**
 * Returns the candidates for all known topics.
 *
 * @return a map of topics to lists of NodeIds
 */
Map<String, List<NodeId>> getCandidates();
/**
 * Returns the candidates for a given topic.
 *
 * @param path topic
 * @return a list of NodeIds, which may be empty
 */
List<NodeId> getCandidates(String path);
/**
* Registers an event listener to be notified of leadership events.
* @param listener listener that will be asynchronously notified of leadership events.
*/
......
......@@ -15,6 +15,7 @@
*/
package org.onosproject.cluster;
import java.util.List;
import java.util.Map;
import java.util.Set;
......@@ -62,4 +63,14 @@ public class LeadershipServiceAdapter implements LeadershipService {
public void removeListener(LeadershipEventListener listener) {
}
/**
 * Returns the candidates for all known topics.
 * <p>
 * This adapter provides no candidate data; an empty map is returned instead
 * of null so that callers can iterate over the result safely.
 *
 * @return an empty, immutable map
 */
@Override
public Map<String, List<NodeId>> getCandidates() {
    return java.util.Collections.emptyMap();
}
/**
 * Returns the candidates for a given topic.
 * <p>
 * This adapter provides no candidate data; an empty list is returned instead
 * of null, matching the interface contract of a possibly empty list.
 *
 * @param path topic
 * @return an empty, immutable list
 */
@Override
public List<NodeId> getCandidates(String path) {
    return java.util.Collections.emptyList();
}
}
......
......@@ -46,6 +46,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
......@@ -573,4 +574,14 @@ public class HazelcastLeadershipService implements LeadershipService {
eventDispatcher.post(leadershipEvent);
}
}
/**
 * Returns the candidates for all known topics.
 * <p>
 * Candidate tracking is not provided by this implementation; an empty map is
 * returned instead of null so that callers can iterate over the result safely.
 *
 * @return an empty, immutable map
 */
@Override
public Map<String, List<NodeId>> getCandidates() {
    return java.util.Collections.emptyMap();
}
/**
 * Returns the candidates for a given topic.
 * <p>
 * Candidate tracking is not provided by this implementation; an empty list is
 * returned instead of null, matching the interface contract of a possibly
 * empty list.
 *
 * @param path topic
 * @return an empty, immutable list
 */
@Override
public List<NodeId> getCandidates(String path) {
    return java.util.Collections.emptyList();
}
}
......
package org.onosproject.store.consistent.impl;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
......@@ -12,6 +15,7 @@ import org.apache.felix.scr.annotations.Service;
import org.onlab.util.KryoNamespace;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.ControllerNode.State;
import org.onosproject.cluster.Leadership;
import org.onosproject.cluster.LeadershipEvent;
import org.onosproject.cluster.LeadershipEventListener;
......@@ -24,8 +28,8 @@ import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.ConsistentMapException;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.Versioned;
......@@ -35,6 +39,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
......@@ -77,7 +82,9 @@ public class DistributedLeadershipManager implements LeadershipService {
private ScheduledExecutorService deadLockDetectionExecutor;
private ScheduledExecutorService leadershipStatusBroadcaster;
private ConsistentMap<String, NodeId> lockMap;
private ConsistentMap<String, NodeId> leaderMap;
private ConsistentMap<String, List<NodeId>> candidateMap;
private AbstractListenerRegistry<LeadershipEvent, LeadershipEventListener>
listenerRegistry;
private final Map<String, Leadership> leaderBoard = Maps.newConcurrentMap();
......@@ -85,25 +92,25 @@ public class DistributedLeadershipManager implements LeadershipService {
private Set<String> activeTopics = Sets.newConcurrentHashSet();
private static final int ELECTION_JOIN_ATTEMPT_INTERVAL_SEC = 2;
private static final int DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC = 2;
private static final int DEADLOCK_DETECTION_INTERVAL_SEC = 2;
private static final int LEADERSHIP_STATUS_UPDATE_INTERVAL_SEC = 2;
private static final KryoSerializer SERIALIZER = new KryoSerializer() {
@Override
protected void setupKryoPool() {
serializerPool = KryoNamespace.newBuilder()
.register(KryoNamespaces.API)
.build()
.populate(1);
}
};
private static final int LEADER_CANDIDATE_POS = 0;
private static final Serializer SERIALIZER = Serializer.using(
new KryoNamespace.Builder().register(KryoNamespaces.API).build());
@Activate
public void activate() {
lockMap = storageService.<String, NodeId>consistentMapBuilder()
.withName("onos-leader-locks")
.withSerializer(Serializer.using(new KryoNamespace.Builder().register(KryoNamespaces.API).build()))
leaderMap = storageService.<String, NodeId>consistentMapBuilder()
.withName("onos-topic-leaders")
.withSerializer(SERIALIZER)
.withPartitionsDisabled().build();
candidateMap = storageService.<String, List<NodeId>>consistentMapBuilder()
.withName("onos-topic-candidates")
.withSerializer(SERIALIZER)
.withPartitionsDisabled().build();
localNodeId = clusterService.getLocalNode().id();
......@@ -157,6 +164,19 @@ public class DistributedLeadershipManager implements LeadershipService {
}
/**
 * Returns the candidates for all known topics.
 * <p>
 * Reads every entry of the candidate map and unwraps the versioned values
 * into an immutable topic-to-candidates snapshot.
 *
 * @return an immutable map of topics to lists of NodeIds
 */
@Override
public Map<String, List<NodeId>> getCandidates() {
    Map<String, List<NodeId>> snapshot = Maps.newHashMap();
    for (Entry<String, Versioned<List<NodeId>>> entry : candidateMap.entrySet()) {
        // strip the version wrapper; only the candidate list is exposed
        snapshot.put(entry.getKey(), entry.getValue().value());
    }
    return ImmutableMap.copyOf(snapshot);
}
/**
 * Returns the candidates for a given topic.
 *
 * @param path topic
 * @return an immutable copy of the topic's candidate list, or an empty list
 *         when the candidate map holds no entry for the topic
 */
@Override
public List<NodeId> getCandidates(String path) {
    Versioned<List<NodeId>> versioned = candidateMap.get(path);
    if (versioned == null) {
        return ImmutableList.of();
    }
    return ImmutableList.copyOf(versioned.value());
}
@Override
public NodeId getLeader(String path) {
Leadership leadership = leaderBoard.get(path);
return leadership != null ? leadership.leader() : null;
......@@ -181,24 +201,62 @@ public class DistributedLeadershipManager implements LeadershipService {
/**
 * Enters the local node into the leadership race for the given topic.
 * <p>
 * The local node is first added to the topic's candidate list (creating the
 * list if the topic has none yet). If that update is rejected by the
 * candidate map, or the map throws a {@link ConsistentMapException}, the
 * whole attempt is rescheduled through {@code rerunForLeadership}. Once the
 * node is a candidate, the topic is marked active and a leader lock attempt
 * is started.
 *
 * @param path topic to run for
 */
@Override
public void runForLeadership(String path) {
    log.debug("Running for leadership for topic: {}", path);
    try {
        Versioned<List<NodeId>> candidates = candidateMap.get(path);
        // the candidate list as it stands after this node has joined
        List<NodeId> candidateList;
        if (candidates != null) {
            candidateList = Lists.newArrayList(candidates.value());
            if (!candidateList.contains(localNodeId)) {
                candidateList.add(localNodeId);
                // versioned replace: a false result means the update was not applied
                if (!candidateMap.replace(path, candidates.version(), candidateList)) {
                    rerunForLeadership(path);
                    return;
                }
            }
        } else {
            candidateList = ImmutableList.of(localNodeId);
            // a non-null result means an entry already existed, so ours was not stored
            if (candidateMap.putIfAbsent(path, candidateList) != null) {
                rerunForLeadership(path);
                return;
            }
        }
        // log the list that includes this node, not the value read before the update
        log.debug("In the leadership race for topic {} with candidates {}", path, candidateList);
        activeTopics.add(path);
        tryLeaderLock(path);
    } catch (ConsistentMapException e) {
        log.debug("Failed to enter topic leader race for {}. Retrying.", path, e);
        rerunForLeadership(path);
    }
}
@Override
public void withdraw(String path) {
activeTopics.remove(path);
try {
Versioned<NodeId> leader = lockMap.get(path);
if (Objects.equals(leader.value(), localNodeId)) {
if (lockMap.remove(path, leader.version())) {
Versioned<NodeId> leader = leaderMap.get(path);
if (leader != null && Objects.equals(leader.value(), localNodeId)) {
if (leaderMap.remove(path, leader.version())) {
log.info("Gave up leadership for {}", path);
notifyRemovedLeader(path, localNodeId, leader.version(), leader.creationTime());
}
}
// else we are not the current owner.
// else we are not the current leader, can still be a candidate.
Versioned<List<NodeId>> candidates = candidateMap.get(path);
List<NodeId> candidateList = candidates != null
? Lists.newArrayList(candidates.value())
: Lists.newArrayList();
if (!candidateList.remove(localNodeId)) {
return;
}
boolean success = candidateList.isEmpty()
? candidateMap.remove(path, candidates.version())
: candidateMap.replace(path, candidates.version(), candidateList);
if (!success) {
log.warn("Failed to withdraw from candidates list. Will retry");
retryWithdraw(path);
}
} catch (Exception e) {
log.debug("Failed to verify (and clear) any lock this node might be holding for {}", path, e);
retryWithdraw(path);
}
}
......@@ -216,39 +274,62 @@ public class DistributedLeadershipManager implements LeadershipService {
if (!activeTopics.contains(path)) {
return;
}
Versioned<List<NodeId>> candidates = candidateMap.get(path);
if (candidates != null) {
List<NodeId> activeNodes = candidates.value().stream()
.filter(n -> clusterService.getState(n) == State.ACTIVE)
.collect(Collectors.toList());
if (localNodeId.equals(activeNodes.get(LEADER_CANDIDATE_POS))) {
leaderLockAttempt(path, candidates.value());
} else {
retryLock(path);
}
} else {
throw new IllegalStateException("should not be here");
}
}
private void leaderLockAttempt(String path, List<NodeId> candidates) {
try {
Versioned<NodeId> currentLeader = lockMap.get(path);
Versioned<NodeId> currentLeader = leaderMap.get(path);
if (currentLeader != null) {
if (localNodeId.equals(currentLeader.value())) {
log.info("Already has leadership for {}", path);
notifyNewLeader(path, localNodeId, currentLeader.version(), currentLeader.creationTime());
// FIXME: candidates can get out of sync.
notifyNewLeader(
path, localNodeId, candidates, currentLeader.version(), currentLeader.creationTime());
} else {
// someone else has leadership. will retry after sometime.
retry(path);
retryLock(path);
}
} else {
if (lockMap.putIfAbsent(path, localNodeId) == null) {
if (leaderMap.putIfAbsent(path, localNodeId) == null) {
log.info("Assumed leadership for {}", path);
// do a get again to get the version (epoch)
Versioned<NodeId> newLeader = lockMap.get(path);
notifyNewLeader(path, localNodeId, newLeader.version(), newLeader.creationTime());
Versioned<NodeId> newLeader = leaderMap.get(path);
// FIXME: candidates can get out of sync
notifyNewLeader(path, localNodeId, candidates, newLeader.version(), newLeader.creationTime());
} else {
// someone beat us to it.
retry(path);
retryLock(path);
}
}
} catch (Exception e) {
log.debug("Attempt to acquire leadership lock for topic {} failed", path, e);
retry(path);
retryLock(path);
}
}
private void notifyNewLeader(String path, NodeId leader, long epoch, long electedTime) {
Leadership newLeadership = new Leadership(path, leader, epoch, electedTime);
private void notifyNewLeader(String path, NodeId leader,
List<NodeId> candidates, long epoch, long electedTime) {
Leadership newLeadership = new Leadership(path, leader, candidates, epoch, electedTime);
boolean updatedLeader = false;
log.debug("candidates for new Leadership {}", candidates);
synchronized (leaderBoard) {
Leadership currentLeader = leaderBoard.get(path);
if (currentLeader == null || currentLeader.epoch() < epoch) {
log.debug("updating leaderboard with new {}", newLeadership);
leaderBoard.put(path, newLeadership);
updatedLeader = true;
}
......@@ -256,6 +337,11 @@ public class DistributedLeadershipManager implements LeadershipService {
if (updatedLeader) {
LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, newLeadership);
notifyPeers(event);
}
}
private void notifyPeers(LeadershipEvent event) {
eventDispatcher.post(event);
clusterCommunicator.broadcast(
new ClusterMessage(
......@@ -263,10 +349,11 @@ public class DistributedLeadershipManager implements LeadershipService {
LEADERSHIP_EVENT_MESSAGE_SUBJECT,
SERIALIZER.encode(event)));
}
}
private void notifyRemovedLeader(String path, NodeId leader, long epoch, long electedTime) {
Leadership oldLeadership = new Leadership(path, leader, epoch, electedTime);
Versioned<List<NodeId>> candidates = candidateMap.get(path);
Leadership oldLeadership = new Leadership(
path, leader, candidates.value(), epoch, electedTime);
boolean updatedLeader = false;
synchronized (leaderBoard) {
Leadership currentLeader = leaderBoard.get(path);
......@@ -316,6 +403,11 @@ public class DistributedLeadershipManager implements LeadershipService {
leaderBoard.remove(topic);
updateAccepted = true;
}
} else if (eventType.equals(LeadershipEvent.Type.CANDIDATES_CHANGED)) {
if (currentLeadership != null && currentLeadership.epoch() == leadershipUpdate.epoch()) {
leaderBoard.replace(topic, leadershipUpdate);
updateAccepted = true;
}
} else {
throw new IllegalStateException("Unknown event type.");
}
......@@ -326,43 +418,46 @@ public class DistributedLeadershipManager implements LeadershipService {
}
}
private void retry(String path) {
/**
 * Schedules another {@code runForLeadership} attempt for the topic after
 * {@code ELECTION_JOIN_ATTEMPT_INTERVAL_SEC} seconds.
 *
 * @param path topic to rejoin
 */
private void rerunForLeadership(String path) {
    Runnable rejoin = () -> runForLeadership(path);
    retryLeaderLockExecutor.schedule(rejoin, ELECTION_JOIN_ATTEMPT_INTERVAL_SEC, TimeUnit.SECONDS);
}
/**
 * Schedules another {@code tryLeaderLock} attempt for the topic after
 * {@code DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC} seconds.
 *
 * @param path topic whose leader lock should be retried
 */
private void retryLock(String path) {
    Runnable lockAttempt = () -> tryLeaderLock(path);
    retryLeaderLockExecutor.schedule(lockAttempt, DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC, TimeUnit.SECONDS);
}
/**
 * Schedules another {@code withdraw} attempt for the topic after
 * {@code DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC} seconds.
 *
 * @param path topic to withdraw from
 */
private void retryWithdraw(String path) {
    Runnable withdrawal = () -> withdraw(path);
    retryLeaderLockExecutor.schedule(withdrawal, DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC, TimeUnit.SECONDS);
}
private void purgeStaleLocks() {
try {
Set<Entry<String, Versioned<NodeId>>> entries = lockMap.entrySet();
entries.forEach(entry -> {
leaderMap.entrySet()
.stream()
.filter(e -> clusterService.getState(e.getValue().value()) == ControllerNode.State.INACTIVE)
.filter(e -> localNodeId.equals(e.getValue().value()) && !activeTopics.contains(e.getKey()))
.forEach(entry -> {
String path = entry.getKey();
NodeId nodeId = entry.getValue().value();
long epoch = entry.getValue().version();
long creationTime = entry.getValue().creationTime();
if (clusterService.getState(nodeId) == ControllerNode.State.INACTIVE) {
log.info("Lock for {} is held by {} which is currently inactive", path, nodeId);
try {
if (lockMap.remove(path, epoch)) {
log.info("Purged stale lock held by {} for {}", nodeId, path);
notifyRemovedLeader(path, nodeId, epoch, creationTime);
}
} catch (Exception e) {
log.warn("Failed to purge stale lock held by {} for {}", nodeId, path, e);
}
}
if (localNodeId.equals(nodeId) && !activeTopics.contains(path)) {
log.debug("Lock for {} is held by {} when it not running for leadership.", path, nodeId);
try {
if (lockMap.remove(path, epoch)) {
if (leaderMap.remove(path, epoch)) {
log.info("Purged stale lock held by {} for {}", nodeId, path);
notifyRemovedLeader(path, nodeId, epoch, creationTime);
}
} catch (Exception e) {
log.warn("Failed to purge stale lock held by {} for {}", nodeId, path, e);
}
}
});
} catch (Exception e) {
log.debug("Failed cleaning up stale locks", e);
......
......@@ -17,6 +17,7 @@ package org.onosproject.store.trivial.impl;
import static com.google.common.base.Preconditions.checkArgument;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
......@@ -108,4 +109,14 @@ public class SimpleLeadershipManager implements LeadershipService {
public void removeListener(LeadershipEventListener listener) {
listeners.remove(listener);
}
/**
 * Returns the candidates for all known topics.
 * <p>
 * Candidate tracking is not provided by this implementation; an empty map is
 * returned instead of null so that callers can iterate over the result safely.
 *
 * @return an empty, immutable map
 */
@Override
public Map<String, List<NodeId>> getCandidates() {
    return java.util.Collections.emptyMap();
}
/**
 * Returns the candidates for a given topic.
 * <p>
 * Candidate tracking is not provided by this implementation; an empty list is
 * returned instead of null, matching the interface contract of a possibly
 * empty list.
 *
 * @param path topic
 * @return an empty, immutable list
 */
@Override
public List<NodeId> getCandidates(String path) {
    return java.util.Collections.emptyList();
}
}
......