Madan Jampani
Committed by Gerrit Code Review

AsyncLeaderElector APIs in support of mastership balancing

Change-Id: Ia235c6a18c54490dc49ca13e2caebf70b750dbc7
......@@ -60,6 +60,16 @@ public class DefaultLeaderElector extends Synchronous<AsyncLeaderElector> implem
}
/**
 * Promotes a node to the top of the candidate list for a topic.
 * <p>
 * The request is issued on the asynchronous elector and the returned future is
 * resolved through {@code complete}.
 *
 * @param topic leadership topic
 * @param nodeId instance identifier of the new top candidate
 * @return the boolean outcome reported by the asynchronous elector
 */
@Override
public boolean promote(String topic, NodeId nodeId) {
    final boolean promoted = complete(asyncElector.promote(topic, nodeId));
    return promoted;
}
/**
 * Evicts a node by issuing the request on the asynchronous elector and
 * resolving the returned future through {@code complete}.
 *
 * @param nodeId node instance identifier
 */
@Override
public void evict(NodeId nodeId) {
// The value produced by complete(...) is not used; this method returns nothing.
complete(asyncElector.evict(nodeId));
}
/**
 * Returns the leadership for a topic.
 * <p>
 * The lookup is issued on the asynchronous elector and the returned future is
 * resolved through {@code complete}.
 *
 * @param topic leadership topic
 * @return leadership reported by the asynchronous elector
 */
