Madan Jampani
Committed by Gerrit Code Review

ECLinkStore: LinkStore based on EventuallyConsistentMap (disabled right now)

Change-Id: Ib271ad6da90eb8b4d39db160e13c84b7bb695c9b
1 +/*
2 + * Copyright 2015 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +package org.onosproject.store.link.impl;
17 +
18 +import static com.google.common.base.Preconditions.checkNotNull;
19 +import static org.onosproject.net.DefaultAnnotations.merge;
20 +import static org.onosproject.net.DefaultAnnotations.union;
21 +import static org.onosproject.net.Link.State.ACTIVE;
22 +import static org.onosproject.net.Link.State.INACTIVE;
23 +import static org.onosproject.net.Link.Type.DIRECT;
24 +import static org.onosproject.net.Link.Type.INDIRECT;
25 +import static org.onosproject.net.LinkKey.linkKey;
26 +import static org.onosproject.net.link.LinkEvent.Type.LINK_ADDED;
27 +import static org.onosproject.net.link.LinkEvent.Type.LINK_REMOVED;
28 +import static org.onosproject.net.link.LinkEvent.Type.LINK_UPDATED;
29 +import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
30 +import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;
31 +import static org.slf4j.LoggerFactory.getLogger;
32 +
33 +import java.util.Collection;
34 +import java.util.Map;
35 +import java.util.Objects;
36 +import java.util.Set;
37 +import java.util.concurrent.atomic.AtomicReference;
38 +import java.util.function.Predicate;
39 +import java.util.stream.Collectors;
40 +
41 +import org.apache.felix.scr.annotations.Activate;
42 +import org.apache.felix.scr.annotations.Component;
43 +import org.apache.felix.scr.annotations.Deactivate;
44 +import org.apache.felix.scr.annotations.Reference;
45 +import org.apache.felix.scr.annotations.ReferenceCardinality;
46 +import org.apache.felix.scr.annotations.Service;
47 +import org.onlab.util.KryoNamespace;
48 +import org.onlab.util.SharedExecutors;
49 +import org.onosproject.cluster.ClusterService;
50 +import org.onosproject.cluster.NodeId;
51 +import org.onosproject.mastership.MastershipService;
52 +import org.onosproject.net.AnnotationKeys;
53 +import org.onosproject.net.AnnotationsUtil;
54 +import org.onosproject.net.ConnectPoint;
55 +import org.onosproject.net.DefaultAnnotations;
56 +import org.onosproject.net.DefaultLink;
57 +import org.onosproject.net.DeviceId;
58 +import org.onosproject.net.Link;
59 +import org.onosproject.net.LinkKey;
60 +import org.onosproject.net.Link.Type;
61 +import org.onosproject.net.device.DeviceClockService;
62 +import org.onosproject.net.link.DefaultLinkDescription;
63 +import org.onosproject.net.link.LinkDescription;
64 +import org.onosproject.net.link.LinkEvent;
65 +import org.onosproject.net.link.LinkStore;
66 +import org.onosproject.net.link.LinkStoreDelegate;
67 +import org.onosproject.net.provider.ProviderId;
68 +import org.onosproject.store.AbstractStore;
69 +import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
70 +import org.onosproject.store.cluster.messaging.MessageSubject;
71 +import org.onosproject.store.impl.MastershipBasedTimestamp;
72 +import org.onosproject.store.serializers.KryoNamespaces;
73 +import org.onosproject.store.serializers.KryoSerializer;
74 +import org.onosproject.store.serializers.custom.DistributedStoreSerializers;
75 +import org.onosproject.store.service.EventuallyConsistentMap;
76 +import org.onosproject.store.service.EventuallyConsistentMapEvent;
77 +import org.onosproject.store.service.EventuallyConsistentMapListener;
78 +import org.onosproject.store.service.StorageService;
79 +import org.slf4j.Logger;
80 +
81 +import com.google.common.collect.Iterables;
82 +import com.google.common.collect.Maps;
83 +import com.google.common.util.concurrent.Futures;
84 +
85 +/**
86 + * Manages the inventory of links using a {@code EventuallyConsistentMap}.
87 + */
88 +@Component(immediate = true, enabled = false)
89 +@Service
90 +public class ECLinkStore
91 + extends AbstractStore<LinkEvent, LinkStoreDelegate>
92 + implements LinkStore {
93 +
94 + private final Logger log = getLogger(getClass());
95 +
96 + private final Map<LinkKey, Link> links = Maps.newConcurrentMap();
97 + private EventuallyConsistentMap<Provided<LinkKey>, LinkDescription> linkDescriptions;
98 +
99 + private static final MessageSubject LINK_INJECT_MESSAGE = new MessageSubject("inject-link-request");
100 +
101 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
102 + protected StorageService storageService;
103 +
104 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
105 + protected MastershipService mastershipService;
106 +
107 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
108 + protected DeviceClockService deviceClockService;
109 +
110 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
111 + protected ClusterCommunicationService clusterCommunicator;
112 +
113 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
114 + protected ClusterService clusterService;
115 +
116 + private EventuallyConsistentMapListener<Provided<LinkKey>, LinkDescription> linkTracker =
117 + new InternalLinkTracker();
118 +
119 + protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
120 + @Override
121 + protected void setupKryoPool() {
122 + serializerPool = KryoNamespace.newBuilder()
123 + .register(DistributedStoreSerializers.STORE_COMMON)
124 + .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
125 + .register(Provided.class)
126 + .build();
127 + }
128 + };
129 +
130 + @Activate
131 + public void activate() {
132 + KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
133 + .register(KryoNamespaces.API)
134 + .register(MastershipBasedTimestamp.class)
135 + .register(Provided.class);
136 +
137 + linkDescriptions = storageService.<Provided<LinkKey>, LinkDescription>eventuallyConsistentMapBuilder()
138 + .withName("onos-link-descriptions")
139 + .withSerializer(serializer)
140 + .withTimestampProvider((k, v) -> {
141 + try {
142 + return v == null ? null : deviceClockService.getTimestamp(v.dst().deviceId());
143 + } catch (IllegalStateException e) {
144 + return null;
145 + }
146 + }).build();
147 +
148 + clusterCommunicator.addSubscriber(LINK_INJECT_MESSAGE,
149 + SERIALIZER::decode,
150 + this::injectLink,
151 + SharedExecutors.getPoolThreadExecutor());
152 +
153 + linkDescriptions.addListener(linkTracker);
154 +
155 + log.info("Started");
156 + }
157 +
158 + @Deactivate
159 + public void deactivate() {
160 + linkDescriptions.removeListener(linkTracker);
161 + linkDescriptions.destroy();
162 + links.clear();
163 + clusterCommunicator.removeSubscriber(LINK_INJECT_MESSAGE);
164 +
165 + log.info("Stopped");
166 + }
167 +
168 + @Override
169 + public int getLinkCount() {
170 + return links.size();
171 + }
172 +
173 + @Override
174 + public Iterable<Link> getLinks() {
175 + return links.values();
176 + }
177 +
178 + @Override
179 + public Set<Link> getDeviceEgressLinks(DeviceId deviceId) {
180 + return filter(links.values(), link -> deviceId.equals(link.src().deviceId()));
181 + }
182 +
183 + @Override
184 + public Set<Link> getDeviceIngressLinks(DeviceId deviceId) {
185 + return filter(links.values(), link -> deviceId.equals(link.dst().deviceId()));
186 + }
187 +
188 + @Override
189 + public Link getLink(ConnectPoint src, ConnectPoint dst) {
190 + return links.get(linkKey(src, dst));
191 + }
192 +
193 + @Override
194 + public Set<Link> getEgressLinks(ConnectPoint src) {
195 + return filter(links.values(), link -> src.equals(link.src()));
196 + }
197 +
198 + @Override
199 + public Set<Link> getIngressLinks(ConnectPoint dst) {
200 + return filter(links.values(), link -> dst.equals(link.dst()));
201 + }
202 +
203 + @Override
204 + public LinkEvent createOrUpdateLink(ProviderId providerId,
205 + LinkDescription linkDescription) {
206 + final DeviceId dstDeviceId = linkDescription.dst().deviceId();
207 + final NodeId dstNodeId = mastershipService.getMasterFor(dstDeviceId);
208 +
209 + // Process link update only if we're the master of the destination node,
210 + // otherwise signal the actual master.
211 + if (clusterService.getLocalNode().id().equals(dstNodeId)) {
212 + LinkKey linkKey = linkKey(linkDescription.src(), linkDescription.dst());
213 + Provided<LinkKey> internalLinkKey = new Provided<>(linkKey, providerId);
214 + linkDescriptions.compute(internalLinkKey, (k, v) -> createOrUpdateLinkInternal(v , linkDescription));
215 + return refreshLinkCache(linkKey);
216 + } else {
217 + if (dstNodeId == null) {
218 + return null;
219 + }
220 + return Futures.getUnchecked(clusterCommunicator.sendAndReceive(new Provided<>(linkDescription, providerId),
221 + LINK_INJECT_MESSAGE,
222 + SERIALIZER::encode,
223 + SERIALIZER::decode,
224 + dstNodeId));
225 + }
226 + }
227 +
228 + private LinkDescription createOrUpdateLinkInternal(LinkDescription current, LinkDescription updated) {
229 + if (current != null) {
230 + // we only allow transition from INDIRECT -> DIRECT
231 + return new DefaultLinkDescription(
232 + current.src(),
233 + current.dst(),
234 + current.type() == DIRECT ? DIRECT : updated.type(),
235 + union(current.annotations(), updated.annotations()));
236 + }
237 + return updated;
238 + }
239 +
240 + private LinkEvent refreshLinkCache(LinkKey linkKey) {
241 + AtomicReference<LinkEvent.Type> eventType = new AtomicReference<>();
242 + Link link = links.compute(linkKey, (key, existingLink) -> {
243 + Link newLink = composeLink(linkKey);
244 + if (existingLink == null) {
245 + eventType.set(LINK_ADDED);
246 + return newLink;
247 + } else if (existingLink.state() != newLink.state() ||
248 + (existingLink.type() == INDIRECT && newLink.type() == DIRECT) ||
249 + !AnnotationsUtil.isEqual(existingLink.annotations(), newLink.annotations())) {
250 + eventType.set(LINK_UPDATED);
251 + return newLink;
252 + } else {
253 + return existingLink;
254 + }
255 + });
256 + return eventType.get() != null ? new LinkEvent(eventType.get(), link) : null;
257 + }
258 +
259 + private Set<ProviderId> getAllProviders(LinkKey linkKey) {
260 + return linkDescriptions.keySet()
261 + .stream()
262 + .filter(key -> key.key().equals(linkKey))
263 + .map(key -> key.providerId())
264 + .collect(Collectors.toSet());
265 + }
266 +
267 + private ProviderId getBaseProviderId(LinkKey linkKey) {
268 + Set<ProviderId> allProviders = getAllProviders(linkKey);
269 + if (allProviders.size() > 0) {
270 + return allProviders.stream()
271 + .filter(p -> !p.isAncillary())
272 + .findFirst()
273 + .orElse(Iterables.getFirst(allProviders, null));
274 + }
275 + return null;
276 + }
277 +
278 + private Link composeLink(LinkKey linkKey) {
279 +
280 + ProviderId baseProviderId = checkNotNull(getBaseProviderId(linkKey));
281 + LinkDescription base = linkDescriptions.get(new Provided<>(linkKey, baseProviderId));
282 +
283 + ConnectPoint src = base.src();
284 + ConnectPoint dst = base.dst();
285 + Type type = base.type();
286 + AtomicReference<DefaultAnnotations> annotations = new AtomicReference<>(DefaultAnnotations.builder().build());
287 + annotations.set(merge(annotations.get(), base.annotations()));
288 +
289 + getAllProviders(linkKey).stream()
290 + .map(p -> new Provided<>(linkKey, p))
291 + .forEach(key -> {
292 + annotations.set(merge(annotations.get(),
293 + linkDescriptions.get(key).annotations()));
294 + });
295 +
296 + boolean isDurable = Objects.equals(annotations.get().value(AnnotationKeys.DURABLE), "true");
297 + return new DefaultLink(baseProviderId, src, dst, type, ACTIVE, isDurable, annotations.get());
298 + }
299 +
300 + // Updates, if necessary the specified link and returns the appropriate event.
301 + // Guarded by linkDescs value (=locking each Link)
302 + private LinkEvent updateLink(LinkKey key, Link oldLink, Link newLink) {
303 + // Note: INDIRECT -> DIRECT transition only
304 + // so that BDDP discovered Link will not overwrite LDDP Link
305 + if (oldLink.state() != newLink.state() ||
306 + (oldLink.type() == INDIRECT && newLink.type() == DIRECT) ||
307 + !AnnotationsUtil.isEqual(oldLink.annotations(), newLink.annotations())) {
308 +
309 + links.put(key, newLink);
310 + return new LinkEvent(LINK_UPDATED, newLink);
311 + }
312 + return null;
313 + }
314 +
315 + @Override
316 + public LinkEvent removeOrDownLink(ConnectPoint src, ConnectPoint dst) {
317 + Link link = getLink(src, dst);
318 + if (link == null) {
319 + return null;
320 + }
321 +
322 + if (link.isDurable()) {
323 + // FIXME: this will not sync link state!!!
324 + return link.state() == INACTIVE ? null :
325 + updateLink(linkKey(link.src(), link.dst()), link,
326 + new DefaultLink(link.providerId(),
327 + link.src(), link.dst(),
328 + link.type(), INACTIVE,
329 + link.isDurable(),
330 + link.annotations()));
331 + }
332 + return removeLink(src, dst);
333 + }
334 +
335 + @Override
336 + public LinkEvent removeLink(ConnectPoint src, ConnectPoint dst) {
337 + final LinkKey linkKey = LinkKey.linkKey(src, dst);
338 + LinkDescription removedLinkDescription =
339 + linkDescriptions.remove(new Provided<>(linkKey, checkNotNull(getBaseProviderId(linkKey))));
340 + if (removedLinkDescription != null) {
341 + return purgeLinkCache(linkKey);
342 + }
343 + return null;
344 + }
345 +
346 + private LinkEvent purgeLinkCache(LinkKey linkKey) {
347 + Link removedLink = links.remove(linkKey);
348 + if (removedLink != null) {
349 + getAllProviders(linkKey).forEach(p -> linkDescriptions.remove(new Provided<>(linkKey, p)));
350 + return new LinkEvent(LINK_REMOVED, removedLink);
351 + }
352 + return null;
353 + }
354 +
355 + private Set<Link> filter(Collection<Link> links, Predicate<Link> predicate) {
356 + return links.stream().filter(predicate).collect(Collectors.toSet());
357 + }
358 +
359 + private LinkEvent injectLink(Provided<LinkDescription> linkInjectRequest) {
360 + log.trace("Received request to inject link {}", linkInjectRequest);
361 +
362 + ProviderId providerId = linkInjectRequest.providerId();
363 + LinkDescription linkDescription = linkInjectRequest.key();
364 +
365 + final DeviceId deviceId = linkDescription.dst().deviceId();
366 + if (!deviceClockService.isTimestampAvailable(deviceId)) {
367 + // workaround for ONOS-1208
368 + log.warn("Not ready to accept update. Dropping {}", linkInjectRequest);
369 + return null;
370 + }
371 + return createOrUpdateLink(providerId, linkDescription);
372 + }
373 +
374 + private class InternalLinkTracker implements EventuallyConsistentMapListener<Provided<LinkKey>, LinkDescription> {
375 + @Override
376 + public void event(EventuallyConsistentMapEvent<Provided<LinkKey>, LinkDescription> event) {
377 + if (event.type() == PUT) {
378 + notifyDelegate(refreshLinkCache(event.key().key()));
379 + } else if (event.type() == REMOVE) {
380 + notifyDelegate(purgeLinkCache(event.key().key()));
381 + }
382 + }
383 + }
384 +}
...\ No newline at end of file ...\ No newline at end of file
1 +/*
2 + * Copyright 2015 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +package org.onosproject.store.link.impl;
17 +
18 +import java.util.Objects;
19 +
20 +import org.onosproject.net.provider.ProviderId;
21 +
22 +import com.google.common.base.MoreObjects;
23 +
24 +/**
25 + * Encapsulation of a provider supplied key.
26 + *
27 + * @param <K> key
28 + */
29 +public class Provided<K> {
30 + private final K key;
31 + private final ProviderId providerId;
32 +
33 + public Provided(K key, ProviderId providerId) {
34 + this.key = key;
35 + this.providerId = providerId;
36 + }
37 +
38 + public ProviderId providerId() {
39 + return providerId;
40 + }
41 +
42 + public K key() {
43 + return key;
44 + }
45 +
46 + @Override
47 + public int hashCode() {
48 + return Objects.hash(key, providerId);
49 + }
50 +
51 + @Override
52 + public boolean equals(Object other) {
53 + if (other instanceof Provided) {
54 + Provided<K> that = (Provided) other;
55 + return Objects.equals(key, that.key) &&
56 + Objects.equals(providerId, that.providerId);
57 + }
58 + return false;
59 + }
60 +
61 + @Override
62 + public String toString() {
63 + return MoreObjects.toStringHelper(getClass())
64 + .add("key", key)
65 + .add("providerId", providerId)
66 + .toString();
67 + }
68 +}