Yuta HIGUCHI
Committed by Ray Milkey

Fix potential race conditions in HazelcastLeadershipService

Change-Id: Iac232652155830c8e054760ea371ffb5639cf464
......@@ -166,8 +166,8 @@ public class HazelcastLeadershipService implements LeadershipService,
checkArgument(path != null);
Topic topic = topics.get(path);
if (topic != null) {
topic.stop();
topics.remove(path, topic);
topic.stop();
}
}
......@@ -213,6 +213,7 @@ public class HazelcastLeadershipService implements LeadershipService,
topic = new Topic(topicName);
Topic oldTopic = topics.putIfAbsent(topicName, topic);
if (oldTopic == null) {
// encountered new topic, start periodic processing
topic.start();
} else {
topic = oldTopic;
......@@ -285,7 +286,11 @@ public class HazelcastLeadershipService implements LeadershipService,
/**
* Starts operation.
*/
private void start() {
private synchronized void start() {
if (!isShutdown) {
// already running
return;
}
isShutdown = false;
String threadPoolName = "onos-leader-election-" + topicName + "-%d";
leaderElectionExecutor = Executors.newScheduledThreadPool(2,
......@@ -303,13 +308,14 @@ public class HazelcastLeadershipService implements LeadershipService,
/**
* Runs for leadership.
*/
private void runForLeadership() {
private synchronized void runForLeadership() {
if (isRunningForLeadership) {
return; // Nothing to do: already running
}
if (isShutdown) {
start();
}
isRunningForLeadership = true;
String lockHzId = "LeadershipService/" + topicName + "/lock";
String termHzId = "LeadershipService/" + topicName + "/term";
leaderLock = storeService.getHazelcastInstance().getLock(lockHzId);
......@@ -326,7 +332,7 @@ public class HazelcastLeadershipService implements LeadershipService,
/**
* Stops leadership election for the topic.
*/
private void stop() {
private synchronized void stop() {
isShutdown = true;
isRunningForLeadership = false;
// getLockFuture.cancel(true);
......@@ -454,22 +460,22 @@ public class HazelcastLeadershipService implements LeadershipService,
continue;
}
synchronized (this) {
//
// This instance is now the leader
//
log.info("Leader Elected for topic {}", topicName);
updateTerm();
leader = localNodeId;
leadershipEvent = new LeadershipEvent(
LeadershipEvent.Type.LEADER_ELECTED,
new Leadership(topicName, localNodeId, myLastLeaderTerm));
leaderTopic.publish(SERIALIZER.encode(leadershipEvent));
}
try {
synchronized (this) {
//
// This instance is now the leader
//
log.info("Leader Elected for topic {}", topicName);
updateTerm();
leader = localNodeId;
leadershipEvent = new LeadershipEvent(
LeadershipEvent.Type.LEADER_ELECTED,
new Leadership(topicName, localNodeId, myLastLeaderTerm));
leaderTopic.publish(SERIALIZER.encode(leadershipEvent));
}
// Sleep forever until interrupted
Thread.sleep(Long.MAX_VALUE);
} catch (InterruptedException e) {
......@@ -479,23 +485,24 @@ public class HazelcastLeadershipService implements LeadershipService,
//
log.debug("Leader Interrupted for topic {}",
topicName);
}
synchronized (this) {
// If we reach here, we should release the leadership
log.debug("Leader Lock Released for topic {}", topicName);
if ((leader != null) &&
leader.equals(localNodeId)) {
leader = null;
} finally {
synchronized (this) {
// If we reach here, we should release the leadership
log.debug("Leader Lock Released for topic {}", topicName);
if ((leader != null) &&
leader.equals(localNodeId)) {
leader = null;
}
leadershipEvent = new LeadershipEvent(
LeadershipEvent.Type.LEADER_BOOTED,
new Leadership(topicName, localNodeId, myLastLeaderTerm));
leaderTopic.publish(SERIALIZER.encode(leadershipEvent));
leaderLock.unlock();
}
leadershipEvent = new LeadershipEvent(
LeadershipEvent.Type.LEADER_BOOTED,
new Leadership(topicName, localNodeId, myLastLeaderTerm));
leaderTopic.publish(SERIALIZER.encode(leadershipEvent));
leaderLock.unlock();
}
}
isRunningForLeadership = false;
}
// Globally guarded by the leadership lock for this term
......