@Override
public Leadership getLeadership(String topic) {
    final Leadership current = complete(asyncElector.getLeadership(topic));
    return current;
}
......
......@@ -76,6 +76,28 @@ public interface AsyncLeaderElector extends DistributedPrimitive {
CompletableFuture<Boolean> anoint(String topic, NodeId nodeId);
/**
 * Attempts to evict a node from all leadership elections it is registered for.
 * <p>
 * If the node is the current leader for a topic, this call will promote the next top candidate
 * (if one exists) to leadership.
 *
 * @param nodeId node instance identifier
 * @return CompletableFuture that is completed when the operation is done
 */
CompletableFuture<Void> evict(NodeId nodeId);
/**
 * Attempts to promote a node to the top of the candidate list without displacing the current leader.
 *
 * @param topic leadership topic
 * @param nodeId instance identifier of the new top candidate
 * @return CompletableFuture that is completed with a boolean when the operation is done. The boolean is
 * {@code true} if the node is now the top candidate. This operation can fail (i.e. complete with
 * {@code false}) if the node is not registered to run for election for the topic.
 */
CompletableFuture<Boolean> promote(String topic, NodeId nodeId);
/**
* Returns the {@link Leadership} for the specified topic.
* @param topic leadership topic
* @return CompletableFuture that is completed with the current Leadership state of the topic
......
......@@ -58,6 +58,25 @@ public interface LeaderElector extends DistributedPrimitive {
boolean anoint(String topic, NodeId nodeId);
/**
 * Attempts to promote a node to the top of the candidate list without displacing the current leader.
 *
 * @param topic leadership topic
 * @param nodeId instance identifier of the new top candidate
 * @return {@code true} if the node is now the top candidate. This operation can fail (i.e. return
 * {@code false}) if the node is not registered to run for election for the topic.
 */
boolean promote(String topic, NodeId nodeId);
/**
 * Attempts to evict a node from all leadership elections it is registered for.
 * <p>
 * If the node is the current leader for a topic, this call will force the next candidate (if one exists)
 * to be promoted to leadership.
 *
 * @param nodeId node instance identifier
 */
void evict(NodeId nodeId);
/**
* Returns the {@link Leadership} for the specified topic.
* @param topic leadership topic
* @return current Leadership state of the topic
......
......@@ -128,8 +128,7 @@ public class NewDistributedLeadershipStore
/**
 * Removes a node's registrations by evicting it through the leader elector.
 *
 * @param nodeId identifier of the node whose registrations are removed
 */
@Override
public void removeRegistration(NodeId nodeId) {
    // The former placeholder (TODO + UnsupportedOperationException) made this call unreachable.
    leaderElector.evict(nodeId);
}
@Override
......@@ -139,8 +138,7 @@ public class NewDistributedLeadershipStore
/**
 * Makes a node the top candidate for a topic by promoting it through the leader elector.
 *
 * @param topic leadership topic
 * @param nodeId identifier of the node to promote
 * @return the result reported by the leader elector's {@code promote} call
 */
@Override
public boolean makeTopCandidate(String topic, NodeId nodeId) {
    // The former placeholder (TODO + UnsupportedOperationException) made this call unreachable.
    return leaderElector.promote(topic, nodeId);
}
@Override
......
......@@ -70,6 +70,18 @@ public class PartitionedAsyncLeaderElector implements AsyncLeaderElector {
}
/**
 * Forwards a promote request to the elector selected for the topic.
 *
 * @param topic leadership topic
 * @param nodeId instance identifier of the new top candidate
 * @return future returned by the selected elector
 */
@Override
public CompletableFuture<Boolean> promote(String topic, NodeId nodeId) {
    final AsyncLeaderElector partitionElector = getLeaderElector(topic);
    return partitionElector.promote(topic, nodeId);
}
/**
 * Issues an evict request on every elector returned by {@code getLeaderElectors()}.
 *
 * @param nodeId node instance identifier
 * @return future that completes once all per-elector evict futures have completed
 */
@Override
public CompletableFuture<Void> evict(NodeId nodeId) {
    // One eviction future per underlying elector.
    final CompletableFuture[] evictions = getLeaderElectors().stream()
            .map(elector -> elector.evict(nodeId))
            .toArray(CompletableFuture[]::new);
    return CompletableFuture.allOf(evictions);
}
/**
 * Forwards a leadership lookup to the elector selected for the topic.
 *
 * @param topic leadership topic
 * @return future returned by the selected elector
 */
@Override
public CompletableFuture<Leadership> getLeadership(String topic) {
    final AsyncLeaderElector partitionElector = getLeaderElector(topic);
    return partitionElector.getLeadership(topic);
}
......
......@@ -20,6 +20,7 @@ import io.atomix.copycat.client.CopycatClient;
import io.atomix.resource.Resource;
import io.atomix.resource.ResourceTypeInfo;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
......@@ -43,6 +44,7 @@ public class AtomixLeaderElector extends Resource<AtomixLeaderElector>
// Callbacks invoked by handleEvent for every leadership change received from the client.
private final Set<Consumer<Change<Leadership>>> leadershipChangeListeners =
Sets.newConcurrentHashSet();
/** Event subject used when registering the change handler in {@code open()}. */
public static final String CHANGE_SUBJECT = "changeEvents";
// NOTE(review): no use of this field is visible in this excerpt — confirm where it is assigned and read.
private Listener<Change<Leadership>> listener;
public AtomixLeaderElector(CopycatClient client, Resource.Options options) {
......@@ -57,13 +59,13 @@ public class AtomixLeaderElector extends Resource<AtomixLeaderElector>
/**
 * Opens the resource and, once the parent {@code open()} completes, registers
 * {@code handleEvent} for events delivered under {@link #CHANGE_SUBJECT}.
 *
 * @return future completed with the value produced by the parent {@code open()}
 */
@Override
public CompletableFuture<AtomixLeaderElector> open() {
    return super.open().thenApply(result -> {
        // Subscribe once, on the shared subject constant; the extra registration on the
        // literal "change" subject was a leftover of the previous event name.
        client.onEvent(CHANGE_SUBJECT, this::handleEvent);
        return result;
    });
}
private void handleEvent(Change<Leadership> change) {
leadershipChangeListeners.forEach(l -> l.accept(change));
private void handleEvent(List<Change<Leadership>> changes) {
changes.forEach(change -> leadershipChangeListeners.forEach(l -> l.accept(change)));
}
@Override
......@@ -82,6 +84,16 @@ public class AtomixLeaderElector extends Resource<AtomixLeaderElector>
}
/**
 * Submits a {@link AtomixLeaderElectorCommands.Promote} command for the given topic and node.
 *
 * @param topic leadership topic
 * @param nodeId instance identifier of the new top candidate
 * @return future returned by {@code submit} for the command
 */
@Override
public CompletableFuture<Boolean> promote(String topic, NodeId nodeId) {
    final AtomixLeaderElectorCommands.Promote command =
            new AtomixLeaderElectorCommands.Promote(topic, nodeId);
    return submit(command);
}
/**
 * Submits a {@link AtomixLeaderElectorCommands.Evict} command for the given node.
 *
 * @param nodeId node instance identifier
 * @return future returned by {@code submit} for the command
 */
@Override
public CompletableFuture<Void> evict(NodeId nodeId) {
    final AtomixLeaderElectorCommands.Evict command =
            new AtomixLeaderElectorCommands.Evict(nodeId);
    return submit(command);
}
/**
 * Submits a {@link AtomixLeaderElectorCommands.GetLeadership} request for the given topic.
 *
 * @param topic leadership topic
 * @return future returned by {@code submit} for the request
 */
@Override
public CompletableFuture<Leadership> getLeadership(String topic) {
    final AtomixLeaderElectorCommands.GetLeadership query =
            new AtomixLeaderElectorCommands.GetLeadership(topic);
    return submit(query);
}
......
......@@ -346,6 +346,102 @@ public final class AtomixLeaderElectorCommands {
}
/**
* Command for administratively promote a node as top candidate.
*/
@SuppressWarnings("serial")
public static class Promote extends ElectionCommand<Boolean> {
private String topic;
private NodeId nodeId;
public Promote() {
}
public Promote(String topic, NodeId nodeId) {
this.topic = topic;
this.nodeId = nodeId;
}
/**
* Returns the topic.
*
* @return The topic
*/
public String topic() {
return topic;
}
/**
* Returns the nodeId to make top candidate.
*
* @return The nodeId
*/
public NodeId nodeId() {
return nodeId;
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("topic", topic)
.add("nodeId", nodeId)
.toString();
}
@Override
public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
buffer.writeString(topic);
buffer.writeString(nodeId.toString());
}
@Override
public void readObject(BufferInput<?> buffer, Serializer serializer) {
topic = buffer.readString();
nodeId = new NodeId(buffer.readString());
}
}
/**
* Command for administratively evicting a node from all leadership topics.
*/
@SuppressWarnings("serial")
public static class Evict extends ElectionCommand<Void> {
private NodeId nodeId;
public Evict() {
}
public Evict(NodeId nodeId) {
this.nodeId = nodeId;
}
/**
* Returns the node identifier.
*
* @return The nodeId
*/
public NodeId nodeId() {
return nodeId;
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("nodeId", nodeId)
.toString();
}
@Override
public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
buffer.writeString(nodeId.toString());
}
@Override
public void readObject(BufferInput<?> buffer, Serializer serializer) {
nodeId = new NodeId(buffer.readString());
}
}
/**
* Leader elector command type resolver.
*/
public static class TypeResolver implements SerializableTypeResolver {
......@@ -359,6 +455,8 @@ public final class AtomixLeaderElectorCommands {
registry.register(GetLeadership.class, -866);
registry.register(Listen.class, -867);
registry.register(Unlisten.class, -868);
registry.register(Promote.class, -869);
registry.register(Evict.class, -870);
}
}
}
......
......@@ -43,10 +43,12 @@ import org.onosproject.cluster.Leadership;
import org.onosproject.cluster.NodeId;
import org.onosproject.event.Change;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Anoint;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Evict;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.GetAllLeaderships;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.GetElectedTopics;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.GetLeadership;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Listen;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Promote;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Run;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Unlisten;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands.Withdraw;
......@@ -86,6 +88,8 @@ public class AtomixLeaderElectorState extends ResourceStateMachine
executor.register(Run.class, this::run);
executor.register(Withdraw.class, this::withdraw);
executor.register(Anoint.class, this::anoint);
executor.register(Promote.class, this::promote);
executor.register(Evict.class, this::evict);
// Queries
executor.register(GetLeadership.class, this::leadership);
executor.register(GetAllLeaderships.class, this::allLeaderships);
......@@ -93,8 +97,16 @@ public class AtomixLeaderElectorState extends ResourceStateMachine
}
/**
 * Notifies listeners of a single leadership change by wrapping it in a
 * one-element list and handing it to {@code notifyLeadershipChanges}.
 *
 * @param previousLeadership leadership before the change
 * @param newLeadership leadership after the change
 */
