Madan Jampani
Committed by Gerrit Code Review

Added couple of methods to LeadershipService.

Change-Id: I259b1a282a51af9425e941a720336f89d66f1097
......@@ -16,6 +16,7 @@
package org.onosproject.cluster;
import java.util.Map;
import java.util.Set;
/**
* Service for leader election.
......@@ -27,13 +28,27 @@ import java.util.Map;
public interface LeadershipService {
/**
* Gets the most recent leader for the topic.
* Returns the current leader for the topic.
* @param path topic
* @return nodeId of the leader, null if so such topic exists.
*/
NodeId getLeader(String path);
/**
* Returns the current leadership info for the topic.
* @param path topic
* @return leadership info or null if so such topic exists.
*/
Leadership getLeadership(String path);
/**
* Returns the set of topics owned by the specified node.
* @param nodeId node Id.
* @return set of topics for which this node is the current leader.
*/
Set<String> ownedTopics(NodeId nodeId);
/**
* Joins the leadership contest.
* @param path topic for which this controller node wishes to be a leader.
*/
......@@ -45,6 +60,10 @@ public interface LeadershipService {
*/
void withdraw(String path);
/**
* Returns the current leader board.
* @return mapping from topic to leadership info.
*/
Map<String, Leadership> getLeaderBoard();
/**
......
......@@ -46,10 +46,12 @@ import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.locks.Lock;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkArgument;
import static org.onlab.util.Tools.groupedThreads;
......@@ -162,6 +164,28 @@ public class HazelcastLeadershipService implements LeadershipService {
}
@Override
public Leadership getLeadership(String path) {
checkArgument(path != null);
Topic topic = topics.get(path);
if (topic != null) {
return new Leadership(topic.topicName(),
topic.leader(),
topic.term());
}
return null;
}
@Override
public Set<String> ownedTopics(NodeId nodeId) {
checkArgument(nodeId != null);
return topics.values()
.stream()
.filter(topic -> nodeId.equals(topic.leader()))
.map(topic -> topic.topicName)
.collect(Collectors.toSet());
}
@Override
public void runForLeadership(String path) {
checkArgument(path != null);
Topic topic = new Topic(path);
......
/*
* Copyright 2014 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.cluster.impl;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.KryoNamespace;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.Leadership;
import org.onosproject.cluster.LeadershipEvent;
import org.onosproject.cluster.LeadershipEventListener;
import org.onosproject.cluster.LeadershipService;
import org.onosproject.cluster.NodeId;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
import org.onosproject.store.service.Lock;
import org.onosproject.store.service.LockService;
import org.slf4j.Logger;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import static com.google.common.base.Preconditions.checkArgument;
import static org.onlab.util.Tools.groupedThreads;
import static org.slf4j.LoggerFactory.getLogger;
/**
* Distributed implementation of LeadershipService that is based on the primitives exposed by
* LockService.
*/
@Component(enabled = false)
@Service
public class LeadershipManager implements LeadershipService {
private final Logger log = getLogger(getClass());
private static final int TERM_DURATION_MS = 2000;
// Time to wait before retrying leadership after
// a unexpected error.
private static final int WAIT_BEFORE_RETRY_MS = 2000;
// TODO: Make Thread pool size configurable.
private final ScheduledExecutorService threadPool =
Executors.newScheduledThreadPool(25, groupedThreads("onos/leadership", "manager-%d"));
private static final MessageSubject LEADERSHIP_UPDATES =
new MessageSubject("leadership-contest-updates");
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private ClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private ClusterCommunicationService clusterCommunicator;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private LockService lockService;
private final Map<String, Leadership> leaderBoard = Maps.newHashMap();
private final Map<String, Lock> openContests = Maps.newConcurrentMap();
private final Set<LeadershipEventListener> listeners = Sets.newIdentityHashSet();
private NodeId localNodeId;
private final LeadershipEventListener peerAdvertiser = new PeerAdvertiser();
private final LeadershipEventListener leaderBoardUpdater = new LeaderBoardUpdater();
private ExecutorService messageHandlingExecutor;
public static final KryoSerializer SERIALIZER = new KryoSerializer() {
@Override
protected void setupKryoPool() {
serializerPool = KryoNamespace.newBuilder()
.register(KryoNamespaces.API)
.build()
.populate(1);
}
};
@Activate
public void activate() {
localNodeId = clusterService.getLocalNode().id();
addListener(peerAdvertiser);
addListener(leaderBoardUpdater);
messageHandlingExecutor = Executors.newSingleThreadExecutor(
groupedThreads("onos/store/leadership",
"peer-advertisement-handler"));
clusterCommunicator.addSubscriber(
LEADERSHIP_UPDATES,
new PeerAdvertisementHandler(),
messageHandlingExecutor);
log.info("Started.");
}
@Deactivate
public void deactivate() {
removeListener(peerAdvertiser);
removeListener(leaderBoardUpdater);
clusterCommunicator.removeSubscriber(LEADERSHIP_UPDATES);
messageHandlingExecutor.shutdown();
threadPool.shutdown();
log.info("Stopped.");
}
@Override
public NodeId getLeader(String path) {
synchronized (leaderBoard) {
Leadership leadership = leaderBoard.get(path);
if (leadership != null) {
return leadership.leader();
}
}
return null;
}
@Override
public void runForLeadership(String path) {
checkArgument(path != null);
if (openContests.containsKey(path)) {
log.info("Already in the leadership contest for {}", path);
return;
} else {
Lock lock = lockService.create(path);
openContests.put(path, lock);
threadPool.schedule(new TryLeadership(lock), 0, TimeUnit.MILLISECONDS);
}
}
@Override
public void withdraw(String path) {
checkArgument(path != null);
Lock lock = openContests.remove(path);
if (lock != null && lock.isLocked()) {
lock.unlock();
notifyListeners(
new LeadershipEvent(
LeadershipEvent.Type.LEADER_BOOTED,
new Leadership(lock.path(), localNodeId, lock.epoch())));
}
}
@Override
public Map<String, Leadership> getLeaderBoard() {
return ImmutableMap.copyOf(leaderBoard);
}
@Override
public void addListener(LeadershipEventListener listener) {
checkArgument(listener != null);
listeners.add(listener);
}
@Override
public void removeListener(LeadershipEventListener listener) {
checkArgument(listener != null);
listeners.remove(listener);
}
private void notifyListeners(LeadershipEvent event) {
for (LeadershipEventListener listener : listeners) {
try {
listener.event(event);
} catch (Exception e) {
log.error("Notifying listener failed with exception.", e);
}
}
}
private void tryAcquireLeadership(String path) {
Lock lock = openContests.get(path);
if (lock == null) {
// withdrew from race.
return;
}
lock.lockAsync(TERM_DURATION_MS).whenComplete((response, error) -> {
if (error == null) {
threadPool.schedule(
new ReelectionTask(lock),
TERM_DURATION_MS / 2,
TimeUnit.MILLISECONDS);
notifyListeners(
new LeadershipEvent(
LeadershipEvent.Type.LEADER_ELECTED,
new Leadership(lock.path(), localNodeId, lock.epoch())));
return;
} else {
log.warn("Failed to acquire lock for {}. Will retry in {} ms", path, WAIT_BEFORE_RETRY_MS, error);
threadPool.schedule(new TryLeadership(lock), WAIT_BEFORE_RETRY_MS, TimeUnit.MILLISECONDS);
}
});
}
private class ReelectionTask implements Runnable {
private final Lock lock;
public ReelectionTask(Lock lock) {
this.lock = lock;
}
@Override
public void run() {
if (!openContests.containsKey(lock.path())) {
log.debug("Node withdrew from leadership race for {}. Cancelling reelection task.", lock.path());
return;
}
boolean lockExtended = false;
try {
lockExtended = lock.extendExpiration(TERM_DURATION_MS);
} catch (Exception e) {
log.warn("Attempt to extend lock failed with an exception.", e);
}
if (lockExtended) {
notifyListeners(
new LeadershipEvent(
LeadershipEvent.Type.LEADER_REELECTED,
new Leadership(lock.path(), localNodeId, lock.epoch())));
threadPool.schedule(this, TERM_DURATION_MS / 2, TimeUnit.MILLISECONDS);
} else {
// Check if this node already withdrew from the contest, in which case
// we don't need to notify here.
if (openContests.containsKey(lock.path())) {
notifyListeners(
new LeadershipEvent(
LeadershipEvent.Type.LEADER_BOOTED,
new Leadership(lock.path(), localNodeId, lock.epoch())));
// Retry leadership after a brief wait.
threadPool.schedule(new TryLeadership(lock), WAIT_BEFORE_RETRY_MS, TimeUnit.MILLISECONDS);
}
}
}
}
private class TryLeadership implements Runnable {
private final Lock lock;
public TryLeadership(Lock lock) {
this.lock = lock;
}
@Override
public void run() {
tryAcquireLeadership(lock.path());
}
}
private class PeerAdvertiser implements LeadershipEventListener {
@Override
public void event(LeadershipEvent event) {
// publish events originating on this host.
if (event.subject().leader().equals(localNodeId)) {
clusterCommunicator.broadcast(
new ClusterMessage(
localNodeId,
LEADERSHIP_UPDATES,
SERIALIZER.encode(event)));
}
}
}
private class PeerAdvertisementHandler implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
LeadershipEvent event = SERIALIZER.decode(message.payload());
log.trace("Received {} from {}", event, message.sender());
notifyListeners(event);
}
}
private class LeaderBoardUpdater implements LeadershipEventListener {
@Override
public void event(LeadershipEvent event) {
Leadership leadershipUpdate = event.subject();
synchronized (leaderBoard) {
Leadership currentLeadership = leaderBoard.get(leadershipUpdate.topic());
switch (event.type()) {
case LEADER_ELECTED:
case LEADER_REELECTED:
if (currentLeadership == null || currentLeadership.epoch() < leadershipUpdate.epoch()) {
leaderBoard.put(leadershipUpdate.topic(), leadershipUpdate);
}
break;
case LEADER_BOOTED:
if (currentLeadership != null && currentLeadership.epoch() <= leadershipUpdate.epoch()) {
leaderBoard.remove(leadershipUpdate.topic());
}
break;
default:
break;
}
}
}
}
}
......@@ -2,6 +2,7 @@ package org.onosproject.store.consistent.impl;
import static org.onlab.util.Tools.groupedThreads;
import static org.slf4j.LoggerFactory.getLogger;
import static com.google.common.base.Preconditions.checkArgument;
import java.util.Map;
import java.util.Map.Entry;
......@@ -10,6 +11,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
......@@ -171,6 +173,22 @@ public class DistributedLeadershipManager implements LeadershipService {
}
@Override
public Leadership getLeadership(String path) {
checkArgument(path != null);
return leaderBoard.get(path);
}
@Override
public Set<String> ownedTopics(NodeId nodeId) {
checkArgument(nodeId != null);
return leaderBoard.entrySet()
.stream()
.filter(entry -> nodeId.equals(entry.getValue().leader()))
.map(Entry::getKey)
.collect(Collectors.toSet());
}
@Override
public void runForLeadership(String path) {
log.info("Running for leadership for topic: {}", path);
activeTopics.add(path);
......
......@@ -15,10 +15,14 @@
*/
package org.onosproject.store.trivial.impl;
import static com.google.common.base.Preconditions.checkArgument;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.stream.Collectors;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Reference;
......@@ -55,6 +59,22 @@ public class SimpleLeadershipManager implements LeadershipService {
}
@Override
public Leadership getLeadership(String path) {
checkArgument(path != null);
return elections.get(path) ? new Leadership(path, clusterService.getLocalNode().id(), 0) : null;
}
@Override
public Set<String> ownedTopics(NodeId nodeId) {
checkArgument(nodeId != null);
return elections.entrySet()
.stream()
.filter(Entry::getValue)
.map(Entry::getKey)
.collect(Collectors.toSet());
}
@Override
public void runForLeadership(String path) {
elections.put(path, true);
for (LeadershipEventListener listener : listeners) {
......@@ -88,5 +108,4 @@ public class SimpleLeadershipManager implements LeadershipService {
public void removeListener(LeadershipEventListener listener) {
listeners.remove(listener);
}
}
......