Madan Jampani
Committed by Pavlin Radoslavov

Make leadership election more robust to failures.

    - Catch exceptions thrown by lock extension calls.
    - Dealing with potential race conditions between joining and withdrawing from a race.

Change-Id: I429045b33f5972c459d5ed031fe8593438813e8d
1 package org.onlab.onos.store.cluster.impl; 1 package org.onlab.onos.store.cluster.impl;
2 2
3 import static com.google.common.base.Preconditions.checkArgument; 3 import static com.google.common.base.Preconditions.checkArgument;
4 -import static com.google.common.base.Verify.verifyNotNull;
5 import static org.onlab.util.Tools.namedThreads; 4 import static org.onlab.util.Tools.namedThreads;
6 import static org.slf4j.LoggerFactory.getLogger; 5 import static org.slf4j.LoggerFactory.getLogger;
7 6
...@@ -75,7 +74,7 @@ public class LeadershipManager implements LeadershipService { ...@@ -75,7 +74,7 @@ public class LeadershipManager implements LeadershipService {
75 74
76 private final Map<String, Leadership> leaderBoard = Maps.newHashMap(); 75 private final Map<String, Leadership> leaderBoard = Maps.newHashMap();
77 76
78 - private final Map<String, Lock> openContests = Maps.newHashMap(); 77 + private final Map<String, Lock> openContests = Maps.newConcurrentMap();
79 private final Set<LeadershipEventListener> listeners = Sets.newIdentityHashSet(); 78 private final Set<LeadershipEventListener> listeners = Sets.newIdentityHashSet();
80 private ControllerNode localNode; 79 private ControllerNode localNode;
81 80
...@@ -133,19 +132,21 @@ public class LeadershipManager implements LeadershipService { ...@@ -133,19 +132,21 @@ public class LeadershipManager implements LeadershipService {
133 @Override 132 @Override
134 public void runForLeadership(String path) { 133 public void runForLeadership(String path) {
135 checkArgument(path != null); 134 checkArgument(path != null);
135 +
136 if (openContests.containsKey(path)) { 136 if (openContests.containsKey(path)) {
137 log.info("Already in the leadership contest for {}", path); 137 log.info("Already in the leadership contest for {}", path);
138 return; 138 return;
139 } else { 139 } else {
140 Lock lock = lockService.create(path); 140 Lock lock = lockService.create(path);
141 openContests.put(path, lock); 141 openContests.put(path, lock);
142 - tryAcquireLeadership(path); 142 + threadPool.schedule(new TryLeadership(lock), 0, TimeUnit.MILLISECONDS);
143 } 143 }
144 } 144 }
145 145
146 @Override 146 @Override
147 public void withdraw(String path) { 147 public void withdraw(String path) {
148 checkArgument(path != null); 148 checkArgument(path != null);
149 +
149 Lock lock = openContests.remove(path); 150 Lock lock = openContests.remove(path);
150 151
151 if (lock != null && lock.isLocked()) { 152 if (lock != null && lock.isLocked()) {
...@@ -171,13 +172,20 @@ public class LeadershipManager implements LeadershipService { ...@@ -171,13 +172,20 @@ public class LeadershipManager implements LeadershipService {
171 172
172 private void notifyListeners(LeadershipEvent event) { 173 private void notifyListeners(LeadershipEvent event) {
173 for (LeadershipEventListener listener : listeners) { 174 for (LeadershipEventListener listener : listeners) {
174 - listener.event(event); 175 + try {
176 + listener.event(event);
177 + } catch (Exception e) {
178 + log.error("Notifying listener failed with exception.", e);
179 + }
175 } 180 }
176 } 181 }
177 182
178 private void tryAcquireLeadership(String path) { 183 private void tryAcquireLeadership(String path) {
179 Lock lock = openContests.get(path); 184 Lock lock = openContests.get(path);
180 - verifyNotNull(lock, "Lock should not be null"); 185 + if (lock == null) {
186 + // withdrew from race.
187 + return;
188 + }
181 lock.lockAsync(TERM_DURATION_MS).whenComplete((response, error) -> { 189 lock.lockAsync(TERM_DURATION_MS).whenComplete((response, error) -> {
182 if (error == null) { 190 if (error == null) {
183 threadPool.schedule( 191 threadPool.schedule(
...@@ -190,13 +198,8 @@ public class LeadershipManager implements LeadershipService { ...@@ -190,13 +198,8 @@ public class LeadershipManager implements LeadershipService {
190 new Leadership(lock.path(), localNode, lock.epoch()))); 198 new Leadership(lock.path(), localNode, lock.epoch())));
191 return; 199 return;
192 } else { 200 } else {
193 - log.warn("Failed to acquire lock for {}. Will retry in {} sec", path, WAIT_BEFORE_RETRY_MS, error); 201 + log.warn("Failed to acquire lock for {}. Will retry in {} ms", path, WAIT_BEFORE_RETRY_MS, error);
194 - try { 202 + threadPool.schedule(new TryLeadership(lock), WAIT_BEFORE_RETRY_MS, TimeUnit.MILLISECONDS);
195 - Thread.sleep(WAIT_BEFORE_RETRY_MS);
196 - tryAcquireLeadership(path);
197 - } catch (InterruptedException e) {
198 - Thread.currentThread().interrupt();
199 - }
200 } 203 }
201 }); 204 });
202 } 205 }
...@@ -211,24 +214,52 @@ public class LeadershipManager implements LeadershipService { ...@@ -211,24 +214,52 @@ public class LeadershipManager implements LeadershipService {
211 214
212 @Override 215 @Override
213 public void run() { 216 public void run() {
214 - if (lock.extendExpiration(TERM_DURATION_MS)) { 217 + if (!openContests.containsKey(lock.path())) {
218 + log.debug("Node withdrew from leadership race for {}. Cancelling reelection task.", lock.path());
219 + return;
220 + }
221 +
222 + boolean lockExtended = false;
223 + try {
224 + lockExtended = lock.extendExpiration(TERM_DURATION_MS);
225 + } catch (Exception e) {
226 + log.warn("Attempt to extend lock failed with an exception.", e);
227 + }
228 +
229 + if (lockExtended) {
215 notifyListeners( 230 notifyListeners(
216 new LeadershipEvent( 231 new LeadershipEvent(
217 LeadershipEvent.Type.LEADER_REELECTED, 232 LeadershipEvent.Type.LEADER_REELECTED,
218 new Leadership(lock.path(), localNode, lock.epoch()))); 233 new Leadership(lock.path(), localNode, lock.epoch())));
219 threadPool.schedule(this, TERM_DURATION_MS / 2, TimeUnit.MILLISECONDS); 234 threadPool.schedule(this, TERM_DURATION_MS / 2, TimeUnit.MILLISECONDS);
220 } else { 235 } else {
236 + // Check if this node already withdrew from the contest, in which case
237 + // we don't need to notify here.
221 if (openContests.containsKey(lock.path())) { 238 if (openContests.containsKey(lock.path())) {
222 notifyListeners( 239 notifyListeners(
223 new LeadershipEvent( 240 new LeadershipEvent(
224 LeadershipEvent.Type.LEADER_BOOTED, 241 LeadershipEvent.Type.LEADER_BOOTED,
225 new Leadership(lock.path(), localNode, lock.epoch()))); 242 new Leadership(lock.path(), localNode, lock.epoch())));
226 - tryAcquireLeadership(lock.path()); 243 + // Retry leadership after a brief wait.
244 + threadPool.schedule(new TryLeadership(lock), WAIT_BEFORE_RETRY_MS, TimeUnit.MILLISECONDS);
227 } 245 }
228 } 246 }
229 } 247 }
230 } 248 }
231 249
250 + private class TryLeadership implements Runnable {
251 + private final Lock lock;
252 +
253 + public TryLeadership(Lock lock) {
254 + this.lock = lock;
255 + }
256 +
257 + @Override
258 + public void run() {
259 + tryAcquireLeadership(lock.path());
260 + }
261 + }
262 +
232 private class PeerAdvertiser implements LeadershipEventListener { 263 private class PeerAdvertiser implements LeadershipEventListener {
233 @Override 264 @Override
234 public void event(LeadershipEvent event) { 265 public void event(LeadershipEvent event) {
......