private void notifyLeadershipChange(Leadership previousLeadership, Leadership newLeadership) {
    // Single publication path: the extra direct publish on the "change" subject
    // would have notified listeners a second time for the same change.
    notifyLeadershipChanges(Arrays.asList(new Change<>(previousLeadership, newLeadership)));
}
/**
 * Publishes a batch of leadership changes to the session of every registered listener,
 * under {@code AtomixLeaderElector.CHANGE_SUBJECT}. An empty batch publishes nothing.
 *
 * @param changes leadership changes to publish
 */
private void notifyLeadershipChanges(List<Change<Leadership>> changes) {
    if (!changes.isEmpty()) {
        listeners.values().forEach(
                subscriber -> subscriber.session().publish(AtomixLeaderElector.CHANGE_SUBJECT, changes));
    }
}
@Override
......@@ -206,6 +218,53 @@ public class AtomixLeaderElectorState extends ResourceStateMachine
}
/**
 * Applies an {@link AtomixLeaderElectorCommands.Promote} commit.
 * <p>
 * Returns {@code false} without changing anything when the topic has no leadership or the
 * node is not among its candidates. Otherwise the election state for the topic is replaced
 * by its promoted form and listeners are notified if the leadership differs afterwards.
 * The commit is closed in every case.
 *
 * @param commit promote commit
 * @return {@code true} if the node was a candidate for the topic and the promotion was applied
 */
