Madan Jampani

Added a distributed leadership manager implementation on top of consistent map.
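
For context, a minimal sketch of how a client component might drive this manager through the LeadershipService interface it implements (runForLeadership, withdraw, addListener). The consumer class, topic name, and constructor wiring below are illustrative assumptions, not part of this change (in ONOS the service would normally be injected with @Reference), and it assumes LeadershipEventListener is a single-method listener interface usable as a lambda.

import org.onosproject.cluster.LeadershipEvent;
import org.onosproject.cluster.LeadershipEventListener;
import org.onosproject.cluster.LeadershipService;
import org.onosproject.cluster.NodeId;

// Hypothetical consumer of the leadership service; illustrative only.
public class ExampleLeadershipClient {

    // Hypothetical election topic.
    private static final String TOPIC = "example/topic";

    private final LeadershipService leadershipService;
    private final NodeId localNodeId;
    private LeadershipEventListener listener;

    public ExampleLeadershipClient(LeadershipService leadershipService, NodeId localNodeId) {
        this.leadershipService = leadershipService;
        this.localNodeId = localNodeId;
    }

    public void start() {
        // The manager broadcasts LEADER_ELECTED once a node wins the topic lock.
        listener = event -> {
            if (event.type() == LeadershipEvent.Type.LEADER_ELECTED
                    && TOPIC.equals(event.subject().topic())
                    && localNodeId.equals(event.subject().leader())) {
                // This node now holds leadership for TOPIC; start leader-only work here.
            }
        };
        leadershipService.addListener(listener);
        // Enter the election; the manager retries internally until the lock is acquired.
        leadershipService.runForLeadership(TOPIC);
    }

    public void stop() {
        // Leave the election and release the topic lock if this node holds it.
        leadershipService.withdraw(TOPIC);
        leadershipService.removeListener(listener);
    }
}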

Change-Id: I3f3c6114df72e3ab033ba39c8608ac4ae11e5272
...@@ -53,7 +53,7 @@ public class ConsistentMapImpl<K, V> implements ConsistentMap<K, V> {
53 53      private final DatabaseProxy<String, byte[]> proxy;
54 54      private final Serializer serializer;
55 55
56    -    private static final int OPERATION_TIMEOUT_MILLIS = 1000;
   56 +    private static final int OPERATION_TIMEOUT_MILLIS = 5000;
57 57      private static final String ERROR_NULL_KEY = "Key cannot be null";
58 58      private static final String ERROR_NULL_VALUE = "Null values are not allowed";
59 59
......
...@@ -17,11 +17,13 @@
17 17  package org.onosproject.store.consistent.impl;
18 18
19 19  import com.google.common.collect.Sets;
   20 +
20 21  import net.kuujo.copycat.cluster.ClusterConfig;
21 22  import net.kuujo.copycat.cluster.Member;
22 23  import net.kuujo.copycat.log.FileLog;
23 24  import net.kuujo.copycat.netty.NettyTcpProtocol;
24 25  import net.kuujo.copycat.protocol.Consistency;
   26 +
25 27  import org.apache.felix.scr.annotations.Activate;
26 28  import org.apache.felix.scr.annotations.Component;
27 29  import org.apache.felix.scr.annotations.Deactivate;
...@@ -44,6 +46,8 @@ import java.io.IOException;
44 46  import java.util.List;
45 47  import java.util.Map;
46 48  import java.util.Set;
   49 +import java.util.concurrent.CountDownLatch;
   50 +import java.util.concurrent.TimeUnit;
47 51  import java.util.stream.Collectors;
48 52
49 53  import static org.slf4j.LoggerFactory.getLogger;
...@@ -60,6 +64,7 @@ public class DatabaseManager implements StorageService, StorageAdminService {
60 64      public static final int COPYCAT_TCP_PORT = 7238; // 7238 = RAFT
61 65      private static final String CONFIG_DIR = "../config";
62 66      private static final String PARTITION_DEFINITION_FILE = "tablets.json";
   67 +    private static final int DATABASE_STARTUP_TIMEOUT_SEC = 60;
63 68
64 69      @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
65 70      protected ClusterService clusterService;
...@@ -131,13 +136,23 @@ public class DatabaseManager implements StorageService, StorageAdminService {
131 136
132 137         partitionedDatabase = PartitionedDatabaseManager.create("onos-store", clusterConfig, databaseConfig);
133 138
    139 +       CountDownLatch latch = new CountDownLatch(1);
134 140         partitionedDatabase.open().whenComplete((db, error) -> {
135 141             if (error != null) {
136 142                 log.warn("Failed to open database.", error);
137 143             } else {
    144 +               latch.countDown();
138 145                 log.info("Successfully opened database.");
139 146             }
140 147         });
148 + try {
149 + if (!latch.await(DATABASE_STARTUP_TIMEOUT_SEC, TimeUnit.SECONDS)) {
150 + log.warn("Timed out waiting for database to initialize.");
151 + }
152 + } catch (InterruptedException e) {
153 + Thread.currentThread().interrupt();
154 + log.warn("Failed to complete database initialization.");
155 + }
141 156         log.info("Started");
142 157     }
143 158
......
1 +package org.onosproject.store.consistent.impl;
2 +
3 +import static org.onlab.util.Tools.groupedThreads;
4 +import static org.slf4j.LoggerFactory.getLogger;
5 +
6 +import java.util.Map;
7 +import java.util.Map.Entry;
8 +import java.util.Set;
9 +import java.util.concurrent.ExecutorService;
10 +import java.util.concurrent.Executors;
11 +import java.util.concurrent.ScheduledExecutorService;
12 +import java.util.concurrent.TimeUnit;
13 +
14 +import org.apache.felix.scr.annotations.Activate;
15 +import org.apache.felix.scr.annotations.Component;
16 +import org.apache.felix.scr.annotations.Deactivate;
17 +import org.apache.felix.scr.annotations.Reference;
18 +import org.apache.felix.scr.annotations.ReferenceCardinality;
19 +import org.apache.felix.scr.annotations.Service;
20 +import org.onlab.util.KryoNamespace;
21 +import org.onosproject.cluster.ClusterService;
22 +import org.onosproject.cluster.ControllerNode;
23 +import org.onosproject.cluster.Leadership;
24 +import org.onosproject.cluster.LeadershipEvent;
25 +import org.onosproject.cluster.LeadershipEventListener;
26 +import org.onosproject.cluster.LeadershipService;
27 +import org.onosproject.cluster.NodeId;
28 +import org.onosproject.event.AbstractListenerRegistry;
29 +import org.onosproject.event.EventDeliveryService;
30 +import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
31 +import org.onosproject.store.cluster.messaging.ClusterMessage;
32 +import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
33 +import org.onosproject.store.cluster.messaging.MessageSubject;
34 +import org.onosproject.store.serializers.KryoNamespaces;
35 +import org.onosproject.store.serializers.KryoSerializer;
36 +import org.onosproject.store.service.ConsistentMap;
37 +import org.onosproject.store.service.Serializer;
38 +import org.onosproject.store.service.StorageService;
39 +import org.onosproject.store.service.Versioned;
40 +import org.slf4j.Logger;
41 +
42 +import com.google.common.collect.ImmutableMap;
43 +import com.google.common.collect.Maps;
44 +import com.google.common.collect.Sets;
45 +
46 +/**
47 + * Distributed Leadership Manager implemented on top of ConsistentMap.
48 + * <p>
49 + * This implementation makes use of cluster manager's failure
50 + * detection capabilities to detect and purge stale locks.
51 + * TODO: Ensure lock safety and liveness.
52 + */
53 +@Component(immediate = true, enabled = false)
54 +@Service
55 +public class DistributedLeadershipManager implements LeadershipService {
56 +
57 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
58 + protected StorageService storageService;
59 +
60 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
61 + protected ClusterService clusterService;
62 +
63 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
64 + protected ClusterCommunicationService clusterCommunicator;
65 +
66 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
67 + protected EventDeliveryService eventDispatcher;
68 +
69 + private static final MessageSubject LEADERSHIP_EVENT_MESSAGE_SUBJECT =
70 + new MessageSubject("distributed-leadership-manager-events");
71 +
72 + private final Logger log = getLogger(getClass());
73 + private ExecutorService messageHandlingExecutor;
74 + private ScheduledExecutorService retryLeaderLockExecutor;
75 + private ScheduledExecutorService deadLockDetectionExecutor;
76 + private ScheduledExecutorService leadershipStatusBroadcaster;
77 +
78 + private ConsistentMap<String, NodeId> lockMap;
79 + private AbstractListenerRegistry<LeadershipEvent, LeadershipEventListener>
80 + listenerRegistry;
81 + private final Map<String, Leadership> leaderBoard = Maps.newConcurrentMap();
82 + private NodeId localNodeId;
83 +
84 + private Set<String> activeTopics = Sets.newConcurrentHashSet();
85 +
86 + private static final int DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC = 2;
87 + private static final int DEADLOCK_DETECTION_INTERVAL_SEC = 2;
88 + private static final int LEADERSHIP_STATUS_UPDATE_INTERVAL_SEC = 2;
89 +
90 + private static final KryoSerializer SERIALIZER = new KryoSerializer() {
91 + @Override
92 + protected void setupKryoPool() {
93 + serializerPool = KryoNamespace.newBuilder()
94 + .register(KryoNamespaces.API)
95 + .build()
96 + .populate(1);
97 + }
98 + };
99 +
100 + @Activate
101 + public void activate() {
102 + lockMap = storageService.createConsistentMap("onos-leader-locks", new Serializer() {
103 + KryoNamespace kryo = new KryoNamespace.Builder()
104 + .register(KryoNamespaces.API).build();
105 +
106 + @Override
107 + public <T> byte[] encode(T object) {
108 + return kryo.serialize(object);
109 + }
110 +
111 + @Override
112 + public <T> T decode(byte[] bytes) {
113 + return kryo.deserialize(bytes);
114 + }
115 + });
116 +
117 + localNodeId = clusterService.getLocalNode().id();
118 +
119 + messageHandlingExecutor = Executors.newSingleThreadExecutor(
120 + groupedThreads("onos/store/leadership", "message-handler"));
121 + retryLeaderLockExecutor = Executors.newScheduledThreadPool(
122 + 4, groupedThreads("onos/store/leadership", "election-thread-%d"));
123 + deadLockDetectionExecutor = Executors.newSingleThreadScheduledExecutor(
124 + groupedThreads("onos/store/leadership", "dead-lock-detector"));
125 + leadershipStatusBroadcaster = Executors.newSingleThreadScheduledExecutor(
126 + groupedThreads("onos/store/leadership", "peer-updater"));
127 + clusterCommunicator.addSubscriber(
128 + LEADERSHIP_EVENT_MESSAGE_SUBJECT,
129 + new InternalLeadershipEventListener(),
130 + messageHandlingExecutor);
131 +
132 + deadLockDetectionExecutor.scheduleWithFixedDelay(
133 + this::purgeStaleLocks, 0, DEADLOCK_DETECTION_INTERVAL_SEC, TimeUnit.SECONDS);
134 + leadershipStatusBroadcaster.scheduleWithFixedDelay(
135 + this::sendLeadershipStatus, 0, LEADERSHIP_STATUS_UPDATE_INTERVAL_SEC, TimeUnit.SECONDS);
136 +
137 + listenerRegistry = new AbstractListenerRegistry<>();
138 + eventDispatcher.addSink(LeadershipEvent.class, listenerRegistry);
139 +
140 + log.info("Started.");
141 + }
142 +
143 + @Deactivate
144 + public void deactivate() {
145 + leaderBoard.forEach((topic, leadership) -> {
146 + if (localNodeId.equals(leadership.leader())) {
147 + withdraw(topic);
148 + }
149 + });
150 +
151 + eventDispatcher.removeSink(LeadershipEvent.class);
152 + clusterCommunicator.removeSubscriber(LEADERSHIP_EVENT_MESSAGE_SUBJECT);
153 +
154 + messageHandlingExecutor.shutdown();
155 + retryLeaderLockExecutor.shutdown();
156 + deadLockDetectionExecutor.shutdown();
157 + leadershipStatusBroadcaster.shutdown();
158 +
159 + log.info("Stopped.");
160 + }
161 +
162 + @Override
163 + public Map<String, Leadership> getLeaderBoard() {
164 + return ImmutableMap.copyOf(leaderBoard);
165 + }
166 +
167 + @Override
168 + public NodeId getLeader(String path) {
169 + Leadership leadership = leaderBoard.get(path);
170 + return leadership != null ? leadership.leader() : null;
171 + }
172 +
173 + @Override
174 + public void runForLeadership(String path) {
175 + log.info("Running for leadership for topic: {}", path);
176 + activeTopics.add(path);
177 + tryLeaderLock(path);
178 + }
179 +
180 + @Override
181 + public void withdraw(String path) {
182 + activeTopics.remove(path);
183 + try {
184 + if (lockMap.remove(path, localNodeId)) {
185 + log.info("Successfully gave up leadership for {}", path);
186 + }
187 + // else we are not the current owner.
188 + } catch (Exception e) {
189 + log.warn("Failed to verify (and clear) any lock this node might be holding for {}", path, e);
190 + }
191 + }
192 +
193 + @Override
194 + public void addListener(LeadershipEventListener listener) {
195 + listenerRegistry.addListener(listener);
196 + }
197 +
198 + @Override
199 + public void removeListener(LeadershipEventListener listener) {
200 + listenerRegistry.removeListener(listener);
201 + }
202 +
203 + private void tryLeaderLock(String path) {
204 + if (!activeTopics.contains(path)) {
205 + return;
206 + }
207 + try {
208 + Versioned<NodeId> currentLeader = lockMap.get(path);
209 + if (currentLeader != null) {
210 + if (localNodeId.equals(currentLeader.value())) {
211 + log.info("Already holding leadership for {}", path);
212 + notifyNewLeader(path, localNodeId, currentLeader.version());
213 + } else {
214 + // someone else has leadership; will retry after some time.
215 + retry(path);
216 + }
217 + } else {
218 + if (lockMap.putIfAbsent(path, localNodeId) == null) {
219 + log.info("Assumed leadership for {}", path);
220 + // do a get again to get the version (epoch)
221 + Versioned<NodeId> newLeader = lockMap.get(path);
222 + notifyNewLeader(path, localNodeId, newLeader.version());
223 + } else {
224 + // someone beat us to it.
225 + retry(path);
226 + }
227 + }
228 + } catch (Exception e) {
229 + log.warn("Attempt to acquire leadership lock for topic {} failed", path, e);
230 + retry(path);
231 + }
232 + }
233 +
234 + private void notifyNewLeader(String path, NodeId leader, long epoch) {
235 + Leadership newLeadership = new Leadership(path, leader, epoch);
236 + boolean updatedLeader = false;
237 + synchronized (leaderBoard) {
238 + Leadership currentLeader = leaderBoard.get(path);
239 + if (currentLeader == null || currentLeader.epoch() < epoch) {
240 + leaderBoard.put(path, newLeadership);
241 + updatedLeader = true;
242 + }
243 + }
244 +
245 + if (updatedLeader) {
246 + LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, newLeadership);
247 + eventDispatcher.post(event);
248 + clusterCommunicator.broadcast(
249 + new ClusterMessage(
250 + clusterService.getLocalNode().id(),
251 + LEADERSHIP_EVENT_MESSAGE_SUBJECT,
252 + SERIALIZER.encode(event)));
253 + }
254 + }
255 +
256 + private void notifyRemovedLeader(String path, NodeId leader, long epoch) {
257 + Leadership oldLeadership = new Leadership(path, leader, epoch);
258 + boolean updatedLeader = false;
259 + synchronized (leaderBoard) {
260 + Leadership currentLeader = leaderBoard.get(path);
261 + if (currentLeader != null && currentLeader.epoch() == oldLeadership.epoch()) {
262 + leaderBoard.remove(path);
263 + updatedLeader = true;
264 + }
265 + }
266 +
267 + if (updatedLeader) {
268 + LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.LEADER_BOOTED, oldLeadership);
269 + eventDispatcher.post(event);
270 + clusterCommunicator.broadcast(
271 + new ClusterMessage(
272 + clusterService.getLocalNode().id(),
273 + LEADERSHIP_EVENT_MESSAGE_SUBJECT,
274 + SERIALIZER.encode(event)));
275 + }
276 + }
277 +
278 + private class InternalLeadershipEventListener implements ClusterMessageHandler {
279 +
280 + @Override
281 + public void handle(ClusterMessage message) {
282 + LeadershipEvent leadershipEvent =
283 + SERIALIZER.decode(message.payload());
284 +
285 + log.trace("Leadership Event: time = {} type = {} event = {}",
286 + leadershipEvent.time(), leadershipEvent.type(),
287 + leadershipEvent);
288 +
289 + Leadership leadershipUpdate = leadershipEvent.subject();
290 + LeadershipEvent.Type eventType = leadershipEvent.type();
291 + String topic = leadershipUpdate.topic();
292 +
293 + boolean updateAccepted = false;
294 +
295 + synchronized (leaderBoard) {
296 + Leadership currentLeadership = leaderBoard.get(topic);
297 + if (eventType.equals(LeadershipEvent.Type.LEADER_ELECTED)) {
298 + if (currentLeadership == null || currentLeadership.epoch() < leadershipUpdate.epoch()) {
299 + leaderBoard.put(topic, leadershipUpdate);
300 + updateAccepted = true;
301 + }
302 + } else if (eventType.equals(LeadershipEvent.Type.LEADER_BOOTED)) {
303 + if (currentLeadership != null && currentLeadership.epoch() == leadershipUpdate.epoch()) {
304 + leaderBoard.remove(topic);
305 + updateAccepted = true;
306 + }
307 + } else {
308 + throw new IllegalStateException("Unknown event type.");
309 + }
310 + if (updateAccepted) {
311 + eventDispatcher.post(leadershipEvent);
312 + }
313 + }
314 + }
315 + }
316 +
317 + private void retry(String path) {
318 + retryLeaderLockExecutor.schedule(
319 + () -> tryLeaderLock(path),
320 + DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC,
321 + TimeUnit.SECONDS);
322 + }
323 +
324 + private void purgeStaleLocks() {
325 + try {
326 + Set<Entry<String, Versioned<NodeId>>> entries = lockMap.entrySet();
327 + entries.forEach(entry -> {
328 + String path = entry.getKey();
329 + NodeId nodeId = entry.getValue().value();
330 + long epoch = entry.getValue().version();
331 + if (clusterService.getState(nodeId) == ControllerNode.State.INACTIVE) {
332 + log.info("Lock for {} is held by {} which is currently inactive", path, nodeId);
333 + try {
334 + if (lockMap.remove(path, epoch)) {
335 + log.info("Successfully purged stale lock held by {} for {}", nodeId, path);
336 + notifyRemovedLeader(path, nodeId, epoch);
337 + }
338 + } catch (Exception e) {
339 + log.warn("Failed to purge stale lock held by {} for {}", nodeId, path, e);
340 + }
341 + }
342 + if (localNodeId.equals(nodeId) && !activeTopics.contains(path)) {
343 + log.info("Lock for {} is held by {}, which is not running for leadership.", path, nodeId);
344 + try {
345 + if (lockMap.remove(path, epoch)) {
346 + log.info("Successfully purged stale lock held by {} for {}", nodeId, path);
347 + notifyRemovedLeader(path, nodeId, epoch);
348 + }
349 + } catch (Exception e) {
350 + log.warn("Failed to purge stale lock held by {} for {}", nodeId, path, e);
351 + }
352 + }
353 + });
354 + } catch (Exception e) {
355 + log.warn("Failed cleaning up stale locks", e);
356 + }
357 + }
358 +
359 + private void sendLeadershipStatus() {
360 + leaderBoard.forEach((path, leadership) -> {
361 + if (leadership.leader().equals(localNodeId)) {
362 + LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, leadership);
363 + clusterCommunicator.broadcast(
364 + new ClusterMessage(
365 + clusterService.getLocalNode().id(),
366 + LEADERSHIP_EVENT_MESSAGE_SUBJECT,
367 + SERIALIZER.encode(event)));
368 + }
369 + });
370 + }
371 +}
...\ No newline at end of file