Madan Jampani
Committed by Gerrit Code Review

WIP: device mastership store based on leadership service.

Change-Id: I6347718f46b6600f93974825816fb537e39abb44
1 +/*
2 + * Copyright 2015 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +package org.onosproject.store.mastership.impl;
17 +
18 +import static org.onlab.util.Tools.groupedThreads;
19 +import static org.onosproject.mastership.MastershipEvent.Type.BACKUPS_CHANGED;
20 +import static org.onosproject.mastership.MastershipEvent.Type.MASTER_CHANGED;
21 +import static org.slf4j.LoggerFactory.getLogger;
22 +import static com.google.common.base.Preconditions.checkArgument;
23 +
24 +import java.io.IOException;
25 +import java.util.List;
26 +import java.util.Map;
27 +import java.util.Set;
28 +import java.util.concurrent.ExecutionException;
29 +import java.util.concurrent.ExecutorService;
30 +import java.util.concurrent.Executors;
31 +import java.util.concurrent.Future;
32 +import java.util.concurrent.TimeUnit;
33 +import java.util.concurrent.TimeoutException;
34 +import java.util.regex.Matcher;
35 +import java.util.regex.Pattern;
36 +import java.util.stream.Collectors;
37 +
38 +import org.apache.felix.scr.annotations.Activate;
39 +import org.apache.felix.scr.annotations.Component;
40 +import org.apache.felix.scr.annotations.Deactivate;
41 +import org.apache.felix.scr.annotations.Reference;
42 +import org.apache.felix.scr.annotations.ReferenceCardinality;
43 +import org.apache.felix.scr.annotations.Service;
44 +import org.onlab.util.KryoNamespace;
45 +import org.onosproject.cluster.ClusterService;
46 +import org.onosproject.cluster.Leadership;
47 +import org.onosproject.cluster.LeadershipEvent;
48 +import org.onosproject.cluster.LeadershipEventListener;
49 +import org.onosproject.cluster.LeadershipService;
50 +import org.onosproject.cluster.NodeId;
51 +import org.onosproject.cluster.RoleInfo;
52 +import org.onosproject.mastership.MastershipEvent;
53 +import org.onosproject.mastership.MastershipStore;
54 +import org.onosproject.mastership.MastershipStoreDelegate;
55 +import org.onosproject.mastership.MastershipTerm;
56 +import org.onosproject.net.DeviceId;
57 +import org.onosproject.net.MastershipRole;
58 +import org.onosproject.store.AbstractStore;
59 +import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
60 +import org.onosproject.store.cluster.messaging.ClusterMessage;
61 +import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
62 +import org.onosproject.store.cluster.messaging.MessageSubject;
63 +import org.onosproject.store.serializers.KryoNamespaces;
64 +import org.onosproject.store.serializers.KryoSerializer;
65 +import org.onosproject.store.serializers.StoreSerializer;
66 +import org.slf4j.Logger;
67 +
68 +import com.google.common.base.Objects;
69 +import com.google.common.collect.Lists;
70 +import com.google.common.collect.Maps;
71 +import com.google.common.collect.Sets;
72 +
73 +/**
74 + * Implementation of the MastershipStore on top of Leadership Service.
75 + */
76 +@Component(immediate = true, enabled = false)
77 +@Service
78 +public class ConsistentDeviceMastershipStore
79 + extends AbstractStore<MastershipEvent, MastershipStoreDelegate>
80 + implements MastershipStore {
81 +
82 + private final Logger log = getLogger(getClass());
83 +
84 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
85 + protected LeadershipService leadershipService;
86 +
87 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
88 + protected ClusterService clusterService;
89 +
90 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
91 + protected ClusterCommunicationService clusterCommunicator;
92 +
93 + private NodeId localNodeId;
94 + private final Set<DeviceId> connectedDevices = Sets.newHashSet();
95 +
96 + private static final MessageSubject ROLE_QUERY_SUBJECT =
97 + new MessageSubject("mastership-store-device-role-query");
98 + private static final MessageSubject ROLE_RELINQUISH_SUBJECT =
99 + new MessageSubject("mastership-store-device-role-relinquish");
100 +
101 + private static final Pattern DEVICE_MASTERSHIP_TOPIC_PATTERN =
102 + Pattern.compile("/devices/(.*)/mastership");
103 +
104 + private static final long PEER_REQUEST_TIMEOUT_MS = 5000;
105 + private ExecutorService messageHandlingExecutor;
106 + private final LeadershipEventListener leadershipEventListener =
107 + new InternalDeviceMastershipEventListener();
108 +
109 + private static final String NODE_ID_NULL = "Node ID cannot be null";
110 + private static final String DEVICE_ID_NULL = "Device ID cannot be null";;
111 +
112 + public static final StoreSerializer SERIALIZER = new KryoSerializer() {
113 + @Override
114 + protected void setupKryoPool() {
115 + serializerPool = KryoNamespace.newBuilder()
116 + .register(KryoNamespaces.API)
117 + .register(MastershipRole.class)
118 + .register(MastershipEvent.class)
119 + .build();
120 + }
121 + };
122 +
123 + @Activate
124 + public void activate() {
125 + messageHandlingExecutor =
126 + Executors.newSingleThreadExecutor(groupedThreads("onos/store/device/mastership", "message-handler"));
127 + clusterCommunicator.addSubscriber(ROLE_QUERY_SUBJECT,
128 + new RoleQueryHandler(),
129 + messageHandlingExecutor);
130 + clusterCommunicator.addSubscriber(ROLE_RELINQUISH_SUBJECT,
131 + new RoleRelinquishHandler(),
132 + messageHandlingExecutor);
133 + localNodeId = clusterService.getLocalNode().id();
134 + leadershipService.addListener(leadershipEventListener);
135 +
136 + log.info("Started.");
137 + }
138 +
139 + @Deactivate
140 + public void deactivate() {
141 + clusterCommunicator.removeSubscriber(ROLE_QUERY_SUBJECT);
142 + clusterCommunicator.removeSubscriber(ROLE_RELINQUISH_SUBJECT);
143 + messageHandlingExecutor.shutdown();
144 + leadershipService.removeListener(leadershipEventListener);
145 +
146 + log.info("Stoppped.");
147 + }
148 +
149 + @Override
150 + public MastershipRole requestRole(DeviceId deviceId) {
151 + checkArgument(deviceId != null, DEVICE_ID_NULL);
152 +
153 + String leadershipTopic = createDeviceMastershipTopic(deviceId);
154 + if (connectedDevices.add(deviceId)) {
155 + leadershipService.runForLeadership(leadershipTopic);
156 + return MastershipRole.STANDBY;
157 + } else {
158 + Leadership leadership = leadershipService.getLeadership(leadershipTopic);
159 + if (leadership != null && leadership.leader().equals(localNodeId)) {
160 + return MastershipRole.MASTER;
161 + } else {
162 + return MastershipRole.STANDBY;
163 + }
164 + }
165 + }
166 +
167 + @Override
168 + public MastershipRole getRole(NodeId nodeId, DeviceId deviceId) {
169 + checkArgument(nodeId != null, NODE_ID_NULL);
170 + checkArgument(deviceId != null, DEVICE_ID_NULL);
171 +
172 + String leadershipTopic = createDeviceMastershipTopic(deviceId);
173 + Leadership leadership = leadershipService.getLeadership(leadershipTopic);
174 + if (leadership != null && nodeId.equals(leadership.leader())) {
175 + return MastershipRole.MASTER;
176 + }
177 +
178 + if (localNodeId.equals(nodeId)) {
179 + if (connectedDevices.contains(deviceId)) {
180 + return MastershipRole.STANDBY;
181 + } else {
182 + return MastershipRole.NONE;
183 + }
184 + } else {
185 + try {
186 + MastershipRole role = complete(clusterCommunicator.sendAndReceive(
187 + new ClusterMessage(
188 + localNodeId,
189 + ROLE_QUERY_SUBJECT,
190 + SERIALIZER.encode(deviceId)),
191 + nodeId));
192 + return role == null ? MastershipRole.NONE : role;
193 + } catch (IOException e) {
194 + log.warn("Failed to query {} for {}'s role. Defaulting to NONE", nodeId, deviceId, e);
195 + return MastershipRole.NONE;
196 + }
197 + }
198 + }
199 +
200 + @Override
201 + public NodeId getMaster(DeviceId deviceId) {
202 + checkArgument(deviceId != null, DEVICE_ID_NULL);
203 +
204 + String leadershipTopic = createDeviceMastershipTopic(deviceId);
205 + Leadership leadership = leadershipService.getLeadership(leadershipTopic);
206 + return leadership != null ? leadership.leader() : null;
207 + }
208 +
209 + @Override
210 + public RoleInfo getNodes(DeviceId deviceId) {
211 + checkArgument(deviceId != null, DEVICE_ID_NULL);
212 +
213 + Map<NodeId, MastershipRole> roles = Maps.newHashMap();
214 + clusterService
215 + .getNodes()
216 + .stream()
217 + .parallel()
218 + .forEach((node) -> roles.put(node.id(), getRole(node.id(), deviceId)));
219 +
220 + NodeId master = null;
221 + final List<NodeId> standbys = Lists.newLinkedList();
222 +
223 + for (Map.Entry<NodeId, MastershipRole> entry : roles.entrySet()) {
224 + if (entry.getValue() == MastershipRole.MASTER) {
225 + master = entry.getKey();
226 + } else if (entry.getValue() == MastershipRole.STANDBY) {
227 + standbys.add(entry.getKey());
228 + }
229 + }
230 +
231 + return new RoleInfo(master, standbys);
232 + }
233 +
234 + @Override
235 + public Set<DeviceId> getDevices(NodeId nodeId) {
236 + checkArgument(nodeId != null, NODE_ID_NULL);
237 +
238 + return leadershipService
239 + .ownedTopics(nodeId)
240 + .stream()
241 + .filter(this::isDeviceMastershipTopic)
242 + .map(this::extractDeviceIdFromTopic)
243 + .collect(Collectors.toSet());
244 + }
245 +
246 + @Override
247 + public MastershipEvent setMaster(NodeId nodeId, DeviceId deviceId) {
248 + checkArgument(nodeId != null, NODE_ID_NULL);
249 + checkArgument(deviceId != null, DEVICE_ID_NULL);
250 +
251 + throw new UnsupportedOperationException("This operation is not supported in " + this.getClass().getName());
252 + }
253 +
254 + @Override
255 + public MastershipTerm getTermFor(DeviceId deviceId) {
256 + checkArgument(deviceId != null, DEVICE_ID_NULL);
257 +
258 + String leadershipTopic = createDeviceMastershipTopic(deviceId);
259 + Leadership leadership = leadershipService.getLeadership(leadershipTopic);
260 + return leadership != null ? MastershipTerm.of(leadership.leader(), leadership.epoch()) : null;
261 + }
262 +
263 + @Override
264 + public MastershipEvent setStandby(NodeId nodeId, DeviceId deviceId) {
265 + checkArgument(nodeId != null, NODE_ID_NULL);
266 + checkArgument(deviceId != null, DEVICE_ID_NULL);
267 +
268 + throw new UnsupportedOperationException("This operation is not supported in " + this.getClass().getName());
269 + }
270 +
271 + @Override
272 + public MastershipEvent relinquishRole(NodeId nodeId, DeviceId deviceId) {
273 + checkArgument(nodeId != null, NODE_ID_NULL);
274 + checkArgument(deviceId != null, DEVICE_ID_NULL);
275 +
276 + if (!nodeId.equals(localNodeId)) {
277 + log.debug("Forwarding request to relinquish "
278 + + "role for device {} to {}", deviceId, nodeId);
279 + try {
280 + return complete(clusterCommunicator.sendAndReceive(
281 + new ClusterMessage(
282 + localNodeId,
283 + ROLE_RELINQUISH_SUBJECT,
284 + SERIALIZER.encode(deviceId)),
285 + nodeId));
286 + } catch (IOException e) {
287 + log.warn("Failed to send a request to relinquish role for {} to {}", deviceId, nodeId, e);
288 + return null;
289 + }
290 + }
291 +
292 + // Check if this node is can be managed by this node.
293 + if (!connectedDevices.contains(deviceId)) {
294 + return null;
295 + }
296 +
297 + String leadershipTopic = createDeviceMastershipTopic(deviceId);
298 + Leadership currentLeadership = leadershipService.getLeadership(leadershipTopic);
299 +
300 + MastershipEvent.Type eventType = null;
301 + if (currentLeadership != null && currentLeadership.leader().equals(localNodeId)) {
302 + eventType = MastershipEvent.Type.MASTER_CHANGED;
303 + } else {
304 + eventType = MastershipEvent.Type.BACKUPS_CHANGED;
305 + }
306 +
307 + connectedDevices.remove(deviceId);
308 + leadershipService.withdraw(leadershipTopic);
309 +
310 + return new MastershipEvent(eventType, deviceId, getNodes(deviceId));
311 + }
312 +
313 + private class RoleQueryHandler implements ClusterMessageHandler {
314 + @Override
315 + public void handle(ClusterMessage message) {
316 + DeviceId deviceId = SERIALIZER.decode(message.payload());
317 + try {
318 + message.respond(SERIALIZER.encode(getRole(localNodeId, deviceId)));
319 + } catch (IOException e) {
320 + log.error("Failed to responsd to role query", e);
321 + }
322 + }
323 + }
324 +
325 +
326 + @Override
327 + public void relinquishAllRole(NodeId nodeId) {
328 + // Noop. LeadershipService already takes care of detecting and purging deadlocks.
329 + }
330 +
331 + private class RoleRelinquishHandler implements ClusterMessageHandler {
332 + @Override
333 + public void handle(ClusterMessage message) {
334 + DeviceId deviceId = SERIALIZER.decode(message.payload());
335 + try {
336 + message.respond(SERIALIZER.encode(relinquishRole(localNodeId, deviceId)));
337 + } catch (IOException e) {
338 + log.error("Failed to relinquish role.", e);
339 + }
340 + }
341 + }
342 +
343 + private class InternalDeviceMastershipEventListener implements LeadershipEventListener {
344 + @Override
345 + public void event(LeadershipEvent event) {
346 + Leadership leadership = event.subject();
347 + if (!isDeviceMastershipTopic(leadership.topic())) {
348 + return;
349 + }
350 + NodeId nodeId = leadership.leader();
351 + DeviceId deviceId = extractDeviceIdFromTopic(leadership.topic());
352 + if (Objects.equal(nodeId, localNodeId) && connectedDevices.contains(deviceId)) {
353 + switch (event.type()) {
354 + case LEADER_ELECTED:
355 + notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, getNodes(deviceId)));
356 + break;
357 + case LEADER_REELECTED:
358 + // There is no concept of leader re-election in the new distributed leadership manager.
359 + throw new IllegalStateException("Unexpected event type");
360 + case LEADER_BOOTED:
361 + notifyDelegate(new MastershipEvent(BACKUPS_CHANGED, deviceId, getNodes(deviceId)));
362 + break;
363 + default:
364 + return;
365 + }
366 + }
367 + }
368 + }
369 +
370 + private String createDeviceMastershipTopic(DeviceId deviceId) {
371 + return "/devices/" + deviceId.toString() + "/mastership";
372 + }
373 +
374 + private DeviceId extractDeviceIdFromTopic(String topic) {
375 + Matcher m = DEVICE_MASTERSHIP_TOPIC_PATTERN.matcher(topic);
376 + if (m.matches()) {
377 + return DeviceId.deviceId(m.group(1));
378 + } else {
379 + throw new IllegalArgumentException("Invalid device mastership topic: " + topic);
380 + }
381 + }
382 +
383 + private boolean isDeviceMastershipTopic(String topic) {
384 + Matcher m = DEVICE_MASTERSHIP_TOPIC_PATTERN.matcher(topic);
385 + return m.matches();
386 + }
387 +
388 + private <T> T complete(Future<byte[]> future) {
389 + try {
390 + return SERIALIZER.decode(future.get(PEER_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS));
391 + } catch (InterruptedException e) {
392 + Thread.currentThread().interrupt();
393 + log.error("Interrupted while waiting for operation to complete.", e);
394 + return null;
395 + } catch (TimeoutException | ExecutionException e) {
396 + log.error("Failed remote operation", e);
397 + return null;
398 + }
399 + }
400 +}
...\ No newline at end of file ...\ No newline at end of file