Madan Jampani

LeadershipStore updates:

 - Now tracking leader and candidates for a topic using a single map.
 - Using term numbers that are incremented by one every time a new leader is elected.
 - Introduced a separate LeadershipStore to conform to the manager-store pattern.

Change-Id: I1d03a6c5e8ff0e68ef0c1e3a6c2d425c4856e470
Showing 23 changed files with 897 additions and 1021 deletions
......@@ -16,6 +16,7 @@
package org.onosproject.cip;
import com.google.common.io.ByteStreams;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
......@@ -93,7 +94,7 @@ public class ClusterIpManager {
cfgService.registerProperties(getClass());
localId = clusterService.getLocalNode().id();
processLeadershipChange(leadershipService.getLeader(CLUSTER_IP));
processLeaderChange(leadershipService.getLeader(CLUSTER_IP));
leadershipService.addListener(listener);
leadershipService.runForLeadership(CLUSTER_IP);
......@@ -137,10 +138,7 @@ public class ClusterIpManager {
}
}
private synchronized void processLeadershipChange(NodeId newLeader) {
if (newLeader == null) {
return;
}
private synchronized void processLeaderChange(NodeId newLeader) {
boolean isLeader = Objects.equals(newLeader, localId);
log.info("Processing leadership change; wasLeader={}, isLeader={}", wasLeader, isLeader);
if (!wasLeader && isLeader) {
......@@ -189,11 +187,15 @@ public class ClusterIpManager {
// Listens for leadership changes.
private class InternalLeadershipListener implements LeadershipEventListener {
@Override
public void event(LeadershipEvent event) {
if (event.subject().topic().equals(CLUSTER_IP)) {
processLeadershipChange(event.subject().leader());
public boolean isRelevant(LeadershipEvent event) {
return CLUSTER_IP.equals(event.subject().topic());
}
@Override
public void event(LeadershipEvent event) {
processLeaderChange(event.subject().leaderNodeId());
}
}
......
......@@ -19,6 +19,7 @@ package org.onosproject.mlb;
import com.google.common.util.concurrent.ListenableScheduledFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
......@@ -105,10 +106,7 @@ public class MastershipLoadBalancer {
log.info("Stopped");
}
private synchronized void processLeadershipChange(NodeId newLeader) {
if (newLeader == null) {
return;
}
private synchronized void processLeaderChange(NodeId newLeader) {
boolean currLeader = newLeader.equals(localId);
if (isLeader.getAndSet(currLeader) != currLeader) {
if (currLeader) {
......@@ -159,7 +157,7 @@ public class MastershipLoadBalancer {
@Override
public void event(LeadershipEvent event) {
processLeadershipChange(event.subject().leader());
processLeaderChange(event.subject().leaderNodeId());
}
}
}
......
......@@ -26,6 +26,7 @@ import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.LeadershipEvent;
import org.onosproject.cluster.LeadershipEventListener;
import org.onosproject.cluster.LeadershipService;
import org.onosproject.cluster.NodeId;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.net.intent.Intent;
......@@ -43,7 +44,6 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
......@@ -74,6 +74,7 @@ public class IntentSynchronizer implements IntentSynchronizationService,
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected IntentService intentService;
private NodeId localNodeId;
private ApplicationId appId;
private final InternalLeadershipListener leadershipEventListener =
......@@ -89,7 +90,7 @@ public class IntentSynchronizer implements IntentSynchronizationService,
@Activate
public void activate() {
intentsSynchronizerExecutor = createExecutor();
this.localNodeId = clusterService.getLocalNode().id();
this.appId = coreService.registerApplication(APP_NAME);
leadershipService.addListener(leadershipEventListener);
......@@ -268,27 +269,22 @@ public class IntentSynchronizer implements IntentSynchronizationService,
private class InternalLeadershipListener implements LeadershipEventListener {
@Override
public void event(LeadershipEvent event) {
if (!event.subject().topic().equals(appId.name())) {
// Not our topic: ignore
return;
}
if (!Objects.equals(event.subject().leader(),
clusterService.getLocalNode().id())) {
// The event is not about this instance: ignore
return;
public boolean isRelevant(LeadershipEvent event) {
return event.subject().topic().equals(appId.name());
}
@Override
public void event(LeadershipEvent event) {
switch (event.type()) {
case LEADER_ELECTED:
case LEADER_CHANGED:
case LEADER_AND_CANDIDATES_CHANGED:
if (localNodeId.equals(event.subject().leaderNodeId())) {
log.info("IntentSynchronizer gained leadership");
leaderChanged(true);
break;
case LEADER_BOOTED:
log.info("IntentSynchronizer lost leadership");
} else {
log.info("IntentSynchronizer leader changed. New leader is {}", event.subject().leaderNodeId());
leaderChanged(false);
break;
case LEADER_REELECTED:
}
default:
break;
}
......
......@@ -17,6 +17,7 @@ package org.onosproject.routing.impl;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.MoreExecutors;
import org.junit.Before;
import org.junit.Test;
import org.onlab.junit.TestUtils;
......@@ -29,7 +30,11 @@ import org.onlab.packet.IpPrefix;
import org.onlab.packet.MacAddress;
import org.onlab.packet.VlanId;
import org.onosproject.TestApplicationId;
import org.onosproject.cluster.ClusterServiceAdapter;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.DefaultControllerNode;
import org.onosproject.cluster.LeadershipServiceAdapter;
import org.onosproject.cluster.NodeId;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreServiceAdapter;
import org.onosproject.incubator.net.intf.Interface;
......@@ -94,6 +99,9 @@ public class IntentSynchronizerTest extends AbstractIntentTest {
private static final ApplicationId APPID =
TestApplicationId.create("intent-sync-test");
private static final ControllerNode LOCAL_NODE =
new DefaultControllerNode(new NodeId("foo"), IpAddress.valueOf("127.0.0.1"));
@Before
public void setUp() throws Exception {
super.setUp();
......@@ -105,6 +113,7 @@ public class IntentSynchronizerTest extends AbstractIntentTest {
intentSynchronizer = new TestIntentSynchronizer();
intentSynchronizer.coreService = new TestCoreService();
intentSynchronizer.clusterService = new TestClusterService();
intentSynchronizer.leadershipService = new TestLeadershipService();
intentSynchronizer.intentService = intentService;
......@@ -441,6 +450,13 @@ public class IntentSynchronizerTest extends AbstractIntentTest {
}
}
private class TestClusterService extends ClusterServiceAdapter {
@Override
public ControllerNode getLocalNode() {
return LOCAL_NODE;
}
}
private class TestLeadershipService extends LeadershipServiceAdapter {
}
......
......@@ -24,12 +24,11 @@ import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onosproject.cluster.ClusterService;
import org.onosproject.core.CoreService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.LeadershipEvent;
import org.onosproject.cluster.LeadershipEventListener;
import org.onosproject.cluster.LeadershipService;
import org.onosproject.cluster.NodeId;
import org.onosproject.core.ApplicationId;
import org.slf4j.Logger;
......@@ -56,7 +55,7 @@ public class ElectionTest {
private LeadershipEventListener leadershipEventListener =
new InnerLeadershipEventListener();
private ControllerNode localControllerNode;
private NodeId localNodeId;
@Activate
......@@ -65,7 +64,7 @@ public class ElectionTest {
appId = coreService.registerApplication(ELECTION_APP);
localControllerNode = clusterService.getLocalNode();
localNodeId = clusterService.getLocalNode().id();
leadershipService.addListener(leadershipEventListener);
}
......@@ -100,20 +99,10 @@ public class ElectionTest {
log.debug("Leadership Event: time = {} type = {} event = {}",
event.time(), event.type(), event);
if (!event.subject().leader().equals(
localControllerNode.id())) {
return; // The event is not about this instance: ignore
}
switch (event.type()) {
case LEADER_ELECTED:
log.info("Election-test app leader elected");
break;
case LEADER_BOOTED:
log.info("Election-test app lost election");
break;
case LEADER_REELECTED:
log.debug("Election-test app was re-elected");
case LEADER_CHANGED:
case LEADER_AND_CANDIDATES_CHANGED:
log.info("Election-test app leader changed. New leadership: {}", event.subject());
break;
default:
break;
......
......@@ -40,7 +40,7 @@ import com.fasterxml.jackson.databind.node.ArrayNode;
description = "Finds the leader for particular topic.")
public class LeaderCommand extends AbstractShellCommand {
private static final String FMT = "%-30s | %-15s | %-6s | %-10s |";
private static final String FMT = "%-30s | %-15s | %-5s | %-10s |";
private static final String FMT_C = "%-30s | %-15s | %-19s |";
private boolean allTopics;
private Pattern pattern;
......@@ -57,19 +57,8 @@ public class LeaderCommand extends AbstractShellCommand {
/**
* Compares leaders, sorting by toString() output.
*/
private Comparator<Leadership> leadershipComparator =
(e1, e2) -> {
if (e1.leader() == null && e2.leader() == null) {
return 0;
}
if (e1.leader() == null) {
return 1;
}
if (e2.leader() == null) {
return -1;
}
return e1.leader().toString().compareTo(e2.leader().toString());
};
private Comparator<Leadership> leadershipComparator = (l1, l2) ->
String.valueOf(l1.leaderNodeId()).compareTo(String.valueOf(l2.leaderNodeId()));
/**
* Displays text representing the leaders.
......@@ -78,18 +67,19 @@ public class LeaderCommand extends AbstractShellCommand {
*/
private void displayLeaders(Map<String, Leadership> leaderBoard) {
print("------------------------------------------------------------------------");
print(FMT, "Topic", "Leader", "Epoch", "Elected");
print(FMT, "Topic", "Leader", "Term", "Elected");
print("------------------------------------------------------------------------");
leaderBoard.values()
.stream()
.filter(l -> allTopics || pattern.matcher(l.topic()).matches())
.filter(l -> l.leader() != null)
.sorted(leadershipComparator)
.forEach(l -> print(FMT,
l.topic(),
l.leader(),
l.epoch(),
Tools.timeAgo(l.electedTime())));
l.leaderNodeId(),
l.leader().term(),
Tools.timeAgo(l.leader().termStartTime())));
print("------------------------------------------------------------------------");
}
......@@ -110,7 +100,7 @@ public class LeaderCommand extends AbstractShellCommand {
Leadership l = leaderBoard.get(es.getKey());
print(FMT_C,
es.getKey(),
l == null ? "null" : l.leader(),
String.valueOf(l.leaderNodeId()),
// formatting hacks to get it into a table
list.get(0).toString());
list.subList(1, list.size()).forEach(n -> print(FMT_C, " ", " ", n));
......@@ -134,10 +124,10 @@ public class LeaderCommand extends AbstractShellCommand {
result.add(
mapper.createObjectNode()
.put("topic", l.topic())
.put("leader", l.leader().toString())
.put("leader", String.valueOf(l.leaderNodeId()))
.put("candidates", l.candidates().toString())
.put("epoch", l.epoch())
.put("electedTime", Tools.timeAgo(l.electedTime()))));
.put("epoch", l.leader().term())
.put("epochStartTime", Tools.timeAgo(l.leader().termStartTime()))));
return result;
}
......
/*
* Copyright 2016 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.cluster;
import com.google.common.base.MoreObjects;
import com.google.common.base.Objects;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkArgument;
/**
 * Leader of a topic.
 * <p>
 * A leader is described by its {@link NodeId node identifier} and a monotonically increasing
 * term number; the term number goes up by one each time a new node is elected as leader.
 * The system clock time at which the node was elected is recorded as well. As with any
 * time stamp taken from a system clock, that value is susceptible to clock skew and should
 * only be relied on for simple diagnostic purposes.
 */
public class Leader {

    private final NodeId nodeId;
    private final long term;
    private final long termStartTime;

    /**
     * Creates a leader descriptor.
     *
     * @param nodeId identifier of the leader node; must not be null
     * @param term leader term; must be non-negative
     * @param termStartTime system time at which the term started; must be non-negative
     * @throws NullPointerException if {@code nodeId} is null
     * @throws IllegalArgumentException if {@code term} or {@code termStartTime} is negative
     */
    public Leader(NodeId nodeId, long term, long termStartTime) {
        // Validate in the same order as the checks are documented, then assign.
        checkNotNull(nodeId);
        checkArgument(term >= 0, "term must be non-negative");
        checkArgument(termStartTime >= 0, "termStartTime must be non-negative");
        this.nodeId = nodeId;
        this.term = term;
        this.termStartTime = termStartTime;
    }

    /**
     * Returns the identifier of the leader node.
     *
     * @return node identifier
     */
    public NodeId nodeId() {
        return nodeId;
    }

    /**
     * Returns the term of this leader.
     *
     * @return leader term
     */
    public long term() {
        return term;
    }

    /**
     * Returns the system time at which the current leadership term started.
     *
     * @return current leader term start time
     */
    public long termStartTime() {
        return termStartTime;
    }

    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        // instanceof is false for null, so no separate null check is needed.
        if (!(other instanceof Leader)) {
            return false;
        }
        Leader that = (Leader) other;
        return Objects.equal(nodeId, that.nodeId)
                && term == that.term
                && termStartTime == that.termStartTime;
    }

    @Override
    public int hashCode() {
        return Objects.hashCode(nodeId, term, termStartTime);
    }

    @Override
    public String toString() {
        return MoreObjects.toStringHelper(getClass())
                .add("nodeId", nodeId)
                .add("term", term)
                .add("termStartTime", termStartTime)
                .toString();
    }
}
......@@ -17,63 +17,31 @@ package org.onosproject.cluster;
import java.util.Objects;
import java.util.List;
import java.util.Optional;
import org.joda.time.DateTime;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;
/**
* Abstract leadership concept. The information carried by this construct
* include the topic of contention, the {@link NodeId}s of Nodes that could
* become leader for the topic, the epoch when the term for a given leader
* began, and the system time when the term began. Note:
* <ul>
* <li>The list of NodeIds may include the current leader at index 0, and the
* rest in decreasing preference order.</li>
* <li>The epoch is the logical age of a Leadership construct, and should be
* used for comparing two Leaderships, but only of the same topic.</li>
* <li>The leader may be null if its accuracy can't be guaranteed. This applies
* to CANDIDATES_CHANGED events and candidate board contents.</li>
* </ul>
* State of leadership for topic.
* <p>
* Provided by this construct is the current {@link Leader leader} and the list of
* {@link NodeId nodeId}s currently registered as candidates for election for the topic.
* Keep in mind that only registered candidates can become leaders.
*/
public class Leadership {
private final String topic;
private final Optional<NodeId> leader;
private final Leader leader;
private final List<NodeId> candidates;
private final long epoch;
private final long electedTime;
public Leadership(String topic, NodeId leader, long epoch, long electedTime) {
public Leadership(String topic, Leader leader, List<NodeId> candidates) {
this.topic = topic;
this.leader = Optional.of(leader);
this.candidates = ImmutableList.of(leader);
this.epoch = epoch;
this.electedTime = electedTime;
}
public Leadership(String topic, NodeId leader, List<NodeId> candidates,
long epoch, long electedTime) {
this.topic = topic;
this.leader = Optional.of(leader);
this.leader = leader;
this.candidates = ImmutableList.copyOf(candidates);
this.epoch = epoch;
this.electedTime = electedTime;
}
public Leadership(String topic, List<NodeId> candidates,
long epoch, long electedTime) {
this.topic = topic;
this.leader = Optional.empty();
this.candidates = ImmutableList.copyOf(candidates);
this.epoch = epoch;
this.electedTime = electedTime;
}
/**
* The topic for which this leadership applies.
* Returns the leadership topic.
*
* @return leadership topic.
*/
......@@ -82,57 +50,36 @@ public class Leadership {
}
/**
* The nodeId of leader for this topic.
*
* @return leader node.
*/
// This will return Optional<NodeId> in the future.
public NodeId leader() {
return leader.orElse(null);
}
/**
* Returns an preference-ordered list of nodes that are in the leadership
* race for this topic.
* Returns the {@link NodeId nodeId} of the leader.
*
* @return a list of NodeIds in priority-order, or an empty list.
* @return leader node identifier; will be null if there is no leader
*/
public List<NodeId> candidates() {
return candidates;
public NodeId leaderNodeId() {
return leader == null ? null : leader.nodeId();
}
/**
* The epoch when the leadership was assumed.
* <p>
* Comparing epochs is only appropriate for leadership events for the same
* topic. The system guarantees that for any given topic the epoch for a new
* term is higher (not necessarily by 1) than the epoch for any previous
* term.
* Returns the leader for this topic.
*
* @return leadership epoch
* @return leader; will be null if there is no leader for topic
*/
public long epoch() {
return epoch;
public Leader leader() {
return leader;
}
/**
* The system time when the term started.
* <p>
* The elected time is initially set on the node coordinating
* the leader election using its local system time. Due to possible
* clock skew, relying on this value for determining event ordering
* is discouraged. Epoch is more appropriate for determining
* event ordering.
 * Returns a preference-ordered list of nodes that are in the leadership
* race for this topic.
*
* @return elected time.
* @return a list of NodeIds in priority-order, or an empty list.
*/
public long electedTime() {
return electedTime;
public List<NodeId> candidates() {
return candidates;
}
@Override
public int hashCode() {
return Objects.hash(topic, leader, candidates, epoch, electedTime);
return Objects.hash(topic, leader, candidates);
}
@Override
......@@ -144,9 +91,7 @@ public class Leadership {
final Leadership other = (Leadership) obj;
return Objects.equals(this.topic, other.topic) &&
Objects.equals(this.leader, other.leader) &&
Objects.equals(this.candidates, other.candidates) &&
Objects.equals(this.epoch, other.epoch) &&
Objects.equals(this.electedTime, other.electedTime);
Objects.equals(this.candidates, other.candidates);
}
return false;
}
......@@ -157,8 +102,6 @@ public class Leadership {
.add("topic", topic)
.add("leader", leader)
.add("candidates", candidates)
.add("epoch", epoch)
.add("electedTime", new DateTime(electedTime))
.toString();
}
}
......
/*
* Copyright 2016 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.cluster;
/**
 * Interface for administratively manipulating leadership assignments.
 */
public interface LeadershipAdminService {
/**
 * Attempts to assign leadership for a topic to a specified node.
 *
 * @param topic leadership topic
 * @param nodeId identifier of the node to be made leader
 * @return {@code true} if the transfer was successfully executed; {@code false}
 * if {@code nodeId} is not one of the candidates for the topic
 */
boolean transferLeadership(String topic, NodeId nodeId);
/**
 * Makes a node the next leader by promoting it to the top of the candidate list.
 *
 * @param topic leadership topic
 * @param nodeId identifier of the node to be next leader
 * @return {@code true} if {@code nodeId} is now the top candidate; {@code false}
 * if {@code nodeId} is not one of the candidates for the topic
 */
boolean promoteToTopOfCandidateList(String topic, NodeId nodeId);
/**
 * Removes all active leadership registrations for a given node.
 * <p>
 * This method will also evict the node from leaderships that it currently owns.
 *
 * @param nodeId node identifier
 */
void unregister(NodeId nodeId);
}
......@@ -27,33 +27,25 @@ import com.google.common.base.MoreObjects;
public class LeadershipEvent extends AbstractEvent<LeadershipEvent.Type, Leadership> {
/**
* Type of leadership-related events.
* Type of leadership events.
*/
public enum Type {
/**
* Signifies that the leader has been elected.
* The event subject is the new leader.
* This event does not guarantee accurate candidate information.
* Signifies a change in both the leader as well as change to the list of candidates. Keep in mind though that
* the first node entering the race will trigger this event as it will become a candidate and automatically get
* promoted to become leader.
*/
LEADER_ELECTED,
LEADER_AND_CANDIDATES_CHANGED,
/**
* Signifies that the leader has been re-elected.
* The event subject is the leader.
* This event does not guarantee accurate candidate information.
* Signifies that the leader for a topic has changed.
*/
LEADER_REELECTED,
// TODO: We may not need this. We currently do not support a way for a current leader to step down
// while still remaining a candidate
LEADER_CHANGED,
/**
* Signifies that the leader has been booted and lost leadership.
* The event subject is the former leader.
* This event does not guarantee accurate candidate information.
*/
LEADER_BOOTED,
/**
* Signifies that the list of candidates for leadership for a topic has
* changed. This event does not guarantee accurate leader information.
* Signifies a change in the list of candidates for a topic.
*/
CANDIDATES_CHANGED
}
......
......@@ -17,87 +17,73 @@ package org.onosproject.cluster;
import org.onosproject.event.ListenerService;
import com.google.common.base.Objects;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
/**
* Service for leader election.
* <p>
 * Leadership contests are organized around topics. An instance can join the
* leadership race for a topic or withdraw from a race it has previously joined.
* <p>
* Listeners can be added to receive notifications asynchronously for various
* leadership contests.
* <p>
* When a node gets elected as a leader for a topic, all nodes receive notifications
* indicating a change in leadership.
*/
public interface LeadershipService
extends ListenerService<LeadershipEvent, LeadershipEventListener> {
/**
* Returns the current leader for the topic.
* Returns the {@link NodeId node identifier} that is the current leader for a topic.
*
* @param path topic
* @return nodeId of the leader, null if so such topic exists.
* @param topic leadership topic
* @return node identifier of the current leader; {@code null} if there is no leader for the topic
*/
NodeId getLeader(String path);
default NodeId getLeader(String topic) {
Leadership leadership = getLeadership(topic);
return leadership == null ? null : leadership.leaderNodeId();
}
/**
* Returns the current leadership info for the topic.
* Returns the current {@link Leadership leadership} for a topic.
*
* @param path topic
* @return leadership info or null if so such topic exists.
* @param topic leadership topic
* @return leadership or {@code null} if no such topic exists
*/
Leadership getLeadership(String path);
Leadership getLeadership(String topic);
/**
* Returns the set of topics owned by the specified node.
* Returns the set of topics owned by the specified {@link NodeId node}.
*
* @param nodeId node Id.
* @param nodeId node identifier.
* @return set of topics for which this node is the current leader.
*/
Set<String> ownedTopics(NodeId nodeId);
default Set<String> ownedTopics(NodeId nodeId) {
return Maps.filterValues(getLeaderBoard(), v -> Objects.equal(nodeId, v.leaderNodeId())).keySet();
}
/**
* Joins the leadership contest.
* Enters a leadership contest.
*
* @param path topic for which this controller node wishes to be a leader
* @param topic leadership topic
* @return {@code Leadership} future
*/
CompletableFuture<Leadership> runForLeadership(String path);
Leadership runForLeadership(String topic);
/**
* Withdraws from a leadership contest.
*
* @param path topic for which this controller node no longer wishes to be a leader
* @return future that is successfully completed when withdraw is done
*/
CompletableFuture<Void> withdraw(String path);
/**
* If the local nodeId is the leader for specified topic, this method causes it to
* step down temporarily from leadership.
* <p>
* The node will continue to be in contention for leadership and can
* potentially become the leader again if and when it becomes the highest
* priority candidate
* <p>
* If the local nodeId is not the leader, this method will make no changes and
* simply return false.
*
* @param path topic for which this controller node should give up leadership
* @return true if this node stepped down from leadership, false otherwise
*/
boolean stepdown(String path);
/**
* Moves the specified nodeId to the top of the candidates list for the topic.
* <p>
* If the node is not a candidate for this topic, this method will be a noop.
*
* @param path leadership topic
* @param nodeId nodeId to make the top candidate
* @return true if nodeId is now the top candidate, false otherwise
* @param topic leadership topic
*/
boolean makeTopCandidate(String path, NodeId nodeId);
void withdraw(String topic);
/**
* Returns the current leader board.
......@@ -107,18 +93,22 @@ public interface LeadershipService
Map<String, Leadership> getLeaderBoard();
/**
* Returns the candidates for all known topics.
* Returns the candidate nodes for each topic.
*
* @return A mapping from topics to corresponding list of candidates.
*/
Map<String, List<NodeId>> getCandidates();
default Map<String, List<NodeId>> getCandidates() {
return ImmutableMap.copyOf(Maps.transformValues(getLeaderBoard(), v -> ImmutableList.copyOf(v.candidates())));
}
/**
* Returns the candidates for a given topic.
* Returns the candidate nodes for a given topic.
*
* @param path topic
* @return A lists of NodeIds, which may be empty.
* @param topic leadership topic
 * @return A list of {@link NodeId nodeIds}, which may be empty.
*/
List<NodeId> getCandidates(String path);
default List<NodeId> getCandidates(String topic) {
Leadership leadership = getLeadership(topic);
return leadership == null ? ImmutableList.of() : ImmutableList.copyOf(leadership.candidates());
}
}
\ No newline at end of file
......
/*
* Copyright 2016 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.cluster;
import java.util.Map;
import org.onosproject.store.Store;
/**
 * Store interface for managing {@link LeadershipService} state.
 * <p>
 * Events are of type {@link LeadershipEvent} and are delivered through a
 * {@link LeadershipStoreDelegate}, as fixed by the {@link Store} type parameters.
 */
public interface LeadershipStore extends Store<LeadershipEvent, LeadershipStoreDelegate> {
/**
 * Adds a registration for the local instance to be leader for a topic.
 *
 * @param topic leadership topic
 * @return updated leadership after the operation is completed
 */
Leadership addRegistration(String topic);
/**
 * Unregisters the local instance from the leadership contest for a topic.
 *
 * @param topic leadership topic
 */
void removeRegistration(String topic);
/**
 * Unregisters an instance from all leadership contests.
 *
 * @param nodeId node identifier
 */
void removeRegistration(NodeId nodeId);
/**
 * Updates state so that the given node is leader for a topic.
 *
 * @param topic leadership topic
 * @param toNodeId identifier of the desired leader
 * @return {@code true} if the transfer succeeded; {@code false} otherwise.
 * This method can return {@code false} if the node is not registered for the topic
 */
boolean moveLeadership(String topic, NodeId toNodeId);
/**
 * Attempts to make a node the top candidate for a topic.
 *
 * @param topic leadership topic
 * @param nodeId node identifier
 * @return {@code true} if the specified node is now the top candidate.
 * This method will return {@code false} if the node is not registered for the topic
 */
boolean makeTopCandidate(String topic, NodeId nodeId);
/**
 * Returns the current leadership for a topic.
 * <p>
 * NOTE(review): the result for a topic with no registrations is not specified
 * here; confirm against the implementation before relying on a non-null return.
 *
 * @param topic leadership topic
 * @return current leadership
 */
Leadership getLeadership(String topic);
/**
 * Returns the current leadership for all topics.
 *
 * @return topic to leadership mapping
 */
Map<String, Leadership> getLeaderships();
}
\ No newline at end of file
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.cluster;
import org.onosproject.store.StoreDelegate;
/**
 * {@link LeadershipStore} delegate abstraction.
 * <p>
 * Declares no members of its own; it only binds {@link StoreDelegate} to the
 * {@link LeadershipEvent} event type.
 */
public interface LeadershipStoreDelegate extends StoreDelegate<LeadershipEvent> {
}
......@@ -15,6 +15,8 @@
*/
package org.onosproject.cluster;
import java.util.Arrays;
import org.junit.Test;
import com.google.common.testing.EqualsTester;
......@@ -28,10 +30,11 @@ import static org.junit.Assert.assertThat;
public class LeadershipEventTest {
private final NodeId node1 = new NodeId("1");
private final NodeId node2 = new NodeId("2");
private final Leadership lead1 = new Leadership("topic1", node1, 1L, 2L);
private final Leadership lead2 = new Leadership("topic1", node2, 1L, 2L);
private final Leadership lead1 = new Leadership("topic1", new Leader(node1, 1L, 2L), Arrays.asList(node1));
private final Leadership lead2 = new Leadership("topic1", new Leader(node1, 1L, 2L), Arrays.asList(node1, node2));
private final Leadership lead3 = new Leadership("topic1", new Leader(node2, 1L, 2L), Arrays.asList(node2));
private final LeadershipEvent event1 =
new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, lead1);
new LeadershipEvent(LeadershipEvent.Type.LEADER_CHANGED, lead1);
private final long time = System.currentTimeMillis();
private final LeadershipEvent event2 =
new LeadershipEvent(LeadershipEvent.Type.CANDIDATES_CHANGED,
......@@ -40,11 +43,9 @@ public class LeadershipEventTest {
new LeadershipEvent(LeadershipEvent.Type.CANDIDATES_CHANGED,
lead2, time);
private final LeadershipEvent event3 =
new LeadershipEvent(LeadershipEvent.Type.LEADER_BOOTED, lead1);
new LeadershipEvent(LeadershipEvent.Type.LEADER_CHANGED, lead2);
private final LeadershipEvent event4 =
new LeadershipEvent(LeadershipEvent.Type.LEADER_REELECTED, lead1);
private final LeadershipEvent event5 =
new LeadershipEvent(LeadershipEvent.Type.LEADER_REELECTED, lead2);
new LeadershipEvent(LeadershipEvent.Type.LEADER_AND_CANDIDATES_CHANGED, lead3);
/**
* Tests for proper operation of equals(), hashCode() and toString() methods.
......@@ -56,7 +57,6 @@ public class LeadershipEventTest {
.addEqualityGroup(event2, sameAsEvent2)
.addEqualityGroup(event3)
.addEqualityGroup(event4)
.addEqualityGroup(event5)
.testEquals();
}
......@@ -65,7 +65,7 @@ public class LeadershipEventTest {
*/
@Test
public void checkConstruction() {
assertThat(event1.type(), is(LeadershipEvent.Type.LEADER_ELECTED));
assertThat(event1.type(), is(LeadershipEvent.Type.LEADER_CHANGED));
assertThat(event1.subject(), is(lead1));
assertThat(event2.time(), is(time));
......
......@@ -18,7 +18,6 @@ package org.onosproject.cluster;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
/**
* Test adapter for leadership service.
......@@ -41,13 +40,12 @@ public class LeadershipServiceAdapter implements LeadershipService {
}
@Override
public CompletableFuture<Leadership> runForLeadership(String path) {
public Leadership runForLeadership(String path) {
return null;
}
@Override
public CompletableFuture<Void> withdraw(String path) {
return null;
public void withdraw(String path) {
}
@Override
......@@ -74,14 +72,4 @@ public class LeadershipServiceAdapter implements LeadershipService {
public List<NodeId> getCandidates(String path) {
return null;
}
@Override
public boolean stepdown(String path) {
return false;
}
@Override
public boolean makeTopCandidate(String path, NodeId nodeId) {
return false;
}
}
\ No newline at end of file
......
......@@ -15,12 +15,13 @@
*/
package org.onosproject.cluster;
import java.util.Arrays;
import org.junit.Test;
import com.google.common.collect.ImmutableList;
import com.google.common.testing.EqualsTester;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
......@@ -31,16 +32,14 @@ import static org.junit.Assert.assertThat;
public class LeadershipTest {
private final NodeId node1 = new NodeId("1");
private final NodeId node2 = new NodeId("2");
private final Leadership lead1 = new Leadership("topic1", node1, 1L, 2L);
private final Leadership sameAsLead1 = new Leadership("topic1", node1, 1L, 2L);
private final Leadership lead2 = new Leadership("topic2", node1, 1L, 2L);
private final Leadership lead3 = new Leadership("topic1", node1, 2L, 2L);
private final Leadership lead4 = new Leadership("topic1", node1, 3L, 2L);
private final Leadership lead5 = new Leadership("topic1", node1, 3L, 3L);
private final Leadership lead6 = new Leadership("topic1", node1,
ImmutableList.of(node2), 1L, 2L);
private final Leadership lead7 = new Leadership("topic1",
ImmutableList.of(node2), 1L, 2L);
private final Leadership lead1 = new Leadership("topic1", new Leader(node1, 1L, 2L), Arrays.asList(node1));
private final Leadership sameAsLead1 = new Leadership("topic1", new Leader(node1, 1L, 2L), Arrays.asList(node1));
private final Leadership lead2 = new Leadership("topic2", new Leader(node1, 1L, 2L), Arrays.asList(node1));
private final Leadership lead3 = new Leadership("topic1", new Leader(node1, 2L, 2L), Arrays.asList(node1));
private final Leadership lead4 = new Leadership("topic1", new Leader(node1, 3L, 2L), Arrays.asList(node1));
private final Leadership lead5 = new Leadership("topic1", new Leader(node1, 3L, 3L), Arrays.asList(node1));
private final Leadership lead6 = new Leadership("topic1", new Leader(node2, 1L, 2L), Arrays.asList(node2, node1));
private final Leadership lead7 = new Leadership("topic1", null, ImmutableList.of());
/**
* Tests for proper operation of equals(), hashCode() and toString() methods.
......@@ -64,12 +63,10 @@ public class LeadershipTest {
*/
@Test
public void checkConstruction() {
assertThat(lead6.electedTime(), is(2L));
assertThat(lead6.epoch(), is(1L));
assertThat(lead6.leader(), is(node1));
assertThat(lead6.leader(), is(new Leader(node2, 1L, 2L)));
assertThat(lead6.topic(), is("topic1"));
assertThat(lead6.candidates(), hasSize(1));
assertThat(lead6.candidates(), contains(node2));
assertThat(lead6.candidates(), hasSize(2));
assertThat(lead6.candidates().get(1), is(node1));
assertThat(lead6.candidates().get(0), is(node2));
}
}
......
......@@ -17,20 +17,22 @@ package org.onosproject.store.trivial;
import static com.google.common.base.Preconditions.checkArgument;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.stream.Collectors;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.Leader;
import org.onosproject.cluster.Leadership;
import org.onosproject.cluster.LeadershipEvent;
import org.onosproject.cluster.LeadershipEvent.Type;
......@@ -53,8 +55,15 @@ public class SimpleLeadershipManager implements LeadershipService {
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private ClusterService clusterService;
private NodeId localNodeId;
private Map<String, Boolean> elections = new ConcurrentHashMap<>();
@Activate
public void activate() {
localNodeId = clusterService.getLocalNode().id();
}
@Override
public NodeId getLeader(String path) {
return elections.get(path) ? clusterService.getLocalNode().id() : null;
......@@ -63,7 +72,8 @@ public class SimpleLeadershipManager implements LeadershipService {
@Override
public Leadership getLeadership(String path) {
checkArgument(path != null);
return elections.get(path) ? new Leadership(path, clusterService.getLocalNode().id(), 0, 0) : null;
return elections.get(path) ?
new Leadership(path, new Leader(localNodeId, 0, 0), Arrays.asList(localNodeId)) : null;
}
@Override
......@@ -77,23 +87,22 @@ public class SimpleLeadershipManager implements LeadershipService {
}
@Override
public CompletableFuture<Leadership> runForLeadership(String path) {
public Leadership runForLeadership(String path) {
elections.put(path, true);
Leadership leadership = new Leadership(path, new Leader(localNodeId, 0, 0), Arrays.asList(localNodeId));
for (LeadershipEventListener listener : listeners) {
listener.event(new LeadershipEvent(Type.LEADER_ELECTED,
new Leadership(path, clusterService.getLocalNode().id(), 0, 0)));
listener.event(new LeadershipEvent(Type.LEADER_AND_CANDIDATES_CHANGED, leadership));
}
return CompletableFuture.completedFuture(new Leadership(path, clusterService.getLocalNode().id(), 0, 0));
return leadership;
}
@Override
public CompletableFuture<Void> withdraw(String path) {
public void withdraw(String path) {
elections.remove(path);
for (LeadershipEventListener listener : listeners) {
listener.event(new LeadershipEvent(Type.LEADER_BOOTED,
new Leadership(path, clusterService.getLocalNode().id(), 0, 0)));
listener.event(new LeadershipEvent(Type.LEADER_AND_CANDIDATES_CHANGED,
new Leadership(path, null, Arrays.asList())));
}
return CompletableFuture.completedFuture(null);
}
@Override
......@@ -122,14 +131,4 @@ public class SimpleLeadershipManager implements LeadershipService {
public List<NodeId> getCandidates(String path) {
return null;
}
@Override
public boolean stepdown(String path) {
throw new UnsupportedOperationException();
}
@Override
public boolean makeTopCandidate(String path, NodeId nodeId) {
throw new UnsupportedOperationException();
}
}
......
/*
* Copyright 2016 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.cluster.impl;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.Tools;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.Leadership;
import org.onosproject.cluster.LeadershipAdminService;
import org.onosproject.cluster.LeadershipEvent;
import org.onosproject.cluster.LeadershipEventListener;
import org.onosproject.cluster.LeadershipService;
import org.onosproject.cluster.LeadershipStore;
import org.onosproject.cluster.LeadershipStoreDelegate;
import org.onosproject.cluster.NodeId;
import org.onosproject.event.AbstractListenerManager;
import org.slf4j.Logger;
import com.google.common.collect.Maps;
/**
 * Implementation of {@link LeadershipService} and {@link LeadershipAdminService}.
 * <p>
 * All leadership state lives in the {@link LeadershipStore}; this manager only
 * delegates to it, relays store events to registered listeners and periodically
 * removes the registrations of cluster nodes that are not in the ACTIVE state.
 */
@Component(immediate = true)
@Service
public class LeadershipManager
        extends AbstractListenerManager<LeadershipEvent, LeadershipEventListener>
        implements LeadershipService, LeadershipAdminService {

    private final Logger log = getLogger(getClass());

    // Forwards every event produced by the store to the listeners of this manager.
    private LeadershipStoreDelegate delegate = this::post;

    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected ClusterService clusterService;

    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected LeadershipStore store;

    private NodeId localNodeId;

    // Single background thread that runs the periodic purge of inactive nodes.
    private final ScheduledExecutorService deadlockDetector =
            Executors.newSingleThreadScheduledExecutor(Tools.groupedThreads("onos/leadership", ""));

    /**
     * Hooks this manager up to the store and the event dispatcher and starts
     * the periodic purge of inactive nodes (every 2 seconds, starting immediately).
     */
    @Activate
    public void activate() {
        localNodeId = clusterService.getLocalNode().id();
        store.setDelegate(delegate);
        eventDispatcher.addSink(LeadershipEvent.class, listenerRegistry);
        deadlockDetector.scheduleWithFixedDelay(this::purgeInactiveNodes, 0, 2, TimeUnit.SECONDS);
        log.info("Started");
    }

    /**
     * Stops the periodic purge, withdraws the local node from every topic it is
     * a candidate for and detaches this manager from the store and the dispatcher.
     */
    @Deactivate
    public void deactivate() {
        deadlockDetector.shutdown();
        Maps.filterValues(store.getLeaderships(), v -> v.candidates().contains(localNodeId))
                .keySet()
                .forEach(this::withdraw);
        store.unsetDelegate(delegate);
        eventDispatcher.removeSink(LeadershipEvent.class);
        log.info("Stopped");
    }

    @Override
    public Leadership getLeadership(String topic) {
        return store.getLeadership(topic);
    }

    @Override
    public Leadership runForLeadership(String topic) {
        return store.addRegistration(topic);
    }

    @Override
    public void withdraw(String topic) {
        store.removeRegistration(topic);
    }

    @Override
    public Map<String, Leadership> getLeaderBoard() {
        return store.getLeaderships();
    }

    @Override
    public boolean transferLeadership(String topic, NodeId to) {
        return store.moveLeadership(topic, to);
    }

    @Override
    public void unregister(NodeId nodeId) {
        store.removeRegistration(nodeId);
    }

    @Override
    public boolean promoteToTopOfCandidateList(String topic, NodeId nodeId) {
        return store.makeTopCandidate(topic, nodeId);
    }

    /**
     * Removes the registrations of every cluster node whose state is not ACTIVE.
     * <p>
     * Failures are caught and logged: a scheduled task that throws has all of its
     * subsequent executions suppressed by the executor, which would otherwise
     * silently disable this purge after the first store or cluster error.
     */
    private void purgeInactiveNodes() {
        try {
            clusterService.getNodes()
                    .stream()
                    .map(ControllerNode::id)
                    .filter(id -> clusterService.getState(id) != ControllerNode.State.ACTIVE)
                    .forEach(this::unregister);
        } catch (Exception e) {
            log.warn("Failed to purge leadership registrations of inactive nodes", e);
        }
    }
}
/*
* Copyright 2016 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.cluster.impl;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.Leader;
import org.onosproject.cluster.Leadership;
import org.onosproject.cluster.LeadershipEvent;
import org.onosproject.cluster.LeadershipStore;
import org.onosproject.cluster.LeadershipStoreDelegate;
import org.onosproject.cluster.NodeId;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.Versioned;
import org.slf4j.Logger;
import com.google.common.base.MoreObjects;
import com.google.common.base.Objects;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
/**
* Implementation of {@code LeadershipStore} backed by {@link ConsistentMap}.
*/
@Service
@Component(immediate = true, enabled = true)
public class DistributedLeadershipStore
extends AbstractStore<LeadershipEvent, LeadershipStoreDelegate>
implements LeadershipStore {
private static final Logger log = getLogger(DistributedLeadershipStore.class);
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected StorageService storageService;
protected NodeId localNodeId;
protected ConsistentMap<String, InternalLeadership> leadershipMap;
/**
 * Translates updates of the leadership map into {@link LeadershipEvent}s and
 * hands them to the store delegate.
 * <p>
 * The event type is derived by comparing the leader and the candidate set of
 * the old and new values. Updates that carry no new value, or that change
 * neither the leader nor the candidate set, are not published.
 */
private final MapEventListener<String, InternalLeadership> leadershipChangeListener =
        event -> {
            Leadership oldValue = InternalLeadership.toLeadership(Versioned.valueOrNull(event.oldValue()));
            Leadership newValue = InternalLeadership.toLeadership(Versioned.valueOrNull(event.newValue()));
            if (newValue == null) {
                // No new value (e.g. the entry was removed): there is no leadership to report
                // and dereferencing it below would throw a NullPointerException.
                return;
            }
            boolean leaderChanged =
                    !Objects.equal(oldValue == null ? null : oldValue.leader(), newValue.leader());
            // NOTE(review): candidates are compared as sets, so a pure reordering of the
            // candidate list is not reported as a candidate change - confirm this is intended.
            boolean candidatesChanged =
                    !Sets.symmetricDifference(
                            Sets.newHashSet(oldValue == null ? ImmutableSet.<NodeId>of() : oldValue.candidates()),
                            Sets.newHashSet(newValue.candidates())).isEmpty();
            LeadershipEvent.Type eventType;
            if (leaderChanged && candidatesChanged) {
                eventType = LeadershipEvent.Type.LEADER_AND_CANDIDATES_CHANGED;
            } else if (leaderChanged) {
                eventType = LeadershipEvent.Type.LEADER_CHANGED;
            } else if (candidatesChanged) {
                eventType = LeadershipEvent.Type.CANDIDATES_CHANGED;
            } else {
                // Nothing observable changed; never publish an event with a null type.
                return;
            }
            notifyDelegate(new LeadershipEvent(eventType, newValue));
        };
/**
 * Resolves the local node id, builds the "onos-leadership" consistent map and
 * starts listening for its updates.
 */
@Activate
public void activate() {
    localNodeId = clusterService.getLocalNode().id();

    Serializer mapSerializer = Serializer.using(KryoNamespaces.API, InternalLeadership.class);
    leadershipMap = storageService.<String, InternalLeadership>consistentMapBuilder()
            .withName("onos-leadership")
            .withPartitionsDisabled()
            .withRelaxedReadConsistency()
            .withSerializer(mapSerializer)
            .build();

    leadershipMap.addListener(leadershipChangeListener);
    log.info("Started");
}
/**
 * Stops listening for updates of the leadership map.
 */
@Deactivate
public void deactivate() {
leadershipMap.removeListener(leadershipChangeListener);
log.info("Stopped");
}
/**
 * Registers the local node as a candidate for the given topic.
 * <p>
 * If the topic has no entry or no candidates, the local node becomes the
 * leader of a new term (1 for a fresh entry, previous term + 1 otherwise).
 * Otherwise it is appended to the end of the candidate list and the current
 * leader and term are kept. Nothing is changed if the local node is already
 * a candidate.
 *
 * @param topic leadership topic
 * @return leadership for the topic as returned by the map update, or null
 *         if the map returned no value
 */
@Override
public Leadership addRegistration(String topic) {
    Versioned<InternalLeadership> updated = leadershipMap.computeIf(topic,
            current -> current == null || !current.candidates().contains(localNodeId),
            (key, current) -> {
                boolean hasCandidates = current != null && !current.candidates().isEmpty();
                if (hasCandidates) {
                    // Join the existing election at the back of the queue.
                    List<NodeId> extended = new ArrayList<>(current.candidates());
                    extended.add(localNodeId);
                    return new InternalLeadership(topic,
                            current.leader(),
                            current.term(),
                            current.termStartTime(),
                            extended);
                }
                // Nobody is registered: the local node starts a new term as leader.
                long nextTerm = current == null ? 1 : current.term() + 1;
                return new InternalLeadership(topic,
                        localNodeId,
                        nextTerm,
                        System.currentTimeMillis(),
                        ImmutableList.of(localNodeId));
            });
    InternalLeadership value = Versioned.valueOrNull(updated);
    return InternalLeadership.toLeadership(value);
}
/**
 * Removes the local node's registration for the given topic.
 *
 * @param topic leadership topic
 */
@Override
public void removeRegistration(String topic) {
removeRegistration(topic, localNodeId);
}
/**
 * Removes the given node from the candidates of a topic.
 * <p>
 * If the node was the leader, the first remaining candidate (if any) takes
 * over and a new term starts; if no candidate remains the topic is left
 * without a leader and the term is unchanged. Nothing is changed if the
 * node is not a candidate for the topic.
 *
 * @param topic leadership topic
 * @param nodeId node whose registration is removed
 */
private void removeRegistration(String topic, NodeId nodeId) {
    leadershipMap.computeIf(topic,
            current -> current != null && current.candidates().contains(nodeId),
            (key, current) -> {
                List<NodeId> remaining = new ArrayList<>();
                for (NodeId candidate : current.candidates()) {
                    if (!nodeId.equals(candidate)) {
                        remaining.add(candidate);
                    }
                }

                NodeId successor;
                if (!nodeId.equals(current.leader())) {
                    successor = current.leader();
                } else if (remaining.size() > 0) {
                    successor = remaining.get(0);
                } else {
                    successor = null;
                }

                // The term only advances when leadership actually passes to another node.
                boolean sameTerm = successor == null || Objects.equal(successor, current.leader());
                long term = sameTerm ? current.term() : current.term() + 1;
                long termStart = sameTerm ? current.termStartTime() : System.currentTimeMillis();
                return new InternalLeadership(topic, successor, term, termStart, remaining);
            });
}
/**
 * Removes the given node's registration from every topic it is a candidate for.
 *
 * @param nodeId node whose registrations are removed
 */
@Override
public void removeRegistration(NodeId nodeId) {
    for (Map.Entry<String, Versioned<InternalLeadership>> entry : leadershipMap.entrySet()) {
        boolean registered = entry.getValue().value().candidates().contains(nodeId);
        if (registered) {
            removeRegistration(entry.getKey(), nodeId);
        }
    }
}
/**
 * Makes the given node the leader of a topic.
 * <p>
 * The update is applied only if the topic has an entry, the node is one of
 * its candidates and it is not already the leader. On update the node is
 * moved to the head of the candidate list and a new term is started.
 *
 * @param topic leadership topic
 * @param toNodeId node that should become leader
 * @return true if the node is the leader of the topic after the call;
 *         false otherwise, including when the topic has no entry
 */
@Override
public boolean moveLeadership(String topic, NodeId toNodeId) {
    Versioned<InternalLeadership> internalLeadership = leadershipMap.computeIf(topic,
            v -> v != null &&
                    v.candidates().contains(toNodeId) &&
                    !Objects.equal(v.leader(), toNodeId),
            (k, v) -> {
                List<NodeId> newCandidates = new ArrayList<>();
                newCandidates.add(toNodeId);
                newCandidates.addAll(v.candidates()
                        .stream()
                        .filter(id -> !toNodeId.equals(id))
                        .collect(Collectors.toList()));
                return new InternalLeadership(topic,
                        toNodeId,
                        v.term() + 1,
                        System.currentTimeMillis(),
                        newCandidates);
            });
    // The value is null when the topic has no entry; report failure instead of throwing.
    InternalLeadership updated = Versioned.valueOrNull(internalLeadership);
    return updated != null && Objects.equal(toNodeId, updated.leader());
}
/**
 * Moves the given node to the head of the candidate list of a topic.
 * <p>
 * The update is applied only if the topic has an entry, the node is one of
 * its candidates and it is not already first. Leader and term are kept.
 *
 * @param topic leadership topic
 * @param nodeId node to promote
 * @return true if the node is the first candidate of the topic after the
 *         call; false otherwise, including when the topic has no entry or
 *         no candidates
 */
@Override
public boolean makeTopCandidate(String topic, NodeId nodeId) {
    Versioned<InternalLeadership> internalLeadership = leadershipMap.computeIf(topic,
            v -> v != null &&
                    v.candidates().contains(nodeId) &&
                    !v.candidates().get(0).equals(nodeId),
            (k, v) -> {
                List<NodeId> newCandidates = new ArrayList<>();
                newCandidates.add(nodeId);
                newCandidates.addAll(v.candidates()
                        .stream()
                        .filter(id -> !nodeId.equals(id))
                        .collect(Collectors.toList()));
                // NOTE(review): the term start time is reset although the term itself is
                // unchanged - kept as in the original, confirm this is intended.
                return new InternalLeadership(topic,
                        v.leader(),
                        v.term(),
                        System.currentTimeMillis(),
                        newCandidates);
            });
    // Guard against a missing entry and against an entry whose candidate list is empty
    // (left behind once every node has withdrawn); get(0) would throw in the latter case.
    InternalLeadership updated = Versioned.valueOrNull(internalLeadership);
    return updated != null
            && !updated.candidates().isEmpty()
            && nodeId.equals(updated.candidates().get(0));
}
/**
 * Looks up the current leadership of a topic.
 *
 * @param topic leadership topic
 * @return leadership for the topic, or null if the map holds no value for it
 */
@Override
public Leadership getLeadership(String topic) {
    Versioned<InternalLeadership> entry = leadershipMap.get(topic);
    InternalLeadership value = Versioned.valueOrNull(entry);
    return InternalLeadership.toLeadership(value);
}
/**
 * Returns the leadership of every topic held in the leadership map.
 *
 * @return immutable mapping from topic to its leadership
 */
@Override
public Map<String, Leadership> getLeaderships() {
    Map<String, Leadership> byTopic = Maps.newHashMap();
    for (Map.Entry<String, Versioned<InternalLeadership>> entry : leadershipMap.entrySet()) {
        byTopic.put(entry.getKey(), entry.getValue().value().asLeadership());
    }
    return ImmutableMap.copyOf(byTopic);
}
private static class InternalLeadership {
private final String topic;
private final NodeId leader;
private final long term;
private final long termStartTime;
private final List<NodeId> candidates;
public InternalLeadership(String topic,
NodeId leader,
long term,
long termStartTime,
List<NodeId> candidates) {
this.topic = topic;
this.leader = leader;
this.term = term;
this.termStartTime = termStartTime;
this.candidates = ImmutableList.copyOf(candidates);
}
public NodeId leader() {
return this.leader;
}
public long term() {
return term;
}
public long termStartTime() {
return termStartTime;
}
public List<NodeId> candidates() {
return candidates;
}
public Leadership asLeadership() {
return new Leadership(topic, leader == null ?
null : new Leader(leader, term, termStartTime), candidates);
}
public static Leadership toLeadership(InternalLeadership internalLeadership) {
return internalLeadership == null ? null : internalLeadership.asLeadership();
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("leader", leader)
.add("term", term)
.add("termStartTime", termStartTime)
.add("candidates", candidates)
.toString();
}
}
}
......@@ -21,8 +21,6 @@ import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onosproject.cluster.ClusterEvent;
import org.onosproject.cluster.ClusterEventListener;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.Leadership;
......@@ -76,7 +74,6 @@ public class IntentPartitionManager implements IntentPartitionService {
private ListenerRegistry<IntentPartitionEvent, IntentPartitionEventListener> listenerRegistry;
private LeadershipEventListener leaderListener = new InternalLeadershipListener();
private ClusterEventListener clusterListener = new InternalClusterEventListener();
private ScheduledExecutorService executor = Executors
.newScheduledThreadPool(1);
......@@ -84,7 +81,6 @@ public class IntentPartitionManager implements IntentPartitionService {
@Activate
public void activate() {
leadershipService.addListener(leaderListener);
clusterService.addListener(clusterListener);
listenerRegistry = new ListenerRegistry<>();
eventDispatcher.addSink(IntentPartitionEvent.class, listenerRegistry);
......@@ -103,7 +99,6 @@ public class IntentPartitionManager implements IntentPartitionService {
eventDispatcher.removeSink(IntentPartitionEvent.class);
leadershipService.removeListener(leaderListener);
clusterService.removeListener(clusterListener);
}
/**
......@@ -180,7 +175,7 @@ public class IntentPartitionManager implements IntentPartitionService {
List<Leadership> myPartitions = leadershipService.getLeaderBoard().values()
.stream()
.filter(l -> clusterService.getLocalNode().id().equals(l.leader()))
.filter(l -> clusterService.getLocalNode().id().equals(l.leaderNodeId()))
.filter(l -> l.topic().startsWith(ELECTION_PREFIX))
.collect(Collectors.toList());
......@@ -220,24 +215,16 @@ public class IntentPartitionManager implements IntentPartitionService {
public void event(LeadershipEvent event) {
Leadership leadership = event.subject();
if (Objects.equals(leadership.leader(), clusterService.getLocalNode().id()) &&
if (Objects.equals(leadership.leaderNodeId(), clusterService.getLocalNode().id()) &&
leadership.topic().startsWith(ELECTION_PREFIX)) {
// See if we need to let some partitions go
scheduleRebalance(0);
eventDispatcher.post(new IntentPartitionEvent(IntentPartitionEvent.Type.LEADER_CHANGED,
leadership.topic()));
}
}
}
private final class InternalClusterEventListener implements
ClusterEventListener {
@Override
public void event(ClusterEvent event) {
if (event.type() == LeadershipEvent.Type.CANDIDATES_CHANGED) {
scheduleRebalance(0);
}
}
}
}
......
......@@ -16,7 +16,6 @@
package org.onosproject.store.mastership.impl;
import static org.onlab.util.Tools.groupedThreads;
import static org.onlab.util.Tools.futureGetOrElse;
import static org.onosproject.mastership.MastershipEvent.Type.BACKUPS_CHANGED;
import static org.onosproject.mastership.MastershipEvent.Type.MASTER_CHANGED;
import static org.slf4j.LoggerFactory.getLogger;
......@@ -43,6 +42,7 @@ import org.apache.felix.scr.annotations.Service;
import org.onlab.util.KryoNamespace;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.Leadership;
import org.onosproject.cluster.LeadershipAdminService;
import org.onosproject.cluster.LeadershipEvent;
import org.onosproject.cluster.LeadershipEventListener;
import org.onosproject.cluster.LeadershipService;
......@@ -63,9 +63,9 @@ import org.onosproject.store.serializers.StoreSerializer;
import org.slf4j.Logger;
import com.google.common.base.Objects;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
/**
* Implementation of the MastershipStore on top of Leadership Service.
......@@ -82,18 +82,18 @@ public class ConsistentDeviceMastershipStore
protected LeadershipService leadershipService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected LeadershipAdminService leadershipAdminService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterCommunicationService clusterCommunicator;
private NodeId localNodeId;
private final Set<DeviceId> connectedDevices = Sets.newHashSet();
private static final MessageSubject ROLE_RELINQUISH_SUBJECT =
new MessageSubject("mastership-store-device-role-relinquish");
private static final MessageSubject TRANSITION_FROM_MASTER_TO_STANDBY_SUBJECT =
new MessageSubject("mastership-store-device-mastership-relinquish");
private static final Pattern DEVICE_MASTERSHIP_TOPIC_PATTERN =
Pattern.compile("device:(.*)");
......@@ -132,11 +132,6 @@ public class ConsistentDeviceMastershipStore
this::relinquishLocalRole,
SERIALIZER::encode,
messageHandlingExecutor);
clusterCommunicator.addSubscriber(TRANSITION_FROM_MASTER_TO_STANDBY_SUBJECT,
SERIALIZER::decode,
this::transitionFromMasterToStandby,
SERIALIZER::encode,
messageHandlingExecutor);
localNodeId = clusterService.getLocalNode().id();
leadershipService.addListener(leadershipEventListener);
......@@ -146,7 +141,6 @@ public class ConsistentDeviceMastershipStore
@Deactivate
public void deactivate() {
clusterCommunicator.removeSubscriber(ROLE_RELINQUISH_SUBJECT);
clusterCommunicator.removeSubscriber(TRANSITION_FROM_MASTER_TO_STANDBY_SUBJECT);
messageHandlingExecutor.shutdown();
transferExecutor.shutdown();
leadershipService.removeListener(leadershipEventListener);
......@@ -159,12 +153,9 @@ public class ConsistentDeviceMastershipStore
checkArgument(deviceId != null, DEVICE_ID_NULL);
String leadershipTopic = createDeviceMastershipTopic(deviceId);
connectedDevices.add(deviceId);
return leadershipService.runForLeadership(leadershipTopic)
.thenApply(leadership -> {
return Objects.equal(localNodeId, leadership.leader())
? MastershipRole.MASTER : MastershipRole.STANDBY;
});
Leadership leadership = leadershipService.runForLeadership(leadershipTopic);
return CompletableFuture.completedFuture(localNodeId.equals(leadership.leaderNodeId())
? MastershipRole.MASTER : MastershipRole.STANDBY);
}
@Override
......@@ -173,20 +164,19 @@ public class ConsistentDeviceMastershipStore
checkArgument(deviceId != null, DEVICE_ID_NULL);
String leadershipTopic = createDeviceMastershipTopic(deviceId);
NodeId leader = leadershipService.getLeader(leadershipTopic);
if (Objects.equal(nodeId, leader)) {
return MastershipRole.MASTER;
}
return leadershipService.getCandidates(leadershipTopic).contains(nodeId) ?
MastershipRole.STANDBY : MastershipRole.NONE;
Leadership leadership = leadershipService.getLeadership(leadershipTopic);
NodeId leader = leadership == null ? null : leadership.leaderNodeId();
List<NodeId> candidates = leadership == null ?
ImmutableList.of() : ImmutableList.copyOf(leadership.candidates());
return Objects.equal(nodeId, leader) ?
MastershipRole.MASTER : candidates.contains(nodeId) ? MastershipRole.STANDBY : MastershipRole.NONE;
}
@Override
public NodeId getMaster(DeviceId deviceId) {
checkArgument(deviceId != null, DEVICE_ID_NULL);
String leadershipTopic = createDeviceMastershipTopic(deviceId);
return leadershipService.getLeader(leadershipTopic);
return leadershipService.getLeader(createDeviceMastershipTopic(deviceId));
}
@Override
......@@ -194,8 +184,7 @@ public class ConsistentDeviceMastershipStore
checkArgument(deviceId != null, DEVICE_ID_NULL);
Map<NodeId, MastershipRole> roles = Maps.newHashMap();
clusterService
.getNodes()
clusterService.getNodes()
.forEach((node) -> roles.put(node.id(), getRole(node.id(), deviceId)));
NodeId master = null;
......@@ -233,30 +222,10 @@ public class ConsistentDeviceMastershipStore
checkArgument(nodeId != null, NODE_ID_NULL);
checkArgument(deviceId != null, DEVICE_ID_NULL);
NodeId currentMaster = getMaster(deviceId);
if (nodeId.equals(currentMaster)) {
return CompletableFuture.completedFuture(null);
} else {
String leadershipTopic = createDeviceMastershipTopic(deviceId);
List<NodeId> candidates = leadershipService.getCandidates(leadershipTopic);
if (candidates.isEmpty()) {
return CompletableFuture.completedFuture(null);
}
if (leadershipService.makeTopCandidate(leadershipTopic, nodeId)) {
CompletableFuture<MastershipEvent> result = new CompletableFuture<>();
// There is brief wait before we step down from mastership.
// This is to ensure any work that happens when standby preference
// order changes can complete. For example: flow entries need to be backed
// to the new top standby (ONOS-1883)
// FIXME: This potentially introduces a race-condition.
// Right now role changes are only forced via CLI.
transferExecutor.schedule(() -> {
result.complete(transitionFromMasterToStandby(deviceId));
}, WAIT_BEFORE_MASTERSHIP_HANDOFF_MILLIS, TimeUnit.MILLISECONDS);
return result;
} else {
log.warn("Failed to promote {} to mastership for {}", nodeId, deviceId);
}
if (leadershipAdminService.promoteToTopOfCandidateList(leadershipTopic, nodeId)) {
transferExecutor.schedule(() -> leadershipAdminService.transferLeadership(leadershipTopic, nodeId),
WAIT_BEFORE_MASTERSHIP_HANDOFF_MILLIS, TimeUnit.MILLISECONDS);
}
return CompletableFuture.completedFuture(null);
}
......@@ -267,7 +236,7 @@ public class ConsistentDeviceMastershipStore
String leadershipTopic = createDeviceMastershipTopic(deviceId);
Leadership leadership = leadershipService.getLeadership(leadershipTopic);
return leadership != null ? MastershipTerm.of(leadership.leader(), leadership.epoch()) : null;
return leadership != null ? MastershipTerm.of(leadership.leaderNodeId(), leadership.leader().term()) : null;
}
@Override
......@@ -318,71 +287,44 @@ public class ConsistentDeviceMastershipStore
private CompletableFuture<MastershipEvent> relinquishLocalRole(DeviceId deviceId) {
checkArgument(deviceId != null, DEVICE_ID_NULL);
// Check if this device can be managed by this node.
if (!connectedDevices.contains(deviceId)) {
return CompletableFuture.completedFuture(null);
}
String leadershipTopic = createDeviceMastershipTopic(deviceId);
NodeId currentLeader = leadershipService.getLeader(leadershipTopic);
MastershipEvent.Type eventType = Objects.equal(currentLeader, localNodeId)
? MastershipEvent.Type.MASTER_CHANGED
: MastershipEvent.Type.BACKUPS_CHANGED;
connectedDevices.remove(deviceId);
return leadershipService.withdraw(leadershipTopic)
.thenApply(v -> new MastershipEvent(eventType, deviceId, getNodes(deviceId)));
if (!leadershipService.getCandidates(leadershipTopic).contains(localNodeId)) {
return CompletableFuture.completedFuture(null);
}
private MastershipEvent transitionFromMasterToStandby(DeviceId deviceId) {
checkArgument(deviceId != null, DEVICE_ID_NULL);
NodeId currentMaster = getMaster(deviceId);
if (currentMaster == null) {
return null;
MastershipEvent.Type eventType = localNodeId.equals(leadershipService.getLeader(leadershipTopic)) ?
MastershipEvent.Type.MASTER_CHANGED : MastershipEvent.Type.BACKUPS_CHANGED;
leadershipService.withdraw(leadershipTopic);
return CompletableFuture.completedFuture(new MastershipEvent(eventType, deviceId, getNodes(deviceId)));
}
if (!currentMaster.equals(localNodeId)) {
log.info("Forwarding request to relinquish "
+ "mastership for device {} to {}", deviceId, currentMaster);
return futureGetOrElse(clusterCommunicator.sendAndReceive(
deviceId,
TRANSITION_FROM_MASTER_TO_STANDBY_SUBJECT,
SERIALIZER::encode,
SERIALIZER::decode,
currentMaster), null);
@Override
public void relinquishAllRole(NodeId nodeId) {
// Noop. LeadershipService already takes care of detecting and purging stale locks.
}
return leadershipService.stepdown(createDeviceMastershipTopic(deviceId))
? new MastershipEvent(MastershipEvent.Type.MASTER_CHANGED, deviceId, getNodes(deviceId)) : null;
}
private class InternalDeviceMastershipEventListener implements LeadershipEventListener {
@Override
public void relinquishAllRole(NodeId nodeId) {
// Noop. LeadershipService already takes care of detecting and purging deadlocks.
public boolean isRelevant(LeadershipEvent event) {
Leadership leadership = event.subject();
return isDeviceMastershipTopic(leadership.topic());
}
private class InternalDeviceMastershipEventListener implements LeadershipEventListener {
@Override
public void event(LeadershipEvent event) {
Leadership leadership = event.subject();
if (!isDeviceMastershipTopic(leadership.topic())) {
return;
}
DeviceId deviceId = extractDeviceIdFromTopic(leadership.topic());
RoleInfo roleInfo = getNodes(deviceId);
switch (event.type()) {
case LEADER_ELECTED:
notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, getNodes(deviceId)));
case LEADER_AND_CANDIDATES_CHANGED:
notifyDelegate(new MastershipEvent(BACKUPS_CHANGED, deviceId, roleInfo));
notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, roleInfo));
break;
case LEADER_REELECTED:
// There is no concept of leader re-election in the new distributed leadership manager.
throw new IllegalStateException("Unexpected event type");
case LEADER_BOOTED:
notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, getNodes(deviceId)));
case LEADER_CHANGED:
notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, roleInfo));
break;
case CANDIDATES_CHANGED:
notifyDelegate(new MastershipEvent(BACKUPS_CHANGED, deviceId, getNodes(deviceId)));
notifyDelegate(new MastershipEvent(BACKUPS_CHANGED, deviceId, roleInfo));
break;
default:
return;
......@@ -407,5 +349,4 @@ public class ConsistentDeviceMastershipStore
Matcher m = DEVICE_MASTERSHIP_TOPIC_PATTERN.matcher(topic);
return m.matches();
}
}
\ No newline at end of file
......
......@@ -22,6 +22,7 @@ import org.onlab.packet.IpAddress;
import org.onosproject.cluster.ClusterServiceAdapter;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.DefaultControllerNode;
import org.onosproject.cluster.Leader;
import org.onosproject.cluster.Leadership;
import org.onosproject.cluster.LeadershipEvent;
import org.onosproject.cluster.LeadershipEventListener;
......@@ -31,13 +32,12 @@ import org.onosproject.cluster.NodeId;
import org.onosproject.common.event.impl.TestEventDispatcher;
import org.onosproject.net.intent.Key;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import static junit.framework.TestCase.assertFalse;
import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.anyString;
......@@ -55,9 +55,10 @@ import static org.junit.Assert.assertTrue;
public class IntentPartitionManagerTest {
private final LeadershipEvent event
= new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED,
= new LeadershipEvent(LeadershipEvent.Type.CANDIDATES_CHANGED,
new Leadership(ELECTION_PREFIX + "0",
MY_NODE_ID, 0, 0));
new Leader(MY_NODE_ID, 0, 0),
Arrays.asList(MY_NODE_ID, OTHER_NODE_ID)));
private static final NodeId MY_NODE_ID = new NodeId("local");
private static final NodeId OTHER_NODE_ID = new NodeId("other");
......@@ -78,7 +79,7 @@ public class IntentPartitionManagerTest {
expectLastCall().andDelegateTo(new TestLeadershipService());
for (int i = 0; i < IntentPartitionManager.NUM_PARTITIONS; i++) {
expect(leadershipService.runForLeadership(ELECTION_PREFIX + i))
.andReturn(CompletableFuture.completedFuture(null))
.andReturn(null)
.times(1);
}
......@@ -105,7 +106,9 @@ public class IntentPartitionManagerTest {
expect(leadershipService.getLeader(ELECTION_PREFIX + i))
.andReturn(MY_NODE_ID).anyTimes();
leaderBoard.put(ELECTION_PREFIX + i,
new Leadership(ELECTION_PREFIX + i, MY_NODE_ID, 0, 0));
new Leadership(ELECTION_PREFIX + i,
new Leader(MY_NODE_ID, 0, 0),
Arrays.asList(MY_NODE_ID)));
}
for (int i = numMine; i < IntentPartitionManager.NUM_PARTITIONS; i++) {
......@@ -113,7 +116,9 @@ public class IntentPartitionManagerTest {
.andReturn(OTHER_NODE_ID).anyTimes();
leaderBoard.put(ELECTION_PREFIX + i,
new Leadership(ELECTION_PREFIX + i, OTHER_NODE_ID, 0, 0));
new Leadership(ELECTION_PREFIX + i,
new Leader(OTHER_NODE_ID, 0, 0),
Arrays.asList(OTHER_NODE_ID)));
}
expect(leadershipService.getLeaderBoard()).andReturn(leaderBoard).anyTimes();
......@@ -131,7 +136,7 @@ public class IntentPartitionManagerTest {
for (int i = 0; i < IntentPartitionManager.NUM_PARTITIONS; i++) {
expect(leadershipService.runForLeadership(ELECTION_PREFIX + i))
.andReturn(CompletableFuture.completedFuture(null))
.andReturn(null)
.times(1);
}
......@@ -200,9 +205,8 @@ public class IntentPartitionManagerTest {
// We have all the partitions so we'll need to relinquish some
setUpLeadershipService(IntentPartitionManager.NUM_PARTITIONS);
expect(leadershipService.withdraw(anyString()))
.andReturn(CompletableFuture.completedFuture(null))
.times(7);
leadershipService.withdraw(anyString());
expectLastCall().times(7);
replay(leadershipService);
......
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.impl;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.MapDifference;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.commons.lang.math.RandomUtils;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onosproject.cluster.ClusterEvent;
import org.onosproject.cluster.ClusterEvent.Type;
import org.onosproject.cluster.ClusterEventListener;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.Leadership;
import org.onosproject.cluster.LeadershipEvent;
import org.onosproject.cluster.LeadershipEventListener;
import org.onosproject.cluster.LeadershipService;
import org.onosproject.cluster.NodeId;
import org.onosproject.event.ListenerRegistry;
import org.onosproject.event.EventDeliveryService;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.ConsistentMapException;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.Versioned;
import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkArgument;
import static org.onlab.util.Tools.groupedThreads;
import static org.slf4j.LoggerFactory.getLogger;
import static org.onosproject.cluster.ControllerNode.State.ACTIVE;
import static org.onosproject.cluster.ControllerNode.State.INACTIVE;
/**
* Distributed Lock Manager implemented on top of ConsistentMap.
* <p>
* This implementation makes use of ClusterService's failure
* detection capabilities to detect and purge stale locks.
* TODO: Ensure lock safety and liveness.
*/
@Component(immediate = true, enabled = true)
@Service
public class DistributedLeadershipManager implements LeadershipService {
// Used in activate() to build the two consistent maps declared below.
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected StorageService storageService;
// Supplies the local node id, per-node state (ACTIVE/INACTIVE) and cluster membership events.
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
// NOTE(review): not referenced anywhere else in this class - confirm whether it is still needed.
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterCommunicationService clusterCommunicator;
// Accepted leadership events are posted here; activate() attaches the listener registry as its sink.
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected EventDeliveryService eventDispatcher;
private final Logger log = getLogger(getClass());
// Periodically runs electLeaders().
private ScheduledExecutorService electionRunner;
// Runs the per-topic election attempts and the delayed runForLeadership/withdraw retries.
private ScheduledExecutorService lockExecutor;
// Runs purgeStaleLeadership().
private ScheduledExecutorService staleLeadershipPurgeExecutor;
// Periodically runs refreshLeaderBoard().
private ScheduledExecutorService leadershipRefresher;
// leader for each topic
private ConsistentMap<String, NodeId> leaderMap;
// list of candidates (includes chosen leader) for each topic
private ConsistentMap<String, List<NodeId>> candidateMap;
// Created in activate(); holds the listeners added through addListener().
private ListenerRegistry<LeadershipEvent, LeadershipEventListener> listenerRegistry;
// cached copy of leaderMap
// Note: Map value, Leadership, does not contain proper candidates info
private final Map<String, Leadership> leaderBoard = Maps.newConcurrentMap();
// cached copy of candidateMap
// Note: Map value, Leadership, does not contain proper leader info
private final Map<String, Leadership> candidateBoard = Maps.newConcurrentMap();
// Triggers a stale-leadership purge when a node is deactivated or removed.
private final ClusterEventListener clusterEventListener = new InternalClusterEventListener();
// Id of the local node; assigned in activate().
private NodeId localNodeId;
// Topics this node is currently contesting: added in doRunForLeadership(), removed in withdraw().
private Set<String> activeTopics = Sets.newConcurrentHashSet();
// runForLeadership() futures still waiting for a leader on their topic; completed from electLeaders().
private Map<String, CompletableFuture<Leadership>> pendingFutures = Maps.newConcurrentMap();
// The actual delay is randomly chosen from the interval [0, WAIT_BEFORE_RETRY_MILLIS)
private static final int WAIT_BEFORE_RETRY_MILLIS = 150;
// Delay between successive electLeaders() runs.
private static final int DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC = 2;
// Delay between successive refreshLeaderBoard() runs.
private static final int LEADERSHIP_REFRESH_INTERVAL_SEC = 2;
// Delay before a purge that hit errors is attempted again.
private static final int DELAY_BETWEEN_STALE_LEADERSHIP_PURGE_ATTEMPTS_SEC = 2;
// True while a purge is queued but has not started; prevents queuing more than one at a time.
private final AtomicBoolean staleLeadershipPurgeScheduled = new AtomicBoolean(false);
// Serializer handed to both consistent maps.
private static final Serializer SERIALIZER = Serializer.using(KryoNamespaces.API);
/**
 * Starts the leadership manager.
 * <p>
 * Builds the leader and candidate consistent maps, subscribes to changes on both,
 * creates the executors, starts the periodic election and leader-board refresh
 * tasks, and finally registers the listener registry as the sink for leadership events.
 */
@Activate
public void activate() {
    leaderMap = storageService.<String, NodeId>consistentMapBuilder()
            .withName("onos-topic-leaders")
            .withSerializer(SERIALIZER)
            .withPartitionsDisabled()
            .build();
    candidateMap = storageService.<String, List<NodeId>>consistentMapBuilder()
            .withName("onos-topic-candidates")
            .withSerializer(SERIALIZER)
            .withPartitionsDisabled()
            .build();

    // Leader map: inserts/updates raise LEADER_ELECTED, removals raise LEADER_BOOTED.
    leaderMap.addListener(mapEvent -> {
        log.debug("Received {}", mapEvent);
        final LeadershipEvent.Type kind;
        if (mapEvent.type() == MapEvent.Type.INSERT || mapEvent.type() == MapEvent.Type.UPDATE) {
            kind = LeadershipEvent.Type.LEADER_ELECTED;
        } else if (mapEvent.type() == MapEvent.Type.REMOVE) {
            kind = LeadershipEvent.Type.LEADER_BOOTED;
        } else {
            kind = null;
        }
        Leadership subject = new Leadership(
                mapEvent.key(),
                mapEvent.value().value(),
                mapEvent.value().version(),
                mapEvent.value().creationTime());
        onLeadershipEvent(new LeadershipEvent(kind, subject));
    });

    // Candidate map: only inserts/updates are expected; anything else is logged and dropped.
    candidateMap.addListener(mapEvent -> {
        log.debug("Received {}", mapEvent);
        final boolean upserted = mapEvent.type() == MapEvent.Type.INSERT
                || mapEvent.type() == MapEvent.Type.UPDATE;
        if (!upserted) {
            log.error("Entries must not be removed from candidate map");
            return;
        }
        Leadership subject = new Leadership(
                mapEvent.key(),
                mapEvent.value().value(),
                mapEvent.value().version(),
                mapEvent.value().creationTime());
        onLeadershipEvent(new LeadershipEvent(LeadershipEvent.Type.CANDIDATES_CHANGED, subject));
    });

    localNodeId = clusterService.getLocalNode().id();

    final String threadGroup = "onos/store/leadership";
    electionRunner = Executors.newSingleThreadScheduledExecutor(
            groupedThreads(threadGroup, "election-runner"));
    lockExecutor = Executors.newScheduledThreadPool(
            4, groupedThreads(threadGroup, "election-thread-%d"));
    staleLeadershipPurgeExecutor = Executors.newSingleThreadScheduledExecutor(
            groupedThreads(threadGroup, "stale-leadership-evictor"));
    leadershipRefresher = Executors.newSingleThreadScheduledExecutor(
            groupedThreads(threadGroup, "refresh-thread"));

    clusterService.addListener(clusterEventListener);

    // Both periodic tasks start immediately and repeat with a fixed delay.
    electionRunner.scheduleWithFixedDelay(
            this::electLeaders, 0, DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC, TimeUnit.SECONDS);
    leadershipRefresher.scheduleWithFixedDelay(
            this::refreshLeaderBoard, 0, LEADERSHIP_REFRESH_INTERVAL_SEC, TimeUnit.SECONDS);

    listenerRegistry = new ListenerRegistry<>();
    eventDispatcher.addSink(LeadershipEvent.class, listenerRegistry);

    log.info("Started");
}
/**
 * Stops the leadership manager.
 * <p>
 * When the cluster has more than one node, withdraws from every topic the local
 * leader board shows this node leading; then unregisters the cluster listener and
 * the event sink and shuts all executors down.
 */
@Deactivate
public void deactivate() {
    final boolean multiNodeCluster = clusterService.getNodes().size() > 1;
    if (multiNodeCluster) {
        // FIXME: Determine why this takes ~50 seconds to shutdown on a single node!
        for (Entry<String, Leadership> held : leaderBoard.entrySet()) {
            if (localNodeId.equals(held.getValue().leader())) {
                withdraw(held.getKey());
            }
        }
    }

    clusterService.removeListener(clusterEventListener);
    eventDispatcher.removeSink(LeadershipEvent.class);

    electionRunner.shutdown();
    lockExecutor.shutdown();
    staleLeadershipPurgeExecutor.shutdown();
    leadershipRefresher.shutdown();

    log.info("Stopped");
}
/**
 * Returns an immutable copy of the locally cached leader board.
 *
 * @return mapping from topic to its cached leadership
 */
@Override
public Map<String, Leadership> getLeaderBoard() {
    Map<String, Leadership> snapshot = ImmutableMap.copyOf(leaderBoard);
    return snapshot;
}
/**
 * Returns the cached candidate list of every topic present on the candidate board.
 *
 * @return mapping from topic to its candidates
 */
@Override
public Map<String, List<NodeId>> getCandidates() {
    Set<String> topics = candidateBoard.keySet();
    return Maps.toMap(topics, topic -> getCandidates(topic));
}
/**
 * Returns the cached candidates for a single topic.
 *
 * @param path topic name
 * @return immutable candidate list; empty when the topic is not on the candidate board
 */
@Override
public List<NodeId> getCandidates(String path) {
    Leadership cached = candidateBoard.get(path);
    if (cached == null) {
        return ImmutableList.of();
    }
    return ImmutableList.copyOf(cached.candidates());
}
/**
 * Returns the leader recorded for a topic on the local leader board.
 *
 * @param path topic name
 * @return the leader's node id, or null when the board has no entry for the topic
 */
@Override
public NodeId getLeader(String path) {
    Leadership cached = leaderBoard.get(path);
    if (cached == null) {
        return null;
    }
    return cached.leader();
}
/**
 * Returns the leadership recorded for a topic on the local leader board.
 *
 * @param path topic name; must not be null
 * @return the cached leadership, or null when the board has no entry for the topic
 * @throws IllegalArgumentException if {@code path} is null
 */
@Override
public Leadership getLeadership(String path) {
    checkArgument(path != null);
    Leadership cached = leaderBoard.get(path);
    return cached;
}
/**
 * Collects the topics whose cached leader is the given node.
 *
 * @param nodeId node to look up; must not be null
 * @return set of topics led by {@code nodeId} according to the local leader board
 * @throws IllegalArgumentException if {@code nodeId} is null
 */
@Override
public Set<String> ownedTopics(NodeId nodeId) {
    checkArgument(nodeId != null);
    Set<String> owned = Sets.newHashSet();
    for (Entry<String, Leadership> held : leaderBoard.entrySet()) {
        if (nodeId.equals(held.getValue().leader())) {
            owned.add(held.getKey());
        }
    }
    return owned;
}
/**
 * Enters this node into the leadership race for a topic.
 *
 * @param path topic name
 * @return future completed with the topic's leadership once a leader is known
 */
@Override
public CompletableFuture<Leadership> runForLeadership(String path) {
    log.debug("Running for leadership for topic: {}", path);
    final CompletableFuture<Leadership> outcome = new CompletableFuture<>();
    doRunForLeadership(path, outcome);
    return outcome;
}
/**
 * Adds the local node to the topic's candidate list (if absent), marks the topic
 * active and attempts an election.
 * <p>
 * If a leader is known the future completes right away; otherwise it is parked in
 * {@code pendingFutures}. A {@link ConsistentMapException} causes the whole attempt
 * to be rescheduled after a short random delay.
 *
 * @param path   topic name
 * @param future future to complete with the resulting leadership
 */
private void doRunForLeadership(String path, CompletableFuture<Leadership> future) {
    try {
        Versioned<List<NodeId>> contenders = candidateMap.computeIf(path,
                existing -> existing == null || !existing.contains(localNodeId),
                (topic, existing) -> {
                    if (existing == null) {
                        return ImmutableList.of(localNodeId);
                    }
                    // Append the local node behind the candidates already in line.
                    List<NodeId> extended = Lists.newLinkedList();
                    extended.addAll(existing);
                    extended.add(localNodeId);
                    return extended;
                });
        log.debug("In the leadership race for topic {} with candidates {}", path, contenders);
        activeTopics.add(path);

        Leadership elected = electLeader(path, contenders.value());
        if (elected != null) {
            future.complete(elected);
        } else {
            pendingFutures.put(path, future);
        }
    } catch (ConsistentMapException e) {
        log.debug("Failed to enter topic leader race for {}. Retrying.", path, e);
        rerunForLeadership(path, future);
    }
}
/**
 * Withdraws this node from the leadership race for a topic.
 * <p>
 * The topic is removed from the active set before the shared maps are updated.
 *
 * @param path topic name
 * @return future completed once the withdrawal has been applied
 */
@Override
public CompletableFuture<Void> withdraw(String path) {
    activeTopics.remove(path);
    final CompletableFuture<Void> outcome = new CompletableFuture<>();
    doWithdraw(path, outcome);
    return outcome;
}
/**
 * Releases this node's leadership (if held) and candidacy for a topic.
 * <p>
 * {@link #withdraw(String)} removes the topic from {@code activeTopics} before calling
 * this method, so finding it there again means the node has re-entered the race in the
 * meantime. In that case the withdrawal is cancelled and the shared maps are left
 * untouched. Any failure while updating the maps schedules a retry after a short
 * random delay.
 *
 * @param path   topic name
 * @param future completed normally on success, or exceptionally with a
 *               {@link CancellationException} when the topic is active again
 */
private void doWithdraw(String path, CompletableFuture<Void> future) {
    if (activeTopics.contains(path)) {
        future.completeExceptionally(new CancellationException(String.format("%s is now a active topic", path)));
        // Stop here: continuing would strip the leadership/candidacy of a node that is racing again.
        return;
    }
    try {
        // Give up the lock only if this node is the one holding it.
        leaderMap.computeIf(path,
                localNodeId::equals,
                (topic, leader) -> null);
        // Drop the local node from the candidate list, keeping the order of the others.
        candidateMap.computeIf(path,
                candidates -> candidates != null && candidates.contains(localNodeId),
                (topic, candidates) -> candidates.stream()
                        .filter(nodeId -> !localNodeId.equals(nodeId))
                        .collect(Collectors.toList()));
        future.complete(null);
    } catch (Exception e) {
        log.debug("Failed to verify (and clear) any lock this node might be holding for {}", path, e);
        retryWithdraw(path, future);
    }
}
/**
 * Gives up leadership of a topic while remaining a candidate.
 *
 * @param path topic name
 * @return true if the leader entry was cleared; false if the topic is not active,
 *         this node is not the cached leader, or the update failed
 */
@Override
public boolean stepdown(String path) {
    if (!activeTopics.contains(path)) {
        return false;
    }
    if (!Objects.equals(localNodeId, getLeader(path))) {
        return false;
    }
    try {
        Versioned<NodeId> remaining = leaderMap.computeIf(path,
                localNodeId::equals,
                (topic, leader) -> null);
        return remaining == null;
    } catch (Exception e) {
        log.warn("Error executing stepdown for {}", path, e);
        return false;
    }
}
/**
 * Registers a listener with the listener registry created in activate().
 *
 * @param listener listener to add
 */
@Override
public void addListener(LeadershipEventListener listener) {
listenerRegistry.addListener(listener);
}
/**
 * Removes a listener from the listener registry created in activate().
 *
 * @param listener listener to remove
 */
@Override
public void removeListener(LeadershipEventListener listener) {
listenerRegistry.removeListener(listener);
}
/**
 * Moves a node that is already a candidate to the head of the topic's candidate list.
 *
 * @param path   topic name
 * @param nodeId node to promote
 * @return true if {@code nodeId} is first in the candidate list after the call
 */
@Override
public boolean makeTopCandidate(String path, NodeId nodeId) {
    Versioned<List<NodeId>> reordered = candidateMap.computeIf(path,
            current -> current != null
                    && current.contains(nodeId)
                    && !nodeId.equals(Iterables.getFirst(current, null)),
            (topic, current) -> {
                // Promoted node first, everyone else behind it in their existing order.
                List<NodeId> promoted = new ArrayList<>(current.size());
                promoted.add(nodeId);
                for (NodeId id : current) {
                    if (!nodeId.equals(id)) {
                        promoted.add(id);
                    }
                }
                return promoted;
            });
    if (reordered == null) {
        return false;
    }
    List<NodeId> order = reordered.value();
    return !order.isEmpty() && nodeId.equals(order.get(0));
}
/**
 * Resolves the leadership of a topic, claiming it for the local node when appropriate.
 * <p>
 * A leadership already on the local leader board is returned as is. Otherwise the
 * first ACTIVE candidate is determined; if that is the local node it tries to install
 * itself in the leader map, else the current leader map entry is read.
 *
 * @param path       topic name
 * @param candidates ordered candidate list for the topic
 * @return the topic's leadership, or null if no leader is known or the lookup failed
 */
private Leadership electLeader(String path, List<NodeId> candidates) {
    Leadership known = getLeadership(path);
    if (known != null) {
        return known;
    }

    NodeId topCandidate = candidates
            .stream()
            .filter(n -> clusterService.getState(n) == ACTIVE)
            .findFirst()
            .orElse(null);
    try {
        Versioned<NodeId> leader;
        if (localNodeId.equals(topCandidate)) {
            leader = leaderMap.computeIfAbsent(path, p -> localNodeId);
        } else {
            leader = leaderMap.get(path);
        }
        if (leader == null) {
            return null;
        }
        Leadership elected = new Leadership(path,
                leader.value(),
                leader.version(),
                leader.creationTime());
        // Since reads only go through the local copy of leader board, we ought to update it
        // first before returning from this method.
        // This is to ensure a subsequent read will not read a stale value.
        onLeadershipEvent(new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, elected));
        return elected;
    } catch (Exception e) {
        log.debug("Failed to elect leader for {}", path, e);
    }
    return null;
}
/**
 * Periodic election pass over every topic in the candidate map.
 * <p>
 * For topics this node is contesting, an election attempt is submitted to the lock
 * executor and any pending future for the topic is completed once a leader is known.
 * A CANDIDATES_CHANGED event is raised for every topic to refresh the candidate board.
 */
private void electLeaders() {
    try {
        for (Entry<String, Versioned<List<NodeId>>> entry : candidateMap.entrySet()) {
            final String path = entry.getKey();
            final Versioned<List<NodeId>> candidates = entry.getValue();

            // for active topics, check if this node can become a leader (if it isn't already)
            if (activeTopics.contains(path)) {
                lockExecutor.submit(() -> {
                    Leadership leadership = electLeader(path, candidates.value());
                    if (leadership == null) {
                        return;
                    }
                    CompletableFuture<Leadership> waiting = pendingFutures.remove(path);
                    if (waiting != null) {
                        waiting.complete(leadership);
                    }
                });
            }

            // Raise a CANDIDATES_CHANGED event to force refresh local candidate board
            // and also to update local listeners.
            // Don't worry about duplicate events as they will be suppressed.
            Leadership candidacy = new Leadership(path,
                    candidates.value(),
                    candidates.version(),
                    candidates.creationTime());
            onLeadershipEvent(new LeadershipEvent(LeadershipEvent.Type.CANDIDATES_CHANGED, candidacy));
        }
    } catch (Exception e) {
        log.debug("Failure electing leaders", e);
    }
}
/**
 * Applies a leadership event to the local boards and forwards it if it changed anything.
 * <p>
 * The update is accepted only when no entry is cached for the topic or the cached
 * epoch is older than the update's epoch (older or equal for LEADER_BOOTED). Accepted
 * events are posted to the event dispatcher; everything else is dropped silently.
 *
 * @param leadershipEvent event to apply
 * @throws IllegalStateException if the event type is not one handled here
 */
private void onLeadershipEvent(LeadershipEvent leadershipEvent) {
    log.trace("Leadership Event: time = {} type = {} event = {}",
            leadershipEvent.time(), leadershipEvent.type(),
            leadershipEvent);

    final Leadership leadershipUpdate = leadershipEvent.subject();
    final LeadershipEvent.Type eventType = leadershipEvent.type();
    final String topic = leadershipUpdate.topic();
    final AtomicBoolean updateAccepted = new AtomicBoolean(false);

    switch (eventType) {
        case LEADER_ELECTED:
            leaderBoard.compute(topic, (k, cached) -> {
                boolean newer = cached == null || cached.epoch() < leadershipUpdate.epoch();
                if (!newer) {
                    return cached;
                }
                updateAccepted.set(true);
                return leadershipUpdate;
            });
            break;
        case LEADER_BOOTED:
            leaderBoard.compute(topic, (k, cached) -> {
                boolean notOlder = cached == null || cached.epoch() <= leadershipUpdate.epoch();
                if (!notOlder) {
                    return cached;
                }
                updateAccepted.set(true);
                // FIXME: Removing entries from leaderboard is not safe and should be visited.
                return null;
            });
            break;
        case CANDIDATES_CHANGED:
            candidateBoard.compute(topic, (k, cached) -> {
                boolean newer = cached == null || cached.epoch() < leadershipUpdate.epoch();
                if (!newer) {
                    return cached;
                }
                updateAccepted.set(true);
                return leadershipUpdate;
            });
            break;
        default:
            throw new IllegalStateException("Unknown event type.");
    }

    if (updateAccepted.get()) {
        eventDispatcher.post(leadershipEvent);
    }
}
/**
 * Schedules another attempt to enter the leadership race after a random delay
 * drawn from [0, WAIT_BEFORE_RETRY_MILLIS).
 *
 * @param path   topic name
 * @param future future to hand to the retried attempt
 */
private void rerunForLeadership(String path, CompletableFuture<Leadership> future) {
    final int delayMillis = RandomUtils.nextInt(WAIT_BEFORE_RETRY_MILLIS);
    lockExecutor.schedule(() -> doRunForLeadership(path, future), delayMillis, TimeUnit.MILLISECONDS);
}
/**
 * Schedules another withdrawal attempt after a random delay drawn from
 * [0, WAIT_BEFORE_RETRY_MILLIS).
 *
 * @param path   topic name
 * @param future future to hand to the retried attempt
 */
private void retryWithdraw(String path, CompletableFuture<Void> future) {
    final int delayMillis = RandomUtils.nextInt(WAIT_BEFORE_RETRY_MILLIS);
    lockExecutor.schedule(() -> doWithdraw(path, future), delayMillis, TimeUnit.MILLISECONDS);
}
/**
 * Queues a stale-leadership purge unless one is already queued.
 *
 * @param afterDelaySec delay in seconds before the purge runs
 */
private void scheduleStaleLeadershipPurge(int afterDelaySec) {
    final boolean claimed = staleLeadershipPurgeScheduled.compareAndSet(false, true);
    if (!claimed) {
        // A purge is already waiting to run; it will cover this request too.
        return;
    }
    staleLeadershipPurgeExecutor.schedule(this::purgeStaleLeadership, afterDelaySec, TimeUnit.SECONDS);
}
/**
 * Purges locks held by inactive nodes and evicts inactive nodes from candidacy.
 * <p>
 * Leader entries whose holder is INACTIVE are cleared. Candidate lists are rewritten
 * to keep only ACTIVE nodes, and the local node is kept only while the topic is in
 * {@code activeTopics}. If any step fails, another purge is scheduled after
 * {@code DELAY_BETWEEN_STALE_LEADERSHIP_PURGE_ATTEMPTS_SEC} seconds.
 */
private void purgeStaleLeadership() {
    AtomicBoolean rerunPurge = new AtomicBoolean(false);
    try {
        // Clear the flag up front so that a purge requested while this one runs can be queued.
        staleLeadershipPurgeScheduled.set(false);
        leaderMap.entrySet()
                .stream()
                .filter(e -> clusterService.getState(e.getValue().value()) == INACTIVE)
                .forEach(entry -> {
                    String path = entry.getKey();
                    NodeId nodeId = entry.getValue().value();
                    try {
                        // Only clear the entry if the inactive node is still the recorded leader.
                        leaderMap.computeIf(path, nodeId::equals, (topic, leader) -> null);
                    } catch (Exception e) {
                        log.debug("Failed to purge stale lock held by {} for {}", nodeId, path, e);
                        rerunPurge.set(true);
                    }
                });

        candidateMap.entrySet()
                .forEach(entry -> {
                    String path = entry.getKey();
                    Versioned<List<NodeId>> candidates = entry.getValue();
                    List<NodeId> candidatesList = candidates != null
                            ? candidates.value() : Collections.emptyList();
                    List<NodeId> activeCandidatesList =
                            candidatesList.stream()
                                    .filter(n -> clusterService.getState(n) == ACTIVE)
                                    .filter(n -> !localNodeId.equals(n) || activeTopics.contains(path))
                                    .collect(Collectors.toList());
                    if (activeCandidatesList.size() < candidatesList.size()) {
                        Set<NodeId> removedCandidates =
                                Sets.difference(Sets.newHashSet(candidatesList),
                                                Sets.newHashSet(activeCandidatesList));
                        try {
                            // NOTE(review): this guard only looks for INACTIVE nodes, whereas the size
                            // check above also counts a local node that is no longer racing; a stale
                            // local candidacy alone therefore does not trigger the rewrite - confirm intent.
                            // The null check protects against the entry having disappeared since the scan.
                            candidateMap.computeIf(path,
                                    c -> c != null && c.stream()
                                            .anyMatch(n -> clusterService.getState(n) == INACTIVE),
                                    (topic, c) -> c.stream()
                                            .filter(n -> clusterService.getState(n) == ACTIVE)
                                            .filter(n -> !localNodeId.equals(n) ||
                                                    activeTopics.contains(path))
                                            .collect(Collectors.toList()));
                        } catch (Exception e) {
                            log.debug("Failed to evict inactive candidates {} from "
                                    + "candidate list for {}", removedCandidates, path, e);
                            rerunPurge.set(true);
                        }
                    }
                });
    } catch (Exception e) {
        log.debug("Failure purging stale leadership.", e);
        rerunPurge.set(true);
    }

    if (rerunPurge.get()) {
        log.debug("Rescheduling stale leadership purge due to errors encountered in previous run");
        scheduleStaleLeadershipPurge(DELAY_BETWEEN_STALE_LEADERSHIP_PURGE_ATTEMPTS_SEC);
    }
}
/**
 * Reconciles the local leader board with the contents of the leader map.
 * <p>
 * Entries present only locally are evicted via LEADER_BOOTED, entries present only in
 * the leader map are added via LEADER_ELECTED, and differing entries are replaced via
 * LEADER_ELECTED when the leader map holds the newer epoch. Failures are logged and
 * otherwise ignored.
 */
private void refreshLeaderBoard() {
    try {
        Map<String, Leadership> latest = Maps.newHashMap();
        for (Entry<String, Versioned<NodeId>> entry : leaderMap.entrySet()) {
            String path = entry.getKey();
            Versioned<NodeId> leader = entry.getValue();
            latest.put(path, new Leadership(path,
                    leader.value(),
                    leader.version(),
                    leader.creationTime()));
        }

        // first take snapshot of current leader board.
        Map<String, Leadership> cached = ImmutableMap.copyOf(leaderBoard);

        MapDifference<String, Leadership> diff = Maps.difference(cached, latest);

        // evict stale leaders
        for (Leadership stale : diff.entriesOnlyOnLeft().values()) {
            log.debug("Evicting {} from leaderboard. It is no longer active leader.", stale);
            onLeadershipEvent(new LeadershipEvent(LeadershipEvent.Type.LEADER_BOOTED, stale));
        }

        // add missing leaders
        for (Leadership missing : diff.entriesOnlyOnRight().values()) {
            log.debug("Adding {} to leaderboard. It is now the active leader.", missing);
            onLeadershipEvent(new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, missing));
        }

        // add updated leaders
        for (MapDifference.ValueDifference<Leadership> change : diff.entriesDiffering().values()) {
            Leadership current = change.leftValue();
            Leadership updated = change.rightValue();
            if (current.epoch() < updated.epoch()) {
                log.debug("Updated {} in leaderboard.", updated);
                onLeadershipEvent(new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, updated));
            }
        }
    } catch (Exception e) {
        log.debug("Failed to refresh leader board", e);
    }
}
/**
 * Requests an immediate stale-leadership purge whenever a cluster node is
 * deactivated or removed.
 */
private class InternalClusterEventListener implements ClusterEventListener {

    @Override
    public void event(ClusterEvent event) {
        final Type kind = event.type();
        final boolean nodeGone = kind == Type.INSTANCE_DEACTIVATED || kind == Type.INSTANCE_REMOVED;
        if (nodeGone) {
            scheduleStaleLeadershipPurge(0);
        }
    }
}
}