Madan Jampani
Committed by Gerrit Code Review

Fix balance-masters functionality in the new LeadershipService based device mastership store

Change-Id: I9f64d514cee7d5a5383fd4c2fa30a8616c97785c
......@@ -67,6 +67,32 @@ public interface LeadershipService {
void withdraw(String path);
/**
* If the local nodeId is the leader for specified topic, this method causes it to
* step down temporarily from leadership.
* <p>
* The node will continue to be in contention for leadership and can
* potentially become the leader again if and when it becomes the highest
* priority candidate
* <p>
* If the local nodeId is not the leader, this method will be a noop.
*
* @param path topic for which this controller node should give up leadership
* @return true if this node stepped down from leadership, false otherwise
*/
boolean stepdown(String path);
/**
* Moves the specified nodeId to the top of the candidates list for the topic.
* <p>
* If the node is not a candidate for this topic, this method will be a noop.
*
* @param path leadership topic
* @param nodeId nodeId to make the top candidate
* @return true if nodeId is now the top candidate, false otherwise
*/
boolean makeTopCandidate(String path, NodeId nodeId);
/**
* Returns the current leader board.
*
* @return mapping from topic to leadership info.
......
......@@ -73,4 +73,14 @@ public class LeadershipServiceAdapter implements LeadershipService {
public List<NodeId> getCandidates(String path) {
return null;
}
}
@Override
public boolean stepdown(String path) {
return false;
}
@Override
public boolean makeTopCandidate(String path, NodeId nodeId) {
return false;
}
}
\ No newline at end of file
......
......@@ -24,8 +24,6 @@ import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.metrics.MetricsService;
import org.onosproject.cluster.ClusterEvent;
import org.onosproject.cluster.ClusterEventListener;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.NodeId;
......@@ -52,8 +50,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.collect.Lists.newArrayList;
import static org.onlab.metrics.MetricsUtil.startTimer;
......@@ -91,7 +87,6 @@ public class MastershipManager
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected MetricsService metricsService;
private ClusterEventListener clusterListener = new InternalClusterEventListener();
private Timer requestRoleTimer;
@Activate
......@@ -99,7 +94,6 @@ public class MastershipManager
requestRoleTimer = createTimer("Mastership", "requestRole", "responseTime");
eventDispatcher.addSink(MastershipEvent.class, listenerRegistry);
clusterService.addListener(clusterListener);
store.setDelegate(delegate);
log.info("Started");
}
......@@ -107,7 +101,6 @@ public class MastershipManager
@Deactivate
public void deactivate() {
eventDispatcher.removeSink(MastershipEvent.class);
clusterService.removeListener(clusterListener);
store.unsetDelegate(delegate);
log.info("Stopped");
}
......@@ -282,52 +275,6 @@ public class MastershipManager
}
}
//callback for reacting to cluster events
private class InternalClusterEventListener implements ClusterEventListener {
// A notion of a local maximum cluster size, used to tie-break.
// Think of a better way to do this.
private AtomicInteger clusterSize;
InternalClusterEventListener() {
clusterSize = new AtomicInteger(0);
}
@Override
public void event(ClusterEvent event) {
switch (event.type()) {
case INSTANCE_ADDED:
case INSTANCE_ACTIVATED:
clusterSize.incrementAndGet();
log.info("instance {} added/activated", event.subject());
break;
case INSTANCE_REMOVED:
case INSTANCE_DEACTIVATED:
ControllerNode node = event.subject();
log.info("instance {} removed/deactivated", node);
store.relinquishAllRole(node.id());
clusterSize.decrementAndGet();
break;
default:
log.warn("unknown cluster event {}", event);
}
}
// Can be removed if we go with naive split-brain handling: only majority
// assigns mastership
private boolean isInMajority() {
if (clusterService.getNodes().size() > (clusterSize.intValue() / 2)) {
return true;
}
//FIXME: break tie for equal-sized clusters,
return false;
}
}
public class InternalDelegate implements MastershipStoreDelegate {
@Override
......
......@@ -536,7 +536,7 @@ public class DeviceManager
if (myNextRole == NONE) {
mastershipService.requestRoleFor(did);
MastershipTerm term = termService.getMastershipTerm(did);
if (myNodeId.equals(term.master())) {
if (term != null && myNodeId.equals(term.master())) {
myNextRole = MASTER;
} else {
myNextRole = STANDBY;
......
......@@ -584,4 +584,14 @@ public class HazelcastLeadershipService implements LeadershipService {
public List<NodeId> getCandidates(String path) {
return null;
}
@Override
public boolean stepdown(String path) {
throw new UnsupportedOperationException();
}
@Override
public boolean makeTopCandidate(String path, NodeId nodeId) {
throw new UnsupportedOperationException();
}
}
......
......@@ -34,6 +34,7 @@ import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.Versioned;
import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
......@@ -48,7 +49,6 @@ import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkArgument;
import static org.onlab.util.Tools.groupedThreads;
import static org.slf4j.LoggerFactory.getLogger;
import static org.onosproject.cluster.ControllerNode.State.ACTIVE;
import static org.onosproject.cluster.ControllerNode.State.INACTIVE;
......@@ -210,7 +210,7 @@ public class DistributedLeadershipManager implements LeadershipService {
candidateList.add(localNodeId);
if (candidateMap.replace(path, candidates.version(), candidateList)) {
Versioned<List<NodeId>> newCandidates = candidateMap.get(path);
notifyCandidateAdded(
notifyCandidateUpdated(
path, candidateList, newCandidates.version(), newCandidates.creationTime());
} else {
rerunForLeadership(path);
......@@ -221,7 +221,7 @@ public class DistributedLeadershipManager implements LeadershipService {
List<NodeId> candidateList = ImmutableList.of(localNodeId);
if ((candidateMap.putIfAbsent(path, candidateList) == null)) {
Versioned<List<NodeId>> newCandidates = candidateMap.get(path);
notifyCandidateAdded(path, candidateList, newCandidates.version(), newCandidates.creationTime());
notifyCandidateUpdated(path, candidateList, newCandidates.version(), newCandidates.creationTime());
} else {
rerunForLeadership(path);
return;
......@@ -270,6 +270,27 @@ public class DistributedLeadershipManager implements LeadershipService {
}
@Override
public boolean stepdown(String path) {
if (!activeTopics.contains(path)) {
return false;
}
try {
Versioned<NodeId> leader = leaderMap.get(path);
if (leader != null && Objects.equals(leader.value(), localNodeId)) {
if (leaderMap.remove(path, leader.version())) {
log.info("Gave up leadership for {}", path);
notifyRemovedLeader(path, localNodeId, leader.version(), leader.creationTime());
return true;
}
}
} catch (Exception e) {
log.warn("Error executing stepdown for {}", path, e);
}
return false;
}
@Override
public void addListener(LeadershipEventListener listener) {
listenerRegistry.addListener(listener);
}
......@@ -279,6 +300,28 @@ public class DistributedLeadershipManager implements LeadershipService {
listenerRegistry.removeListener(listener);
}
@Override
public boolean makeTopCandidate(String path, NodeId nodeId) {
Versioned<List<NodeId>> candidates = candidateMap.get(path);
if (candidates == null || !candidates.value().contains(nodeId)) {
return false;
}
if (nodeId.equals(candidates.value().get(0))) {
return true;
}
List<NodeId> currentRoster = candidates.value();
List<NodeId> newRoster = new ArrayList<>(currentRoster.size());
newRoster.add(nodeId);
currentRoster.stream().filter(id -> !nodeId.equals(id)).forEach(newRoster::add);
boolean updated = candidateMap.replace(path, candidates.version(), newRoster);
if (updated) {
Versioned<List<NodeId>> newCandidates = candidateMap.get(path);
notifyCandidateUpdated(
path, newCandidates.value(), newCandidates.version(), newCandidates.creationTime());
}
return updated;
}
private void tryLeaderLock(String path) {
if (!activeTopics.contains(path)) {
return;
......@@ -334,7 +377,7 @@ public class DistributedLeadershipManager implements LeadershipService {
}
}
private void notifyCandidateAdded(
private void notifyCandidateUpdated(
String path, List<NodeId> candidates, long epoch, long electedTime) {
Leadership newInfo = new Leadership(path, candidates, epoch, electedTime);
final MutableBoolean updated = new MutableBoolean(false);
......
......@@ -93,6 +93,8 @@ public class ConsistentDeviceMastershipStore
new MessageSubject("mastership-store-device-role-query");
private static final MessageSubject ROLE_RELINQUISH_SUBJECT =
new MessageSubject("mastership-store-device-role-relinquish");
private static final MessageSubject MASTERSHIP_RELINQUISH_SUBJECT =
new MessageSubject("mastership-store-device-mastership-relinquish");
private static final Pattern DEVICE_MASTERSHIP_TOPIC_PATTERN =
Pattern.compile("device:(.*)");
......@@ -111,6 +113,7 @@ public class ConsistentDeviceMastershipStore
.register(KryoNamespaces.API)
.register(MastershipRole.class)
.register(MastershipEvent.class)
.register(MastershipEvent.Type.class)
.build();
}
};
......@@ -125,6 +128,11 @@ public class ConsistentDeviceMastershipStore
clusterCommunicator.addSubscriber(ROLE_RELINQUISH_SUBJECT,
new RoleRelinquishHandler(),
messageHandlingExecutor);
clusterCommunicator.addSubscriber(MASTERSHIP_RELINQUISH_SUBJECT,
SERIALIZER::decode,
this::relinquishMastership,
SERIALIZER::encode,
messageHandlingExecutor);
localNodeId = clusterService.getLocalNode().id();
leadershipService.addListener(leadershipEventListener);
......@@ -135,6 +143,7 @@ public class ConsistentDeviceMastershipStore
public void deactivate() {
clusterCommunicator.removeSubscriber(ROLE_QUERY_SUBJECT);
clusterCommunicator.removeSubscriber(ROLE_RELINQUISH_SUBJECT);
clusterCommunicator.removeSubscriber(MASTERSHIP_RELINQUISH_SUBJECT);
messageHandlingExecutor.shutdown();
leadershipService.removeListener(leadershipEventListener);
......@@ -237,7 +246,22 @@ public class ConsistentDeviceMastershipStore
checkArgument(nodeId != null, NODE_ID_NULL);
checkArgument(deviceId != null, DEVICE_ID_NULL);
throw new UnsupportedOperationException("This operation is not supported in " + this.getClass().getName());
NodeId currentMaster = getMaster(deviceId);
if (nodeId.equals(currentMaster)) {
return null;
} else {
String leadershipTopic = createDeviceMastershipTopic(deviceId);
List<NodeId> candidates = leadershipService.getCandidates(leadershipTopic);
if (candidates.isEmpty()) {
return null;
}
if (leadershipService.makeTopCandidate(leadershipTopic, nodeId)) {
return relinquishMastership(deviceId);
} else {
log.warn("Failed to promote {} to mastership for {}", nodeId, deviceId);
}
}
return null;
}
@Override
......@@ -254,7 +278,13 @@ public class ConsistentDeviceMastershipStore
checkArgument(nodeId != null, NODE_ID_NULL);
checkArgument(deviceId != null, DEVICE_ID_NULL);
throw new UnsupportedOperationException("This operation is not supported in " + this.getClass().getName());
NodeId currentMaster = getMaster(deviceId);
if (!nodeId.equals(currentMaster)) {
return null;
}
// FIXME: This can becomes the master again unless it
// is demoted to the end of candidates list.
return relinquishMastership(deviceId);
}
@Override
......@@ -294,6 +324,37 @@ public class ConsistentDeviceMastershipStore
return new MastershipEvent(eventType, deviceId, getNodes(deviceId));
}
private MastershipEvent relinquishMastership(DeviceId deviceId) {
checkArgument(deviceId != null, DEVICE_ID_NULL);
NodeId currentMaster = getMaster(deviceId);
if (currentMaster == null) {
return null;
}
if (!currentMaster.equals(localNodeId)) {
log.info("Forwarding request to relinquish "
+ "mastership for device {} to {}", deviceId, currentMaster);
return futureGetOrElse(clusterCommunicator.sendAndReceive(
deviceId,
MASTERSHIP_RELINQUISH_SUBJECT,
SERIALIZER::encode,
SERIALIZER::decode,
currentMaster), null);
}
String leadershipTopic = createDeviceMastershipTopic(deviceId);
Leadership currentLeadership = leadershipService.getLeadership(leadershipTopic);
MastershipEvent.Type eventType = null;
if (currentLeadership != null && currentLeadership.leader().equals(localNodeId)) {
eventType = MastershipEvent.Type.MASTER_CHANGED;
}
return leadershipService.stepdown(leadershipTopic)
? new MastershipEvent(eventType, deviceId, getNodes(deviceId)) : null;
}
private class RoleQueryHandler implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
......
......@@ -119,4 +119,14 @@ public class SimpleLeadershipManager implements LeadershipService {
public List<NodeId> getCandidates(String path) {
return null;
}
@Override
public boolean stepdown(String path) {
throw new UnsupportedOperationException();
}
@Override
public boolean makeTopCandidate(String path, NodeId nodeId) {
throw new UnsupportedOperationException();
}
}
......