Jonathan Hart
Committed by Brian O'Connor

Add term information in HazelcastLeadershipService

Implementation based on Hazelcast IAtomicLong.

Change-Id: I9dca40228a84fdb3edf02ffd2cc32d7d38c90378
...@@ -29,15 +29,16 @@ import java.util.Map; ...@@ -29,15 +29,16 @@ import java.util.Map;
29 description = "Finds the leader for particular topic.") 29 description = "Finds the leader for particular topic.")
30 public class LeaderCommand extends AbstractShellCommand { 30 public class LeaderCommand extends AbstractShellCommand {
31 31
32 - private static final String FMT = "%-20s: %15s"; 32 + private static final String FMT = "%-20s: %15s %5s";
33 33
34 @Override 34 @Override
35 protected void execute() { 35 protected void execute() {
36 LeadershipService leaderService = get(LeadershipService.class); 36 LeadershipService leaderService = get(LeadershipService.class);
37 Map<String, Leadership> leaderBoard = leaderService.getLeaderBoard(); 37 Map<String, Leadership> leaderBoard = leaderService.getLeaderBoard();
38 - print(FMT, "Topic", "Leader"); 38 + print(FMT, "Topic", "Leader", "Epoch");
39 for (String topic : leaderBoard.keySet()) { 39 for (String topic : leaderBoard.keySet()) {
40 - print(FMT, topic, leaderBoard.get(topic).leader()); 40 + Leadership leadership = leaderBoard.get(topic);
41 + print(FMT, topic, leadership.leader(), leadership.epoch());
41 } 42 }
42 } 43 }
43 44
......
...@@ -15,22 +15,19 @@ ...@@ -15,22 +15,19 @@
15 */ 15 */
16 package org.onosproject.store.cluster.impl; 16 package org.onosproject.store.cluster.impl;
17 17
18 -import static com.google.common.base.Preconditions.checkArgument; 18 +import com.google.common.collect.Maps;
19 -import static org.onlab.util.Tools.namedThreads; 19 +import com.hazelcast.config.TopicConfig;
20 - 20 +import com.hazelcast.core.IAtomicLong;
21 -import java.util.HashMap; 21 +import com.hazelcast.core.ITopic;
22 -import java.util.Map; 22 +import com.hazelcast.core.Message;
23 -import java.util.concurrent.ExecutorService; 23 +import com.hazelcast.core.MessageListener;
24 -import java.util.concurrent.Executors;
25 -import java.util.concurrent.Future;
26 -import java.util.concurrent.locks.Lock;
27 -
28 import org.apache.felix.scr.annotations.Activate; 24 import org.apache.felix.scr.annotations.Activate;
29 import org.apache.felix.scr.annotations.Component; 25 import org.apache.felix.scr.annotations.Component;
30 import org.apache.felix.scr.annotations.Deactivate; 26 import org.apache.felix.scr.annotations.Deactivate;
31 import org.apache.felix.scr.annotations.Reference; 27 import org.apache.felix.scr.annotations.Reference;
32 import org.apache.felix.scr.annotations.ReferenceCardinality; 28 import org.apache.felix.scr.annotations.ReferenceCardinality;
33 import org.apache.felix.scr.annotations.Service; 29 import org.apache.felix.scr.annotations.Service;
30 +import org.onlab.util.KryoNamespace;
34 import org.onosproject.cluster.ClusterService; 31 import org.onosproject.cluster.ClusterService;
35 import org.onosproject.cluster.Leadership; 32 import org.onosproject.cluster.Leadership;
36 import org.onosproject.cluster.LeadershipEvent; 33 import org.onosproject.cluster.LeadershipEvent;
...@@ -42,15 +39,18 @@ import org.onosproject.event.EventDeliveryService; ...@@ -42,15 +39,18 @@ import org.onosproject.event.EventDeliveryService;
42 import org.onosproject.store.hz.StoreService; 39 import org.onosproject.store.hz.StoreService;
43 import org.onosproject.store.serializers.KryoNamespaces; 40 import org.onosproject.store.serializers.KryoNamespaces;
44 import org.onosproject.store.serializers.KryoSerializer; 41 import org.onosproject.store.serializers.KryoSerializer;
45 -import org.onlab.util.KryoNamespace;
46 import org.slf4j.Logger; 42 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory; 43 import org.slf4j.LoggerFactory;
48 44
49 -import com.google.common.collect.Maps; 45 +import java.util.HashMap;
50 -import com.hazelcast.config.TopicConfig; 46 +import java.util.Map;
51 -import com.hazelcast.core.ITopic; 47 +import java.util.concurrent.ExecutorService;
52 -import com.hazelcast.core.Message; 48 +import java.util.concurrent.Executors;
53 -import com.hazelcast.core.MessageListener; 49 +import java.util.concurrent.Future;
50 +import java.util.concurrent.locks.Lock;
51 +
52 +import static com.google.common.base.Preconditions.checkArgument;
53 +import static org.onlab.util.Tools.namedThreads;
54 54
55 /** 55 /**
56 * Distributed implementation of LeadershipService that is based on Hazelcast. 56 * Distributed implementation of LeadershipService that is based on Hazelcast.
...@@ -90,6 +90,9 @@ public class HazelcastLeadershipService implements LeadershipService, ...@@ -90,6 +90,9 @@ public class HazelcastLeadershipService implements LeadershipService,
90 private static final long LEADERSHIP_REMOTE_TIMEOUT_MS = 15 * 1000; // 15s 90 private static final long LEADERSHIP_REMOTE_TIMEOUT_MS = 15 * 1000; // 15s
91 private static final String TOPIC_HZ_ID = "LeadershipService/AllTopics"; 91 private static final String TOPIC_HZ_ID = "LeadershipService/AllTopics";
92 92
93 + // indicates there is no term value yet
94 + private static final long NO_TERM = 0;
95 +
93 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) 96 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
94 protected ClusterService clusterService; 97 protected ClusterService clusterService;
95 98
...@@ -175,7 +178,7 @@ public class HazelcastLeadershipService implements LeadershipService, ...@@ -175,7 +178,7 @@ public class HazelcastLeadershipService implements LeadershipService,
175 for (Topic topic : topics.values()) { 178 for (Topic topic : topics.values()) {
176 Leadership leadership = new Leadership(topic.topicName(), 179 Leadership leadership = new Leadership(topic.topicName(),
177 topic.leader(), 180 topic.leader(),
178 - 0L); 181 + topic.term());
179 result.put(topic.topicName(), leadership); 182 result.put(topic.topicName(), leadership);
180 } 183 }
181 return result; 184 return result;
...@@ -229,6 +232,12 @@ public class HazelcastLeadershipService implements LeadershipService, ...@@ -229,6 +232,12 @@ public class HazelcastLeadershipService implements LeadershipService,
229 private volatile long lastLeadershipUpdateMs = 0; 232 private volatile long lastLeadershipUpdateMs = 0;
230 private ExecutorService leaderElectionExecutor; 233 private ExecutorService leaderElectionExecutor;
231 234
235 + private volatile IAtomicLong term;
236 + // This is local state, recording the term number for the last time
237 + // this instance was leader for this topic. The current term could be
238 + // higher if the mastership has changed any number of times.
239 + private long myLastLeaderTerm = NO_TERM;
240 +
232 private NodeId leader; 241 private NodeId leader;
233 private Lock leaderLock; 242 private Lock leaderLock;
234 private Future<?> getLockFuture; 243 private Future<?> getLockFuture;
...@@ -262,6 +271,18 @@ public class HazelcastLeadershipService implements LeadershipService, ...@@ -262,6 +271,18 @@ public class HazelcastLeadershipService implements LeadershipService,
262 } 271 }
263 272
264 /** 273 /**
274 + * Gets the current term for the topic.
275 + *
276 + * @return the term for the topic
277 + */
278 + private long term() {
279 + if (term == null) {
280 + return NO_TERM;
281 + }
282 + return term.get();
283 + }
284 +
285 + /**
265 * Starts operation. 286 * Starts operation.
266 */ 287 */
267 private void start() { 288 private void start() {
...@@ -290,7 +311,10 @@ public class HazelcastLeadershipService implements LeadershipService, ...@@ -290,7 +311,10 @@ public class HazelcastLeadershipService implements LeadershipService,
290 start(); 311 start();
291 } 312 }
292 String lockHzId = "LeadershipService/" + topicName + "/lock"; 313 String lockHzId = "LeadershipService/" + topicName + "/lock";
314 + String termHzId = "LeadershipService/" + topicName + "/term";
293 leaderLock = storeService.getHazelcastInstance().getLock(lockHzId); 315 leaderLock = storeService.getHazelcastInstance().getLock(lockHzId);
316 + term = storeService.getHazelcastInstance().getAtomicLong(termHzId);
317 +
294 getLockFuture = leaderElectionExecutor.submit(new Runnable() { 318 getLockFuture = leaderElectionExecutor.submit(new Runnable() {
295 @Override 319 @Override
296 public void run() { 320 public void run() {
...@@ -374,7 +398,7 @@ public class HazelcastLeadershipService implements LeadershipService, ...@@ -374,7 +398,7 @@ public class HazelcastLeadershipService implements LeadershipService,
374 // 398 //
375 leadershipEvent = new LeadershipEvent( 399 leadershipEvent = new LeadershipEvent(
376 LeadershipEvent.Type.LEADER_REELECTED, 400 LeadershipEvent.Type.LEADER_REELECTED,
377 - new Leadership(topicName, localNodeId, 0)); 401 + new Leadership(topicName, localNodeId, myLastLeaderTerm));
378 // Dispatch to all instances 402 // Dispatch to all instances
379 leaderTopic.publish(SERIALIZER.encode(leadershipEvent)); 403 leaderTopic.publish(SERIALIZER.encode(leadershipEvent));
380 } else { 404 } else {
...@@ -386,7 +410,7 @@ public class HazelcastLeadershipService implements LeadershipService, ...@@ -386,7 +410,7 @@ public class HazelcastLeadershipService implements LeadershipService,
386 if (delta > LEADERSHIP_REMOTE_TIMEOUT_MS) { 410 if (delta > LEADERSHIP_REMOTE_TIMEOUT_MS) {
387 leadershipEvent = new LeadershipEvent( 411 leadershipEvent = new LeadershipEvent(
388 LeadershipEvent.Type.LEADER_BOOTED, 412 LeadershipEvent.Type.LEADER_BOOTED,
389 - new Leadership(topicName, leader, 0)); 413 + new Leadership(topicName, leader, myLastLeaderTerm));
390 // Dispatch only to the local listener(s) 414 // Dispatch only to the local listener(s)
391 eventDispatcher.post(leadershipEvent); 415 eventDispatcher.post(leadershipEvent);
392 leader = null; 416 leader = null;
...@@ -435,10 +459,13 @@ public class HazelcastLeadershipService implements LeadershipService, ...@@ -435,10 +459,13 @@ public class HazelcastLeadershipService implements LeadershipService,
435 // This instance is now the leader 459 // This instance is now the leader
436 // 460 //
437 log.info("Leader Elected for topic {}", topicName); 461 log.info("Leader Elected for topic {}", topicName);
462 +
463 + updateTerm();
464 +
438 leader = localNodeId; 465 leader = localNodeId;
439 leadershipEvent = new LeadershipEvent( 466 leadershipEvent = new LeadershipEvent(
440 LeadershipEvent.Type.LEADER_ELECTED, 467 LeadershipEvent.Type.LEADER_ELECTED,
441 - new Leadership(topicName, localNodeId, 0)); 468 + new Leadership(topicName, localNodeId, myLastLeaderTerm));
442 leaderTopic.publish(SERIALIZER.encode(leadershipEvent)); 469 leaderTopic.publish(SERIALIZER.encode(leadershipEvent));
443 } 470 }
444 471
...@@ -463,11 +490,22 @@ public class HazelcastLeadershipService implements LeadershipService, ...@@ -463,11 +490,22 @@ public class HazelcastLeadershipService implements LeadershipService,
463 } 490 }
464 leadershipEvent = new LeadershipEvent( 491 leadershipEvent = new LeadershipEvent(
465 LeadershipEvent.Type.LEADER_BOOTED, 492 LeadershipEvent.Type.LEADER_BOOTED,
466 - new Leadership(topicName, localNodeId, 0)); 493 + new Leadership(topicName, localNodeId, myLastLeaderTerm));
467 leaderTopic.publish(SERIALIZER.encode(leadershipEvent)); 494 leaderTopic.publish(SERIALIZER.encode(leadershipEvent));
468 leaderLock.unlock(); 495 leaderLock.unlock();
469 } 496 }
470 } 497 }
498 +
499 + }
500 +
501 + // Globally guarded by the leadership lock for this topic
502 + // Locally guarded by synchronized (this)
503 + private void updateTerm() {
504 + long oldTerm = term.get();
505 + long newTerm = term.incrementAndGet();
506 + myLastLeaderTerm = newTerm;
507 + log.debug("Topic {} updated term from {} to {}", topicName,
508 + oldTerm, newTerm);
471 } 509 }
472 } 510 }
473 } 511 }
......