public boolean promote(Commit<? extends Promote> commit) {
    try {
        final Promote operation = commit.operation();
        final String topic = operation.topic();
        final NodeId nodeId = operation.nodeId();

        final Leadership before = leadership(topic);
        final boolean registered = before != null && before.candidates().contains(nodeId);
        if (!registered) {
            return false;
        }

        elections.computeIfPresent(topic, (key, state) -> new ElectionState(state).promote(nodeId));

        final Leadership after = leadership(topic);
        final boolean unchanged = Objects.equal(before, after);
        if (!unchanged) {
            notifyLeadershipChange(before, after);
        }
        return true;
    } finally {
        commit.close();
    }
}
/**
 * Applies an {@link AtomixLeaderElectorCommands.Evict} commit.
 * <p>
 * Every topic whose election lists the node as a candidate has the node evicted. Leadership
 * changes that result are collected and published as a single batch. The commit is closed
 * in every case.
 *
 * @param commit evict commit
 */
public void evict(Commit<? extends Evict> commit) {
    try {
        List<Change<Leadership>> changes = Lists.newLinkedList();
        NodeId nodeId = commit.operation().nodeId();
        // Maps.filterValues returns a live view of elections. Copy the matching topics up front
        // so the loop below does not iterate that view while replacing values in elections.
        List<String> topics = Lists.newArrayList(
                Maps.filterValues(elections, e -> e.candidates().contains(nodeId)).keySet());
        for (String topic : topics) {
            Leadership oldLeadership = leadership(topic);
            elections.compute(topic, (k, v) -> v.evict(nodeId, termCounter(topic)::incrementAndGet));
            Leadership newLeadership = leadership(topic);
            if (!Objects.equal(oldLeadership, newLeadership)) {
                changes.add(new Change<>(oldLeadership, newLeadership));
            }
        }
        notifyLeadershipChanges(changes);
    } finally {
        commit.close();
    }
}
/**
* Applies an {@link AtomixLeaderElectorCommands.GetLeadership} commit.
* @param commit GetLeadership commit
* @return leader
......@@ -362,6 +421,31 @@ public class AtomixLeaderElectorState extends ResourceStateMachine
}
}
/**
 * Returns the election state that results from removing every registration of a node.
 * <p>
 * If the node has no registration, this state is returned unchanged. If the node is the
 * current leader, the first remaining registration becomes leader with a term taken from
 * {@code termCounter}; with no registrations left the leader is cleared and the term kept.
 * If the node is not the leader, leader and term are kept as they are.
 *
 * @param nodeId node to evict
 * @param termCounter supplier of the next term, consulted only when a new leader is installed
 * @return updated election state, or {@code this} if the node was not registered
 */
public ElectionState evict(NodeId nodeId, Supplier<Long> termCounter) {
    boolean registered = registrations.stream().anyMatch(r -> r.nodeId().equals(nodeId));
    if (!registered) {
        return this;
    }
    List<Registration> updatedRegistrations =
            registrations.stream()
                         .filter(r -> !r.nodeId().equals(nodeId))
                         .collect(Collectors.toList());
    // A missing leader is treated like "node is not the leader" rather than dereferenced.
    if (leader == null || !leader.nodeId().equals(nodeId)) {
        return new ElectionState(updatedRegistrations, leader, term, termStartTime);
    }
    if (updatedRegistrations.isEmpty()) {
        return new ElectionState(updatedRegistrations, null, term, termStartTime);
    }
    // NOTE(review): wall-clock time is read here; if this runs independently on several
    // replicas the recorded term start time may differ between them — confirm this is acceptable.
    return new ElectionState(updatedRegistrations,
                             updatedRegistrations.get(0),
                             termCounter.get(),
                             System.currentTimeMillis());
}
/**
 * Tells whether an existing registration carries the same session id as the given one.
 *
 * @param registration registration to compare against
 * @return {@code true} if some existing registration has the same session id
 */
public boolean isDuplicate(Registration registration) {
    for (Registration existing : registrations) {
        if (existing.sessionId() == registration.sessionId()) {
            return true;
        }
    }
    return false;
}
......@@ -406,6 +490,23 @@ public class AtomixLeaderElectorState extends ResourceStateMachine
return this;
}
}
/**
 * Returns the election state in which the given node's registration is moved to the head
 * of the registration list. Leader, term and term start time are carried over unchanged.
 * <p>
 * If the node has no registration, this state is returned unchanged instead of inserting
 * a {@code null} entry at the head of the list.
 *
 * @param nodeId node to promote
 * @return updated election state, or {@code this} if the node is not registered
 */
