Ayaka Koshibe
Committed by Gerrit Code Review

Fixes/improvements:

 - Leaders command uses candidateBoard keys when using -c (list candidates)
   option.
 - Bug fix for lock retry

Change-Id: I42730a85b720fc5023b9b07bef153d975c95d4df
......@@ -98,18 +98,21 @@ public class LeaderCommand extends AbstractShellCommand {
print("--------------------------------------------------------------");
print(FMT_C, "Topic", "Leader", "Candidates");
print("--------------------------------------------------------------");
leaderBoard
.values()
candidates
.entrySet()
.stream()
.filter(l -> allTopics || pattern.matcher(l.topic()).matches())
.sorted(leadershipComparator)
.forEach(l -> {
List<NodeId> list = candidates.get(l.topic());
.filter(es -> allTopics || pattern.matcher(es.getKey()).matches())
.forEach(es -> {
List<NodeId> list = es.getValue();
if (list == null || list.isEmpty()) {
return;
}
Leadership l = leaderBoard.get(es.getKey());
print(FMT_C,
l.topic(),
l.leader(),
list.get(0).toString());
es.getKey(),
l == null ? "null" : l.leader(),
// formatting hacks to get it into a table
list.get(0).toString());
list.subList(1, list.size()).forEach(n -> print(FMT_C, " ", " ", n));
print(FMT_C, " ", " ", " ");
});
......@@ -139,12 +142,32 @@ public class LeaderCommand extends AbstractShellCommand {
return result;
}
/**
* Returns JSON node representing the leaders.
*
* @param leaderBoard map of leaders
*/
private JsonNode json(Map<String, Leadership> leaderBoard,
Map<String, List<NodeId>> candidateBoard) {
ObjectMapper mapper = new ObjectMapper();
ArrayNode result = mapper.createArrayNode();
candidateBoard.entrySet()
.stream()
.forEach(es -> {
Leadership l = leaderBoard.get(es.getKey());
result.add(
mapper.createObjectNode()
.put("topic", es.getKey())
.put("leader", l == null ? "none" : l.leader().toString())
.put("candidates", es.getValue().toString()));
});
return result;
}
@Override
protected void execute() {
LeadershipService leaderService = get(LeadershipService.class);
Map<String, Leadership> leaderBoard = leaderService.getLeaderBoard();
if (topicPattern == null) {
allTopics = true;
} else {
......@@ -152,12 +175,17 @@ public class LeaderCommand extends AbstractShellCommand {
pattern = Pattern.compile(topicPattern);
}
if (outputJson()) {
print("%s", json(leaderBoard));
} else {
if (showCandidates) {
Map<String, List<NodeId>> candidates = leaderService.getCandidates();
if (showCandidates) {
Map<String, List<NodeId>> candidates = leaderService
.getCandidates();
if (outputJson()) {
print("%s", json(leaderBoard, candidates));
} else {
displayCandidates(leaderBoard, candidates);
}
} else {
if (outputJson()) {
print("%s", json(leaderBoard));
} else {
displayLeaders(leaderBoard);
}
......
......@@ -283,19 +283,23 @@ public class DistributedLeadershipManager implements LeadershipService {
if (!activeTopics.contains(path)) {
return;
}
Versioned<List<NodeId>> candidates = candidateMap.get(path);
if (candidates != null) {
List<NodeId> activeNodes = candidates.value().stream()
.filter(n -> clusterService.getState(n) == ACTIVE)
.collect(Collectors.toList());
if (localNodeId.equals(activeNodes.get(LEADER_CANDIDATE_POS))) {
leaderLockAttempt(path, candidates.value());
try {
Versioned<List<NodeId>> candidates = candidateMap.get(path);
if (candidates != null) {
List<NodeId> activeNodes = candidates.value().stream()
.filter(n -> clusterService.getState(n) == ACTIVE)
.collect(Collectors.toList());
if (localNodeId.equals(activeNodes.get(LEADER_CANDIDATE_POS))) {
leaderLockAttempt(path, candidates.value());
} else {
retryLock(path);
}
} else {
retryLock(path);
throw new IllegalStateException("should not be here");
}
} else {
throw new IllegalStateException("should not be here");
} catch (Exception e) {
log.debug("Failed to fetch candidate information for {}", path, e);
retryLock(path);
}
}
......@@ -336,7 +340,7 @@ public class DistributedLeadershipManager implements LeadershipService {
final MutableBoolean updated = new MutableBoolean(false);
candidateBoard.compute(path, (k, current) -> {
if (current == null || current.epoch() < newInfo.epoch()) {
log.info("updating candidateboard with {}", newInfo);
log.debug("updating candidateboard with {}", newInfo);
updated.setTrue();
return newInfo;
}
......