Madan Jampani

Publish a list of changes when leadership changes occur

Change-Id: I99a4e239ac5aa9999b3a735cdf004941a5957a93
......@@ -291,17 +291,14 @@ public final class AtomixLeaderElectorCommands {
}
/**
* Command for administratively anointing a node as leader.
* Command for administratively changing the leadership state for a node.
*/
@SuppressWarnings("serial")
public static class Anoint extends ElectionCommand<Boolean> {
public abstract static class ElectionChangeCommand<V> extends ElectionCommand<V> {
private String topic;
private NodeId nodeId;
public Anoint() {
}
public Anoint(String topic, NodeId nodeId) {
public ElectionChangeCommand(String topic, NodeId nodeId) {
this.topic = topic;
this.nodeId = nodeId;
}
......@@ -346,57 +343,22 @@ public final class AtomixLeaderElectorCommands {
}
/**
* Command for administratively promote a node as top candidate.
* Command for administratively anoint a node as leader.
*/
@SuppressWarnings("serial")
public static class Promote extends ElectionCommand<Boolean> {
private String topic;
private NodeId nodeId;
public Promote() {
}
public Promote(String topic, NodeId nodeId) {
this.topic = topic;
this.nodeId = nodeId;
public static class Anoint extends ElectionChangeCommand<Boolean> {
public Anoint(String topic, NodeId nodeId) {
super(topic, nodeId);
}
/**
* Returns the topic.
*
* @return The topic
*/
public String topic() {
return topic;
}
/**
* Returns the nodeId to make top candidate.
*
* @return The nodeId
* Command for administratively promote a node as top candidate.
*/
public NodeId nodeId() {
return nodeId;
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("topic", topic)
.add("nodeId", nodeId)
.toString();
}
@Override
public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
buffer.writeString(topic);
buffer.writeString(nodeId.toString());
}
@Override
public void readObject(BufferInput<?> buffer, Serializer serializer) {
topic = buffer.readString();
nodeId = new NodeId(buffer.readString());
@SuppressWarnings("serial")
public static class Promote extends ElectionChangeCommand<Boolean> {
public Promote(String topic, NodeId nodeId) {
super(topic, nodeId);
}
}
......
......@@ -97,7 +97,7 @@ public class AtomixLeaderElectorState extends ResourceStateMachine
}
private void notifyLeadershipChange(Leadership previousLeadership, Leadership newLeadership) {
notifyLeadershipChanges(Arrays.asList(new Change<>(previousLeadership, newLeadership)));
notifyLeadershipChanges(Lists.newArrayList(new Change<>(previousLeadership, newLeadership)));
}
private void notifyLeadershipChanges(List<Change<Leadership>> changes) {
......@@ -247,7 +247,7 @@ public class AtomixLeaderElectorState extends ResourceStateMachine
*/
public void evict(Commit<? extends Evict> commit) {
try {
List<Change<Leadership>> changes = Lists.newLinkedList();
List<Change<Leadership>> changes = Lists.newArrayList();
NodeId nodeId = commit.operation().nodeId();
Set<String> topics = Maps.filterValues(elections, e -> e.candidates().contains(nodeId)).keySet();
topics.forEach(topic -> {
......@@ -330,14 +330,16 @@ public class AtomixLeaderElectorState extends ResourceStateMachine
listener.close();
}
Set<String> topics = elections.keySet();
List<Change<Leadership>> changes = Lists.newArrayList();
topics.forEach(topic -> {
Leadership oldLeadership = leadership(topic);
elections.compute(topic, (k, v) -> v.cleanup(session, termCounter(topic)::incrementAndGet));
Leadership newLeadership = leadership(topic);
if (!Objects.equal(oldLeadership, newLeadership)) {
notifyLeadershipChange(oldLeadership, newLeadership);
changes.add(new Change<>(oldLeadership, newLeadership));
}
});
notifyLeadershipChanges(changes);
}
private static class Registration {
......