public ElectionState promote(NodeId nodeId) {
    Registration registration = registrations.stream()
            .filter(r -> r.nodeId().equals(nodeId))
            .findFirst()
            .orElse(null);
    if (registration == null) {
        return this;
    }
    List<Registration> updatedRegistrations = Lists.newLinkedList();
    updatedRegistrations.add(registration);
    // Remaining registrations keep their relative order behind the promoted one.
    registrations.stream()
            .filter(r -> !r.nodeId().equals(nodeId))
            .forEach(updatedRegistrations::add);
    return new ElectionState(updatedRegistrations,
                             leader,
                             term,
                             termStartTime);
}
}
@Override
......
......@@ -51,10 +51,10 @@ public class AtomixLeaderElectorTest extends AtomixTestBase {
public void testRun() throws Throwable {
    // Runs the scenario with numServers = 1, 2 and 3, calling clearTests() after each run.
    for (int numServers = 1; numServers <= 3; numServers++) {
        leaderElectorRunTests(numServers);
        clearTests();
    }
}
private void leaderElectorRunTests(int numServers) throws Throwable {
......@@ -183,6 +183,63 @@ public class AtomixLeaderElectorTest extends AtomixTestBase {
}
/**
 * Runs the promote scenario with numServers = 1, 2 and 3, calling clearTests() after each run.
 */
@Test
public void testPromote() throws Throwable {
    for (int numServers = 1; numServers <= 3; numServers++) {
        leaderElectorPromoteTests(numServers);
        clearTests();
    }
}
/**
 * Promote scenario: promoting an unregistered node must fail without emitting events;
 * promoting a registered node must succeed and every listener must see that node as
 * the first candidate.
 *
 * @param numServers value passed to {@code createCopycatServers}
 */
private void leaderElectorPromoteTests(int numServers) throws Throwable {
    createCopycatServers(numServers);

    // Three independent clients sharing the same elector resource.
    AtomixLeaderElector firstElector =
            createAtomixClient().get("test-elector", AtomixLeaderElector.class).join();
    AtomixLeaderElector secondElector =
            createAtomixClient().get("test-elector", AtomixLeaderElector.class).join();
    AtomixLeaderElector thirdElector =
            createAtomixClient().get("test-elector", AtomixLeaderElector.class).join();

    // node1 and node2 enter the election before any listener is attached.
    firstElector.run("foo", node1).join();
    secondElector.run("foo", node2).join();

    LeaderEventListener firstListener = new LeaderEventListener();
    firstElector.addChangeListener(firstListener).join();
    LeaderEventListener secondListener = new LeaderEventListener();
    secondElector.addChangeListener(secondListener).join();
    LeaderEventListener thirdListener = new LeaderEventListener();
    thirdElector.addChangeListener(thirdListener).join();

    // node3 is not registered for "foo" yet: promote must report false and emit nothing.
    thirdElector.promote("foo", node3).thenAccept(outcome -> assertFalse(outcome)).join();
    assertFalse(firstListener.hasEvent());
    assertFalse(secondListener.hasEvent());
    assertFalse(thirdListener.hasEvent());

    // Register node3, then discard whatever events the listeners have queued so far.
    // NOTE(review): presumes events caused by run() have already reached all three
    // listeners when run() completes — confirm.
    thirdElector.run("foo", node3).join();
    firstListener.clearEvents();
    secondListener.clearEvents();
    thirdListener.clearEvents();

    // Now the promotion must succeed and be observed by every listener.
    thirdElector.promote("foo", node3).thenAccept(outcome -> assertTrue(outcome)).join();
    firstListener.nextEvent()
            .thenAccept(change -> assertEquals(node3, change.newValue().candidates().get(0)))
            .join();
    secondListener.nextEvent()
            .thenAccept(change -> assertEquals(node3, change.newValue().candidates().get(0)))
            .join();
    thirdListener.nextEvent()
            .thenAccept(change -> assertEquals(node3, change.newValue().candidates().get(0)))
            .join();
}
@Test
public void testLeaderSessionClose() throws Throwable {
leaderElectorLeaderSessionCloseTests(1);
clearTests();
......@@ -325,6 +382,10 @@ public class AtomixLeaderElectorTest extends AtomixTestBase {
return !eventQueue.isEmpty();
}
/**
 * Discards every event currently held in the event queue.
 */
public void clearEvents() {
eventQueue.clear();
}
public CompletableFuture<Change<Leadership>> nextEvent() {
synchronized (this) {
if (eventQueue.isEmpty()) {
......