Madan Jampani
Committed by Gerrit Code Review

[Emu] MutexExecutionService for running tasks that need to run only on a single instance.

Change-Id: Idf9fedbbf15c014e97c77db25aa608cd1db53b27
1 +/*
2 + * Copyright 2015 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +package org.onosproject.store.service;
17 +
18 +import java.util.concurrent.CompletableFuture;
19 +import java.util.concurrent.Executor;
20 +
21 +/**
22 + * Service for mutually exclusive job execution.
23 + */
24 +public interface MutexExecutionService {
25 +
26 + /**
27 + * Runs the specified task in a mutually exclusive fashion.
28 + * @param task task to run
29 + * @param exclusionPath path on which different instances synchronize
30 + * @param executor executor to use for running the task
31 + * @return future that is completed when the task execution completes.
32 + */
33 + CompletableFuture<Void> execute(MutexTask task, String exclusionPath, Executor executor);
34 +}
...\ No newline at end of file ...\ No newline at end of file
1 +/*
2 + * Copyright 2015 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +package org.onosproject.store.service;
17 +
18 +/**
19 + * The MutexTask interface should be implemented by any class whose
20 + * instances distributed across controllers are intended to be executed
21 + * in a mutually exclusive fashion.
22 + */
23 +public interface MutexTask {
24 +
25 + /**
26 + * Begins the execution of a mutually exclusive task.
27 + * The start method will be called once the "lock" is acquired.
28 + * After the start method returns the lock is released and some other
29 + * instance can take over execution.
30 + */
31 + void start();
32 +
33 + /**
34 + * This method will be called when exclusivity of task execution
35 + * can no longer be guaranteed. The implementation should take necessary steps
36 + * to halt task execution in order to ensure correctness.
37 + */
38 + void stop();
39 +}
1 +/*
2 + * Copyright 2015 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +package org.onosproject.store.consistent.impl;
17 +
18 +import static org.slf4j.LoggerFactory.getLogger;
19 +
20 +import java.util.Arrays;
21 +import java.util.List;
22 +import java.util.Map;
23 +import java.util.concurrent.CompletableFuture;
24 +import java.util.concurrent.Executor;
25 +
26 +import org.apache.felix.scr.annotations.Activate;
27 +import org.apache.felix.scr.annotations.Component;
28 +import org.apache.felix.scr.annotations.Deactivate;
29 +import org.apache.felix.scr.annotations.Reference;
30 +import org.apache.felix.scr.annotations.ReferenceCardinality;
31 +import org.apache.felix.scr.annotations.Service;
32 +import org.onlab.util.Tools;
33 +import org.onosproject.cluster.ClusterEvent;
34 +import org.onosproject.cluster.ClusterEventListener;
35 +import org.onosproject.cluster.ClusterService;
36 +import org.onosproject.cluster.ControllerNode.State;
37 +import org.onosproject.cluster.NodeId;
38 +import org.onosproject.store.serializers.KryoNamespaces;
39 +import org.onosproject.store.service.ConsistentMap;
40 +import org.onosproject.store.service.ConsistentMapException;
41 +import org.onosproject.store.service.MapEvent;
42 +import org.onosproject.store.service.MapEventListener;
43 +import org.onosproject.store.service.MutexExecutionService;
44 +import org.onosproject.store.service.MutexTask;
45 +import org.onosproject.store.service.Serializer;
46 +import org.onosproject.store.service.StorageService;
47 +import org.onosproject.store.service.Versioned;
48 +import org.slf4j.Logger;
49 +
50 +import com.google.common.base.MoreObjects;
51 +import com.google.common.collect.Lists;
52 +import com.google.common.collect.Maps;
53 +
54 +/**
55 + * Implementation of a MutexExecutionService.
56 + */
57 +@Component(immediate = true)
58 +@Service
59 +public class MutexExecutionManager implements MutexExecutionService {
60 +
61 + private final Logger log = getLogger(getClass());
62 +
63 + protected ConsistentMap<String, MutexState> lockMap;
64 + protected NodeId localNodeId;
65 +
66 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
67 + protected ClusterService clusterService;
68 +
69 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
70 + protected StorageService storageService;
71 +
72 + private final MapEventListener<String, MutexState> mapEventListener = new InternalLockMapEventListener();
73 + private final ClusterEventListener clusterEventListener = new InternalClusterEventListener();
74 +
75 + private Map<String, CompletableFuture<MutexState>> pending = Maps.newConcurrentMap();
76 + private Map<String, InnerMutexTask> activeTasks = Maps.newConcurrentMap();
77 +
78 + @Activate
79 + public void activate() {
80 + localNodeId = clusterService.getLocalNode().id();
81 + lockMap = storageService.<String, MutexState>consistentMapBuilder()
82 + .withName("onos-mutexes")
83 + .withSerializer(Serializer.using(Arrays.asList(KryoNamespaces.API), MutexState.class))
84 + .withPartitionsDisabled()
85 + .build();
86 + lockMap.addListener(mapEventListener);
87 + clusterService.addListener(clusterEventListener);
88 + releaseOldLocks();
89 + log.info("Started");
90 + }
91 +
92 + @Deactivate
93 + public void deactivate() {
94 + lockMap.removeListener(mapEventListener);
95 + pending.values().forEach(future -> future.cancel(true));
96 + activeTasks.forEach((k, v) -> {
97 + v.stop();
98 + unlock(k);
99 + });
100 + clusterService.removeListener(clusterEventListener);
101 + log.info("Stopped");
102 + }
103 +
104 + @Override
105 + public CompletableFuture<Void> execute(MutexTask task, String exclusionPath, Executor executor) {
106 + return lock(exclusionPath)
107 + .thenApply(state -> activeTasks.computeIfAbsent(exclusionPath,
108 + k -> new InnerMutexTask(exclusionPath,
109 + task,
110 + state.term())))
111 + .thenAcceptAsync(t -> t.start(), executor)
112 + .whenComplete((r, e) -> unlock(exclusionPath));
113 + }
114 +
115 + protected CompletableFuture<MutexState> lock(String exclusionPath) {
116 + CompletableFuture<MutexState> future =
117 + pending.computeIfAbsent(exclusionPath, k -> new CompletableFuture<>());
118 + tryLock(exclusionPath);
119 + return future;
120 + }
121 +
122 + /**
123 + * Attempts to acquire lock for a path. If lock is held by some other node, adds this node to
124 + * the wait list.
125 + * @param exclusionPath exclusion path
126 + */
127 + protected void tryLock(String exclusionPath) {
128 + Tools.retryable(() -> lockMap.asJavaMap()
129 + .compute(exclusionPath,
130 + (k, v) -> MutexState.admit(v, localNodeId)),
131 + ConsistentMapException.ConcurrentModification.class,
132 + Integer.MAX_VALUE,
133 + 100).get();
134 + }
135 +
136 + /**
137 + * Releases lock for the specific path. This operation is idempotent.
138 + * @param exclusionPath exclusion path
139 + */
140 + protected void unlock(String exclusionPath) {
141 + Tools.retryable(() -> lockMap.asJavaMap()
142 + .compute(exclusionPath, (k, v) -> MutexState.evict(v, localNodeId)),
143 + ConsistentMapException.ConcurrentModification.class,
144 + Integer.MAX_VALUE,
145 + 100).get();
146 + }
147 +
148 + /**
149 + * Detects and releases all locks held by this node.
150 + */
151 + private void releaseOldLocks() {
152 + Maps.filterValues(lockMap.asJavaMap(), state -> localNodeId.equals(state.holder()))
153 + .keySet()
154 + .forEach(path -> {
155 + log.info("Detected zombie task still holding lock for {}. Releasing lock.", path);
156 + unlock(path);
157 + });
158 + }
159 +
160 + private class InternalLockMapEventListener implements MapEventListener<String, MutexState> {
161 +
162 + @Override
163 + public void event(MapEvent<String, MutexState> event) {
164 + log.debug("Received {}", event);
165 + if (event.type() == MapEvent.Type.UPDATE || event.type() == MapEvent.Type.INSERT) {
166 + pending.computeIfPresent(event.key(), (k, future) -> {
167 + MutexState state = Versioned.valueOrElse(event.value(), null);
168 + if (state != null && localNodeId.equals(state.holder())) {
169 + log.debug("Local node is now owner for {}", event.key());
170 + future.complete(state);
171 + return null;
172 + } else {
173 + return future;
174 + }
175 + });
176 + InnerMutexTask task = activeTasks.get(event.key());
177 + if (task != null && task.term() < Versioned.valueOrElse(event.value(), null).term()) {
178 + task.stop();
179 + }
180 + }
181 + }
182 + }
183 +
184 + private class InternalClusterEventListener implements ClusterEventListener {
185 +
186 + @Override
187 + public void event(ClusterEvent event) {
188 + if (event.type() == ClusterEvent.Type.INSTANCE_DEACTIVATED ||
189 + event.type() == ClusterEvent.Type.INSTANCE_REMOVED) {
190 + NodeId nodeId = event.subject().id();
191 + log.debug("{} is no longer active. Attemping to clean up its locks.", nodeId);
192 + lockMap.asJavaMap().forEach((k, v) -> {
193 + if (v.contains(nodeId)) {
194 + lockMap.compute(k, (path, state) -> MutexState.evict(v, nodeId));
195 + }
196 + });
197 + }
198 + long activeNodes = clusterService.getNodes()
199 + .stream()
200 + .map(node -> clusterService.getState(node.id()))
201 + .filter(State.ACTIVE::equals)
202 + .count();
203 + if (clusterService.getNodes().size() > 1 && activeNodes == 1) {
204 + log.info("This node is partitioned away from the cluster. Stopping all inflight executions");
205 + activeTasks.forEach((k, v) -> {
206 + v.stop();
207 + });
208 + }
209 + }
210 + }
211 +
212 + private static final class MutexState {
213 +
214 + private final NodeId holder;
215 + private final List<NodeId> waitList;
216 + private final long term;
217 +
218 + public static MutexState admit(MutexState state, NodeId nodeId) {
219 + if (state == null) {
220 + return new MutexState(nodeId, 1L, Lists.newArrayList());
221 + } else if (state.holder() == null) {
222 + return new MutexState(nodeId, state.term() + 1, Lists.newArrayList());
223 + } else {
224 + if (!state.contains(nodeId)) {
225 + NodeId newHolder = state.holder();
226 + List<NodeId> newWaitList = Lists.newArrayList(state.waitList());
227 + newWaitList.add(nodeId);
228 + return new MutexState(newHolder, state.term(), newWaitList);
229 + } else {
230 + return state;
231 + }
232 + }
233 + }
234 +
235 + public static MutexState evict(MutexState state, NodeId nodeId) {
236 + return state.evict(nodeId);
237 + }
238 +
239 + public MutexState evict(NodeId nodeId) {
240 + if (nodeId.equals(holder)) {
241 + if (waitList.isEmpty()) {
242 + return new MutexState(null, term, waitList);
243 + }
244 + List<NodeId> newWaitList = Lists.newArrayList(waitList);
245 + NodeId newHolder = newWaitList.remove(0);
246 + return new MutexState(newHolder, term + 1, newWaitList);
247 + } else {
248 + NodeId newHolder = holder;
249 + List<NodeId> newWaitList = Lists.newArrayList(waitList);
250 + newWaitList.remove(nodeId);
251 + return new MutexState(newHolder, term, newWaitList);
252 + }
253 + }
254 +
255 + public NodeId holder() {
256 + return holder;
257 + }
258 +
259 + public List<NodeId> waitList() {
260 + return waitList;
261 + }
262 +
263 + public long term() {
264 + return term;
265 + }
266 +
267 + private boolean contains(NodeId nodeId) {
268 + return (nodeId.equals(holder) || waitList.contains(nodeId));
269 + }
270 +
271 + private MutexState(NodeId holder, long term, List<NodeId> waitList) {
272 + this.holder = holder;
273 + this.term = term;
274 + this.waitList = Lists.newArrayList(waitList);
275 + }
276 +
277 + @Override
278 + public String toString() {
279 + return MoreObjects.toStringHelper(getClass())
280 + .add("holder", holder)
281 + .add("term", term)
282 + .add("waitList", waitList)
283 + .toString();
284 + }
285 + }
286 +
287 + private class InnerMutexTask implements MutexTask {
288 + private final MutexTask task;
289 + private final String mutexPath;
290 + private final long term;
291 +
292 + public InnerMutexTask(String mutexPath, MutexTask task, long term) {
293 + this.mutexPath = mutexPath;
294 + this.term = term;
295 + this.task = task;
296 + }
297 +
298 + public long term() {
299 + return term;
300 + }
301 +
302 + @Override
303 + public void start() {
304 + log.debug("Starting execution for mutex task guarded by {}", mutexPath);
305 + task.start();
306 + log.debug("Finished execution for mutex task guarded by {}", mutexPath);
307 + }
308 +
309 + @Override
310 + public void stop() {
311 + log.debug("Stopping execution for mutex task guarded by {}", mutexPath);
312 + task.stop();
313 + }
314 + }
315 +}
...\ No newline at end of file ...\ No newline at end of file