Madan Jampani
Committed by Pavlin Radoslavov

LeadershipService: Support for a leaderBoard.

Change-Id: I0dd8267e104466ec65a2c67d23d1c4d923cad266

Change-Id: I6bc548510400eacabb12482f8fba1b7f2abb0604
......@@ -11,12 +11,12 @@ public class Leadership {
private final String topic;
private final ControllerNode leader;
private final long term;
private final long epoch;
public Leadership(String topic, ControllerNode leader, long term) {
public Leadership(String topic, ControllerNode leader, long epoch) {
this.topic = topic;
this.leader = leader;
this.term = term;
this.epoch = epoch;
}
/**
......@@ -36,16 +36,16 @@ public class Leadership {
}
/**
* The term number associated with this leadership.
* @return leadership term
* The epoch when the leadership was assumed.
* @return leadership epoch
*/
public long term() {
return term;
public long epoch() {
return epoch;
}
@Override
public int hashCode() {
return Objects.hash(topic, leader, term);
return Objects.hash(topic, leader, epoch);
}
@Override
......@@ -53,7 +53,7 @@ public class Leadership {
return MoreObjects.toStringHelper(this.getClass())
.add("topic", topic)
.add("leader", leader)
.add("term", term)
.add("epoch", epoch)
.toString();
}
}
\ No newline at end of file
......
......@@ -17,14 +17,21 @@ package org.onlab.onos.cluster;
/**
* Service for leader election.
* Leadership contents are organized around topics. ONOS instance can join the
* leadership race for a topic or withdraw from a race it has previously joined
* Once in the race, the instance can get asynchronously notified
* of leadership election results.
* Leadership contests are organized around topics. A instance can join the
* leadership race for a topic or withdraw from a race it has previously joined.
* Listeners can be added to receive notifications asynchronously for various
* leadership contests.
*/
public interface LeadershipService {
/**
* Gets the most recent leader for the topic.
* @param path topic
* @return node who is the leader, null if so such topic exists.
*/
ControllerNode getLeader(String path);
/**
* Joins the leadership contest.
* @param path topic for which this controller node wishes to be a leader.
*/
......
......@@ -76,6 +76,15 @@ public interface Lock {
boolean isLocked();
/**
* Returns the epoch for this lock.
* If this lock is currently locked i.e. isLocked() returns true, epoch signifies the logical time
* when the lock was acquired. The concept of epoch lets one come up with a global ordering for all
* lock acquisition events
* @return epoch
*/
long epoch();
/**
* Releases the lock.
*/
void unlock();
......
......@@ -5,6 +5,7 @@ import static com.google.common.base.Verify.verifyNotNull;
import static org.onlab.util.Tools.namedThreads;
import static org.slf4j.LoggerFactory.getLogger;
import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
......@@ -23,9 +24,16 @@ import org.onlab.onos.cluster.Leadership;
import org.onlab.onos.cluster.LeadershipEvent;
import org.onlab.onos.cluster.LeadershipEventListener;
import org.onlab.onos.cluster.LeadershipService;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
import org.onlab.onos.store.serializers.KryoNamespaces;
import org.onlab.onos.store.serializers.KryoSerializer;
import org.onlab.onos.store.service.Lock;
import org.onlab.onos.store.service.LockService;
import org.onlab.onos.store.service.impl.DistributedLockManager;
import org.onlab.util.KryoNamespace;
import org.slf4j.Logger;
import com.google.common.collect.Maps;
......@@ -45,32 +53,83 @@ public class LeadershipManager implements LeadershipService {
private static final int TERM_DURATION_MS =
DistributedLockManager.DEAD_LOCK_TIMEOUT_MS;
// Time to wait before retrying leadership after
// a unexpected error.
private static final int WAIT_BEFORE_RETRY_MS = 2000;
// TODO: Appropriate Thread pool sizing.
private static final ScheduledExecutorService THREAD_POOL =
Executors.newScheduledThreadPool(25, namedThreads("leadership-manager-%d"));
private static final MessageSubject LEADERSHIP_UPDATES =
new MessageSubject("leadership-contest-updates");
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private ClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private ClusterCommunicationService clusterCommunicator;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private LockService lockService;
private Map<String, Lock> openContests = Maps.newHashMap();
private Set<LeadershipEventListener> listeners = Sets.newIdentityHashSet();
private final Map<String, Leadership> leaderBoard = Maps.newHashMap();
private final Map<String, Lock> openContests = Maps.newHashMap();
private final Set<LeadershipEventListener> listeners = Sets.newIdentityHashSet();
private ControllerNode localNode;
private final LeadershipEventListener peerAdvertiser = new PeerAdvertiser();
private final LeadershipEventListener leaderBoardUpdater = new LeaderBoardUpdater();
public static final KryoSerializer SERIALIZER = new KryoSerializer() {
@Override
protected void setupKryoPool() {
serializerPool = KryoNamespace.newBuilder()
.register(KryoNamespaces.API)
.build()
.populate(1);
}
};
@Activate
public void activate() {
localNode = clusterService.getLocalNode();
addListener(peerAdvertiser);
addListener(leaderBoardUpdater);
clusterCommunicator.addSubscriber(
LEADERSHIP_UPDATES,
new PeerAdvertisementHandler());
log.info("Started.");
}
@Deactivate
public void deactivate() {
removeListener(peerAdvertiser);
removeListener(leaderBoardUpdater);
clusterCommunicator.removeSubscriber(LEADERSHIP_UPDATES);
THREAD_POOL.shutdown();
log.info("Stopped.");
}
@Override
public ControllerNode getLeader(String path) {
synchronized (leaderBoard) {
Leadership leadership = leaderBoard.get(path);
if (leadership != null) {
return leadership.leader();
}
}
return null;
}
@Override
public void runForLeadership(String path) {
checkArgument(path != null);
......@@ -94,8 +153,7 @@ public class LeadershipManager implements LeadershipService {
notifyListeners(
new LeadershipEvent(
LeadershipEvent.Type.LEADER_BOOTED,
new Leadership(lock.path(), localNode, 0)));
// FIXME: Should set the correct term information.
new Leadership(lock.path(), localNode, lock.epoch())));
}
}
......@@ -123,26 +181,31 @@ public class LeadershipManager implements LeadershipService {
lock.lockAsync(TERM_DURATION_MS).whenComplete((response, error) -> {
if (error == null) {
THREAD_POOL.schedule(
new RelectionTask(lock),
new ReelectionTask(lock),
TERM_DURATION_MS / 2,
TimeUnit.MILLISECONDS);
notifyListeners(
new LeadershipEvent(
LeadershipEvent.Type.LEADER_ELECTED,
new Leadership(lock.path(), localNode, 0)));
new Leadership(lock.path(), localNode, lock.epoch())));
return;
} else {
log.error("Failed to acquire lock for {}", path, error);
// retry
tryAcquireLeadership(path);
log.warn("Failed to acquire lock for {}. Will retry in {} sec", path, WAIT_BEFORE_RETRY_MS, error);
try {
Thread.sleep(WAIT_BEFORE_RETRY_MS);
tryAcquireLeadership(path);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});
}
private class RelectionTask implements Runnable {
private class ReelectionTask implements Runnable {
private final Lock lock;
public RelectionTask(Lock lock) {
public ReelectionTask(Lock lock) {
this.lock = lock;
}
......@@ -152,17 +215,69 @@ public class LeadershipManager implements LeadershipService {
notifyListeners(
new LeadershipEvent(
LeadershipEvent.Type.LEADER_REELECTED,
new Leadership(lock.path(), localNode, 0)));
new Leadership(lock.path(), localNode, lock.epoch())));
THREAD_POOL.schedule(this, TERM_DURATION_MS / 2, TimeUnit.MILLISECONDS);
} else {
if (openContests.containsKey(lock.path())) {
notifyListeners(
new LeadershipEvent(
LeadershipEvent.Type.LEADER_BOOTED,
new Leadership(lock.path(), localNode, 0)));
new Leadership(lock.path(), localNode, lock.epoch())));
tryAcquireLeadership(lock.path());
}
}
}
}
private class PeerAdvertiser implements LeadershipEventListener {
@Override
public void event(LeadershipEvent event) {
// publish events originating on this host.
if (event.subject().leader().equals(localNode)) {
try {
clusterCommunicator.broadcast(
new ClusterMessage(
localNode.id(),
LEADERSHIP_UPDATES,
SERIALIZER.encode(event)));
} catch (IOException e) {
log.error("Failed to broadcast leadership update message", e);
}
}
}
}
private class PeerAdvertisementHandler implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
LeadershipEvent event = SERIALIZER.decode(message.payload());
log.debug("Received {} from {}", event, message.sender());
notifyListeners(event);
}
}
private class LeaderBoardUpdater implements LeadershipEventListener {
@Override
public void event(LeadershipEvent event) {
Leadership leadershipUpdate = event.subject();
synchronized (leaderBoard) {
Leadership currentLeadership = leaderBoard.get(leadershipUpdate.topic());
switch (event.type()) {
case LEADER_ELECTED:
case LEADER_REELECTED:
if (currentLeadership == null || currentLeadership.epoch() < leadershipUpdate.epoch()) {
leaderBoard.put(leadershipUpdate.topic(), leadershipUpdate);
}
break;
case LEADER_BOOTED:
if (currentLeadership != null && currentLeadership.epoch() <= leadershipUpdate.epoch()) {
leaderBoard.remove(leadershipUpdate.topic());
}
break;
default:
break;
}
}
}
}
}
\ No newline at end of file
......
package org.onlab.onos.store.service.impl;
import static com.google.common.base.Verify.verify;
import static org.slf4j.LoggerFactory.getLogger;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
......@@ -15,6 +17,7 @@ import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.store.service.DatabaseException;
import org.onlab.onos.store.service.DatabaseService;
import org.onlab.onos.store.service.Lock;
import org.onlab.onos.store.service.VersionedValue;
import org.slf4j.Logger;
/**
......@@ -29,6 +32,7 @@ public class DistributedLock implements Lock {
private final String path;
private DateTime lockExpirationTime;
private AtomicBoolean isLocked = new AtomicBoolean(false);
private volatile long epoch = 0;
private byte[] lockId;
public DistributedLock(
......@@ -74,6 +78,10 @@ public class DistributedLock implements Lock {
DistributedLockManager.ONOS_LOCK_TABLE_NAME,
path,
lockId)) {
VersionedValue vv =
databaseService.get(DistributedLockManager.ONOS_LOCK_TABLE_NAME, path);
verify(Arrays.equals(vv.value(), lockId));
epoch = vv.version();
isLocked.set(true);
lockExpirationTime = DateTime.now().plusMillis(leaseDurationMillis);
return true;
......@@ -121,6 +129,11 @@ public class DistributedLock implements Lock {
}
@Override
public long epoch() {
return epoch;
}
@Override
public void unlock() {
if (!isLocked()) {
return;
......
......@@ -25,6 +25,8 @@ import java.util.LinkedList;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.Leadership;
import org.onlab.onos.cluster.LeadershipEvent;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.cluster.RoleInfo;
import org.onlab.onos.core.DefaultApplicationId;
......@@ -166,6 +168,9 @@ public final class KryoNamespaces {
Link.Type.class,
Link.State.class,
Timestamp.class,
Leadership.class,
LeadershipEvent.class,
LeadershipEvent.Type.class,
HostId.class,
HostDescription.class,
DefaultHostDescription.class,
......