Ayaka Koshibe
Committed by Madan Jampani

Create local storage for topic candidates mapping. This also includes:

 - using Optional in Leadership, and some commenting.
 - using MutableBooleans + compute()

    part of: Device Mastership store on top of LeadershipService
    Reference: ONOS-76

Conflicts:
	core/api/src/main/java/org/onosproject/cluster/LeadershipService.java
	core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java

Change-Id: I7f090abb123cf23bb5126a935a6e72be00f3e3ce
......@@ -85,7 +85,7 @@ public class LeaderCommand extends AbstractShellCommand {
}
private void displayCandidates(Map<String, Leadership> leaderBoard,
Map<String, List<NodeId>> candidates) {
Map<String, Leadership> candidates) {
print("--------------------------------------------------------------");
print(FMT_C, "Topic", "Leader", "Candidates");
print("--------------------------------------------------------------");
......@@ -94,13 +94,13 @@ public class LeaderCommand extends AbstractShellCommand {
.stream()
.sorted(leadershipComparator)
.forEach(l -> {
List<NodeId> list = candidates.get(l.topic());
List<NodeId> list = candidates.get(l.topic()).candidates();
print(FMT_C,
l.topic(),
l.leader(),
list.remove(0).toString());
list.get(0).toString());
// formatting hacks to get it into a table
list.forEach(n -> print(FMT_C, " ", " ", n));
list.subList(1, list.size()).forEach(n -> print(FMT_C, " ", " ", n));
print(FMT_C, " ", " ", " ");
});
print("--------------------------------------------------------------");
......@@ -139,7 +139,7 @@ public class LeaderCommand extends AbstractShellCommand {
print("%s", json(leaderBoard));
} else {
if (showCandidates) {
Map<String, List<NodeId>> candidates = leaderService.getCandidates();
Map<String, Leadership> candidates = leaderService.getCandidates();
displayCandidates(leaderBoard, candidates);
} else {
displayLeaders(leaderBoard);
......
......@@ -17,6 +17,7 @@ package org.onosproject.cluster;
import java.util.Objects;
import java.util.List;
import java.util.Optional;
import org.joda.time.DateTime;
......@@ -33,19 +34,21 @@ import com.google.common.collect.ImmutableList;
* rest in decreasing preference order.</li>
* <li>The epoch is the logical age of a Leadership construct, and should be
* used for comparing two Leaderships, but only of the same topic.</li>
* <li>The leader may be null if its accuracy can't be guaranteed. This applies
* to CANDIDATES_CHANGED events and candidate board contents.</li>
* </ul>
*/
public class Leadership {
private final String topic;
private final NodeId leader;
private final Optional<NodeId> leader;
private final List<NodeId> candidates;
private final long epoch;
private final long electedTime;
public Leadership(String topic, NodeId leader, long epoch, long electedTime) {
this.topic = topic;
this.leader = leader;
this.leader = Optional.of(leader);
this.candidates = ImmutableList.of(leader);
this.epoch = epoch;
this.electedTime = electedTime;
......@@ -54,7 +57,16 @@ public class Leadership {
public Leadership(String topic, NodeId leader, List<NodeId> candidates,
long epoch, long electedTime) {
this.topic = topic;
this.leader = leader;
this.leader = Optional.of(leader);
this.candidates = ImmutableList.copyOf(candidates);
this.epoch = epoch;
this.electedTime = electedTime;
}
public Leadership(String topic, List<NodeId> candidates,
long epoch, long electedTime) {
this.topic = topic;
this.leader = Optional.empty();
this.candidates = ImmutableList.copyOf(candidates);
this.epoch = epoch;
this.electedTime = electedTime;
......@@ -74,8 +86,9 @@ public class Leadership {
*
* @return leader node.
*/
// This will return Optional<NodeId> in the future.
public NodeId leader() {
return leader;
return leader.orElse(null);
}
/**
......
......@@ -43,14 +43,14 @@ public class LeadershipEvent extends AbstractEvent<LeadershipEvent.Type, Leaders
LEADER_REELECTED,
/**
* Signifies that the leader has been booted and lost leadership. The event subject is the
* former leader.
* Signifies that the leader has been booted and lost leadership. The
* event subject is the former leader.
*/
LEADER_BOOTED,
/**
* Signifies that the list of candidates for leadership for a topic has
* changed.
* changed. This event does not guarantee accurate leader information.
*/
CANDIDATES_CHANGED
}
......
......@@ -76,9 +76,9 @@ public interface LeadershipService {
/**
* Returns the candidates for all known topics.
*
* @return A map of topics to lists of NodeIds.
* @return A mapping from topics to up-to-date candidate info.
*/
Map<String, List<NodeId>> getCandidates();
Map<String, Leadership> getCandidates();
/**
* Returns the candidates for a given topic.
......
......@@ -65,7 +65,7 @@ public class LeadershipServiceAdapter implements LeadershipService {
}
@Override
public Map<String, List<NodeId>> getCandidates() {
public Map<String, Leadership> getCandidates() {
return null;
}
......
......@@ -576,7 +576,7 @@ public class HazelcastLeadershipService implements LeadershipService {
}
@Override
public Map<String, List<NodeId>> getCandidates() {
public Map<String, Leadership> getCandidates() {
return null;
}
......
......@@ -12,6 +12,7 @@ import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.onlab.util.KryoNamespace;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
......@@ -88,6 +89,7 @@ public class DistributedLeadershipManager implements LeadershipService {
private AbstractListenerRegistry<LeadershipEvent, LeadershipEventListener>
listenerRegistry;
private final Map<String, Leadership> leaderBoard = Maps.newConcurrentMap();
private final Map<String, Leadership> candidateBoard = Maps.newConcurrentMap();
private NodeId localNodeId;
private Set<String> activeTopics = Sets.newConcurrentHashSet();
......@@ -164,16 +166,14 @@ public class DistributedLeadershipManager implements LeadershipService {
}
@Override
public Map<String, List<NodeId>> getCandidates() {
Map<String, List<NodeId>> candidates = Maps.newHashMap();
candidateMap.entrySet().forEach(el -> candidates.put(el.getKey(), el.getValue().value()));
return ImmutableMap.copyOf(candidates);
public Map<String, Leadership> getCandidates() {
return ImmutableMap.copyOf(candidateBoard);
}
@Override
public List<NodeId> getCandidates(String path) {
Versioned<List<NodeId>> candidates = candidateMap.get(path);
return candidates == null ? ImmutableList.of() : ImmutableList.copyOf(candidates.value());
Leadership current = candidateBoard.get(path);
return current == null ? ImmutableList.of() : ImmutableList.copyOf(current.candidates());
}
@Override
......@@ -207,13 +207,21 @@ public class DistributedLeadershipManager implements LeadershipService {
List<NodeId> candidateList = Lists.newArrayList(candidates.value());
if (!candidateList.contains(localNodeId)) {
candidateList.add(localNodeId);
if (!candidateMap.replace(path, candidates.version(), candidateList)) {
if (candidateMap.replace(path, candidates.version(), candidateList)) {
Versioned<List<NodeId>> newCandidates = candidateMap.get(path);
notifyCandidateAdded(
path, candidateList, newCandidates.version(), newCandidates.creationTime());
} else {
rerunForLeadership(path);
return;
}
}
} else {
if (!(candidateMap.putIfAbsent(path, ImmutableList.of(localNodeId)) == null)) {
List<NodeId> candidateList = ImmutableList.of(localNodeId);
if ((candidateMap.putIfAbsent(path, candidateList) == null)) {
Versioned<List<NodeId>> newCandidates = candidateMap.get(path);
notifyCandidateAdded(path, candidateList, newCandidates.version(), newCandidates.creationTime());
} else {
rerunForLeadership(path);
return;
}
......@@ -247,10 +255,19 @@ public class DistributedLeadershipManager implements LeadershipService {
if (!candidateList.remove(localNodeId)) {
return;
}
boolean success = candidateList.isEmpty()
? candidateMap.remove(path, candidates.version())
: candidateMap.replace(path, candidates.version(), candidateList);
if (!success) {
boolean success = false;
if (candidateList.isEmpty()) {
if (candidateMap.remove(path, candidates.version())) {
success = true;
}
} else {
if (candidateMap.replace(path, candidates.version(), candidateList)) {
success = true;
}
}
if (success) {
notifyCandidateRemoved(path, candidateList, candidates.version(), candidates.creationTime());
} else {
log.warn("Failed to withdraw from candidates list. Will retry");
retryWithdraw(path);
}
......@@ -321,21 +338,63 @@ public class DistributedLeadershipManager implements LeadershipService {
}
}
private void notifyCandidateAdded(
String path, List<NodeId> candidates, long epoch, long electedTime) {
Leadership newInfo = new Leadership(path, candidates, epoch, electedTime);
final MutableBoolean updated = new MutableBoolean(false);
candidateBoard.compute(path, (k, current) -> {
if (current == null || current.epoch() < newInfo.epoch()) {
log.info("updating candidateboard with {}", newInfo);
updated.setTrue();
return newInfo;
}
return current;
});
// maybe rethink types of candidates events
if (updated.booleanValue()) {
LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.CANDIDATES_CHANGED, newInfo);
notifyPeers(event);
}
}
private void notifyCandidateRemoved(
String path, List<NodeId> candidates, long epoch, long electedTime) {
Leadership newInfo = new Leadership(path, candidates, epoch, electedTime);
final MutableBoolean updated = new MutableBoolean(false);
candidateBoard.compute(path, (k, current) -> {
if (current != null && current.epoch() == newInfo.epoch()) {
log.info("updating candidateboard with {}", newInfo);
updated.setTrue();
if (candidates.isEmpty()) {
return null;
} else {
return newInfo;
}
}
return current;
});
// maybe rethink types of candidates events
if (updated.booleanValue()) {
LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.CANDIDATES_CHANGED, newInfo);
notifyPeers(event);
}
}
private void notifyNewLeader(String path, NodeId leader,
List<NodeId> candidates, long epoch, long electedTime) {
Leadership newLeadership = new Leadership(path, leader, candidates, epoch, electedTime);
boolean updatedLeader = false;
final MutableBoolean updatedLeader = new MutableBoolean(false);
log.debug("candidates for new Leadership {}", candidates);
synchronized (leaderBoard) {
Leadership currentLeader = leaderBoard.get(path);
leaderBoard.compute(path, (k, currentLeader) -> {
if (currentLeader == null || currentLeader.epoch() < epoch) {
log.debug("updating leaderboard with new {}", newLeadership);
leaderBoard.put(path, newLeadership);
updatedLeader = true;
updatedLeader.setTrue();
return newLeadership;
}
}
return currentLeader;
});
if (updatedLeader) {
if (updatedLeader.booleanValue()) {
LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, newLeadership);
notifyPeers(event);
}
......@@ -352,21 +411,18 @@ public class DistributedLeadershipManager implements LeadershipService {
Versioned<List<NodeId>> candidates = candidateMap.get(path);
Leadership oldLeadership = new Leadership(
path, leader, candidates.value(), epoch, electedTime);
boolean updatedLeader = false;
synchronized (leaderBoard) {
Leadership currentLeader = leaderBoard.get(path);
final MutableBoolean updatedLeader = new MutableBoolean(false);
leaderBoard.compute(path, (k, currentLeader) -> {
if (currentLeader != null && currentLeader.epoch() == oldLeadership.epoch()) {
leaderBoard.remove(path);
updatedLeader = true;
updatedLeader.setTrue();
return null;
}
}
return currentLeader;
});
if (updatedLeader) {
if (updatedLeader.booleanValue()) {
LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.LEADER_BOOTED, oldLeadership);
eventDispatcher.post(event);
clusterCommunicator.broadcast(event,
LEADERSHIP_EVENT_MESSAGE_SUBJECT,
SERIALIZER::encode);
notifyPeers(event);
}
}
......@@ -385,31 +441,37 @@ public class DistributedLeadershipManager implements LeadershipService {
LeadershipEvent.Type eventType = leadershipEvent.type();
String topic = leadershipUpdate.topic();
boolean updateAccepted = false;
synchronized (leaderBoard) {
Leadership currentLeadership = leaderBoard.get(topic);
if (eventType.equals(LeadershipEvent.Type.LEADER_ELECTED)) {
MutableBoolean updateAccepted = new MutableBoolean(false);
if (eventType.equals(LeadershipEvent.Type.LEADER_ELECTED)) {
leaderBoard.compute(topic, (k, currentLeadership) -> {
if (currentLeadership == null || currentLeadership.epoch() < leadershipUpdate.epoch()) {
leaderBoard.put(topic, leadershipUpdate);
updateAccepted = true;
updateAccepted.setTrue();
return leadershipUpdate;
}
} else if (eventType.equals(LeadershipEvent.Type.LEADER_BOOTED)) {
if (currentLeadership != null && currentLeadership.epoch() == leadershipUpdate.epoch()) {
leaderBoard.remove(topic);
updateAccepted = true;
return currentLeadership;
});
} else if (eventType.equals(LeadershipEvent.Type.LEADER_BOOTED)) {
leaderBoard.compute(topic, (k, currentLeadership) -> {
if (currentLeadership == null || currentLeadership.epoch() < leadershipUpdate.epoch()) {
updateAccepted.setTrue();
return null;
}
} else if (eventType.equals(LeadershipEvent.Type.CANDIDATES_CHANGED)) {
if (currentLeadership != null && currentLeadership.epoch() == leadershipUpdate.epoch()) {
leaderBoard.replace(topic, leadershipUpdate);
updateAccepted = true;
return currentLeadership;
});
} else if (eventType.equals(LeadershipEvent.Type.CANDIDATES_CHANGED)) {
candidateBoard.compute(topic, (k, currentInfo) -> {
if (currentInfo == null || currentInfo.epoch() <= leadershipUpdate.epoch()) {
updateAccepted.setTrue();
return leadershipUpdate;
}
} else {
throw new IllegalStateException("Unknown event type.");
}
if (updateAccepted) {
eventDispatcher.post(leadershipEvent);
}
return currentInfo;
});
} else {
throw new IllegalStateException("Unknown event type.");
}
if (updateAccepted.booleanValue()) {
eventDispatcher.post(leadershipEvent);
}
}
}
......@@ -470,6 +532,12 @@ public class DistributedLeadershipManager implements LeadershipService {
SERIALIZER::encode);
}
});
candidateBoard.forEach((path, leadership) -> {
LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.CANDIDATES_CHANGED, leadership);
clusterCommunicator.broadcast(event,
LEADERSHIP_EVENT_MESSAGE_SUBJECT,
SERIALIZER::encode);
});
} catch (Exception e) {
log.debug("Failed to send leadership updates", e);
}
......
......@@ -111,7 +111,7 @@ public class SimpleLeadershipManager implements LeadershipService {
}
@Override
public Map<String, List<NodeId>> getCandidates() {
public Map<String, Leadership> getCandidates() {
return null;
}
......