Ayaka Koshibe
Committed by Yuta HIGUCHI

ConsistentDeviceMastership on top of LeadershipService, and leaders CLI command

modified to filter on topic. This does not support changing candidate ordering
(yet).

Refernce: ONOS-76

Change-Id: I028a6df0acbe3c4e4ff7c228f687f640e48e13be
......@@ -18,7 +18,9 @@ package org.onosproject.cli.net;
import java.util.Comparator;
import java.util.Map;
import java.util.List;
import java.util.regex.Pattern;
import org.apache.karaf.shell.commands.Argument;
import org.apache.karaf.shell.commands.Command;
import org.apache.karaf.shell.commands.Option;
import org.onlab.util.Tools;
......@@ -40,6 +42,12 @@ public class LeaderCommand extends AbstractShellCommand {
private static final String FMT = "%-20s | %-15s | %-6s | %-10s |";
private static final String FMT_C = "%-20s | %-15s | %-19s |";
private boolean allTopics;
private Pattern pattern;
@Argument(index = 0, name = "topic", description = "A leadership topic. Can be a regex",
required = false, multiValued = false)
String topicPattern = null;
@Option(name = "-c", aliases = "--candidates",
description = "List candidate Nodes for each topic's leadership race",
......@@ -75,6 +83,7 @@ public class LeaderCommand extends AbstractShellCommand {
leaderBoard.values()
.stream()
.filter(l -> allTopics || pattern.matcher(l.topic()).matches())
.sorted(leadershipComparator)
.forEach(l -> print(FMT,
l.topic(),
......@@ -92,6 +101,7 @@ public class LeaderCommand extends AbstractShellCommand {
leaderBoard
.values()
.stream()
.filter(l -> allTopics || pattern.matcher(l.topic()).matches())
.sorted(leadershipComparator)
.forEach(l -> {
List<NodeId> list = candidates.get(l.topic()).candidates();
......@@ -135,6 +145,13 @@ public class LeaderCommand extends AbstractShellCommand {
LeadershipService leaderService = get(LeadershipService.class);
Map<String, Leadership> leaderBoard = leaderService.getLeaderBoard();
if (topicPattern == null) {
allTopics = true;
} else {
allTopics = false;
pattern = Pattern.compile(topicPattern);
}
if (outputJson()) {
print("%s", json(leaderBoard));
} else {
......
......@@ -362,8 +362,8 @@ public class DistributedLeadershipManager implements LeadershipService {
Leadership newInfo = new Leadership(path, candidates, epoch, electedTime);
final MutableBoolean updated = new MutableBoolean(false);
candidateBoard.compute(path, (k, current) -> {
if (current != null && current.epoch() == newInfo.epoch()) {
log.info("updating candidateboard with {}", newInfo);
if (current != null && current.epoch() <= newInfo.epoch()) {
log.info("updating candidateboard with removal of {}", newInfo);
updated.setTrue();
if (candidates.isEmpty()) {
return null;
......@@ -452,7 +452,7 @@ public class DistributedLeadershipManager implements LeadershipService {
});
} else if (eventType.equals(LeadershipEvent.Type.LEADER_BOOTED)) {
leaderBoard.compute(topic, (k, currentLeadership) -> {
if (currentLeadership == null || currentLeadership.epoch() < leadershipUpdate.epoch()) {
if (currentLeadership == null || currentLeadership.epoch() == leadershipUpdate.epoch()) {
updateAccepted.setTrue();
return null;
}
......@@ -462,6 +462,9 @@ public class DistributedLeadershipManager implements LeadershipService {
candidateBoard.compute(topic, (k, currentInfo) -> {
if (currentInfo == null || currentInfo.epoch() <= leadershipUpdate.epoch()) {
updateAccepted.setTrue();
if (leadershipUpdate.candidates().isEmpty()) {
return null;
}
return leadershipUpdate;
}
return currentInfo;
......
......@@ -16,6 +16,7 @@
package org.onosproject.store.mastership.impl;
import static org.onlab.util.Tools.groupedThreads;
import static org.onlab.util.Tools.futureGetOrElse;
import static org.onosproject.mastership.MastershipEvent.Type.BACKUPS_CHANGED;
import static org.onosproject.mastership.MastershipEvent.Type.MASTER_CHANGED;
import static org.slf4j.LoggerFactory.getLogger;
......@@ -25,12 +26,8 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
......@@ -182,12 +179,12 @@ public class ConsistentDeviceMastershipStore
return MastershipRole.NONE;
}
}
MastershipRole role = complete(clusterCommunicator.sendAndReceive(
MastershipRole role = futureGetOrElse(clusterCommunicator.sendAndReceive(
deviceId,
ROLE_QUERY_SUBJECT,
SERIALIZER::encode,
SERIALIZER::decode,
nodeId));
nodeId), null);
return role == null ? MastershipRole.NONE : role;
}
......@@ -270,12 +267,12 @@ public class ConsistentDeviceMastershipStore
if (!nodeId.equals(localNodeId)) {
log.debug("Forwarding request to relinquish "
+ "role for device {} to {}", deviceId, nodeId);
return complete(clusterCommunicator.sendAndReceive(
return futureGetOrElse(clusterCommunicator.sendAndReceive(
deviceId,
ROLE_RELINQUISH_SUBJECT,
SERIALIZER::encode,
SERIALIZER::decode,
nodeId));
nodeId), null);
}
// Check if this node is can be managed by this node.
......@@ -374,16 +371,4 @@ public class ConsistentDeviceMastershipStore
return m.matches();
}
private <T> T complete(Future<byte[]> future) {
try {
return SERIALIZER.decode(future.get(PEER_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("Interrupted while waiting for operation to complete.", e);
return null;
} catch (TimeoutException | ExecutionException e) {
log.error("Failed remote operation", e);
return null;
}
}
}
\ No newline at end of file
......