Madan Jampani

ECDeviceStore: DeviceStore built using ONOS distributed primitives: ECMap and Di…

…stributedSet (disabled right now)

Change-Id: I36fdcd635f982f2b8dac291c52be4662601ef9f0
1 +/*
2 + * Copyright 2014-2015 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +package org.onosproject.store.device.impl;
17 +
18 +import java.util.Objects;
19 +
20 +import org.onosproject.net.DeviceId;
21 +import org.onosproject.net.provider.ProviderId;
22 +
23 +import com.google.common.base.MoreObjects;
24 +
25 +/**
26 + * Key for DeviceDescriptions in ECDeviceStore.
27 + */
28 +public class DeviceKey {
29 + private final ProviderId providerId;
30 + private final DeviceId deviceId;
31 +
32 + public DeviceKey(ProviderId providerId, DeviceId deviceId) {
33 + this.providerId = providerId;
34 + this.deviceId = deviceId;
35 + }
36 +
37 + public ProviderId providerId() {
38 + return providerId;
39 + }
40 +
41 + public DeviceId deviceId() {
42 + return deviceId;
43 + }
44 +
45 + @Override
46 + public int hashCode() {
47 + return Objects.hash(providerId, deviceId);
48 + }
49 +
50 + @Override
51 + public boolean equals(Object obj) {
52 + if (this == obj) {
53 + return true;
54 + }
55 + if (!(obj instanceof DeviceKey)) {
56 + return false;
57 + }
58 + DeviceKey that = (DeviceKey) obj;
59 + return Objects.equals(this.deviceId, that.deviceId) &&
60 + Objects.equals(this.providerId, that.providerId);
61 + }
62 +
63 + @Override
64 + public String toString() {
65 + return MoreObjects.toStringHelper(getClass())
66 + .add("providerId", providerId)
67 + .add("deviceId", deviceId)
68 + .toString();
69 + }
70 +}
1 +/*
2 + * Copyright 2014-2015 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +package org.onosproject.store.device.impl;
17 +
18 +import static com.google.common.base.Preconditions.checkArgument;
19 +import static com.google.common.base.Verify.verify;
20 +import static org.onosproject.net.DefaultAnnotations.merge;
21 +import static org.slf4j.LoggerFactory.getLogger;
22 +
23 +import java.util.Collection;
24 +import java.util.Collections;
25 +import java.util.List;
26 +import java.util.Map;
27 +import java.util.Objects;
28 +import java.util.Optional;
29 +import java.util.Set;
30 +import java.util.Map.Entry;
31 +import java.util.concurrent.TimeUnit;
32 +import java.util.concurrent.atomic.AtomicReference;
33 +import java.util.stream.Collectors;
34 +
35 +import org.apache.felix.scr.annotations.Activate;
36 +import org.apache.felix.scr.annotations.Component;
37 +import org.apache.felix.scr.annotations.Deactivate;
38 +import org.apache.felix.scr.annotations.Reference;
39 +import org.apache.felix.scr.annotations.ReferenceCardinality;
40 +import org.apache.felix.scr.annotations.Service;
41 +import org.onlab.packet.ChassisId;
42 +import org.onlab.util.KryoNamespace;
43 +import org.onlab.util.SharedExecutors;
44 +import org.onosproject.cluster.ClusterService;
45 +import org.onosproject.cluster.NodeId;
46 +import org.onosproject.mastership.MastershipService;
47 +import org.onosproject.mastership.MastershipTermService;
48 +import org.onosproject.net.Annotations;
49 +import org.onosproject.net.AnnotationsUtil;
50 +import org.onosproject.net.DefaultAnnotations;
51 +import org.onosproject.net.DefaultDevice;
52 +import org.onosproject.net.DefaultPort;
53 +import org.onosproject.net.Device;
54 +import org.onosproject.net.DeviceId;
55 +import org.onosproject.net.MastershipRole;
56 +import org.onosproject.net.OchPort;
57 +import org.onosproject.net.OduCltPort;
58 +import org.onosproject.net.OmsPort;
59 +import org.onosproject.net.Port;
60 +import org.onosproject.net.PortNumber;
61 +import org.onosproject.net.Device.Type;
62 +import org.onosproject.net.device.DefaultPortStatistics;
63 +import org.onosproject.net.device.DeviceClockService;
64 +import org.onosproject.net.device.DeviceDescription;
65 +import org.onosproject.net.device.DeviceEvent;
66 +import org.onosproject.net.device.DeviceStore;
67 +import org.onosproject.net.device.DeviceStoreDelegate;
68 +import org.onosproject.net.device.OchPortDescription;
69 +import org.onosproject.net.device.OduCltPortDescription;
70 +import org.onosproject.net.device.OmsPortDescription;
71 +import org.onosproject.net.device.PortDescription;
72 +import org.onosproject.net.device.PortStatistics;
73 +import org.onosproject.net.provider.ProviderId;
74 +import org.onosproject.store.AbstractStore;
75 +import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
76 +import org.onosproject.store.impl.MastershipBasedTimestamp;
77 +import org.onosproject.store.serializers.KryoNamespaces;
78 +import org.onosproject.store.serializers.KryoSerializer;
79 +import org.onosproject.store.serializers.custom.DistributedStoreSerializers;
80 +import org.onosproject.store.service.DistributedSet;
81 +import org.onosproject.store.service.EventuallyConsistentMap;
82 +import org.onosproject.store.service.EventuallyConsistentMapEvent;
83 +import org.onosproject.store.service.Serializer;
84 +import org.onosproject.store.service.SetEvent;
85 +import org.onosproject.store.service.SetEventListener;
86 +import org.onosproject.store.service.WallClockTimestamp;
87 +
88 +import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
89 +import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;
90 +
91 +import org.onosproject.store.service.EventuallyConsistentMapListener;
92 +import org.onosproject.store.service.StorageService;
93 +import org.slf4j.Logger;
94 +
95 +import static org.onosproject.net.device.DeviceEvent.Type.*;
96 +import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_INJECTED;
97 +import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.DEVICE_REMOVE_REQ;
98 +import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.PORT_INJECTED;
99 +
100 +import com.google.common.collect.ImmutableList;
101 +import com.google.common.collect.Iterables;
102 +import com.google.common.collect.Lists;
103 +import com.google.common.collect.Maps;
104 +import com.google.common.collect.Sets;
105 +import com.google.common.util.concurrent.Futures;
106 +
107 +/**
108 + * Manages the inventory of devices using a {@code EventuallyConsistentMap}.
109 + */
110 +@Component(immediate = true, enabled = false)
111 +@Service
112 +public class ECDeviceStore
113 + extends AbstractStore<DeviceEvent, DeviceStoreDelegate>
114 + implements DeviceStore {
115 +
116 + private final Logger log = getLogger(getClass());
117 +
118 + private static final String DEVICE_NOT_FOUND = "Device with ID %s not found";
119 +
120 + private final Map<DeviceId, Device> devices = Maps.newConcurrentMap();
121 + private final Map<DeviceId, Map<PortNumber, Port>> devicePorts = Maps.newConcurrentMap();
122 + Set<DeviceId> pendingAvailableChangeUpdates = Sets.newConcurrentHashSet();
123 +
124 + private EventuallyConsistentMap<DeviceKey, DeviceDescription> deviceDescriptions;
125 + private EventuallyConsistentMap<PortKey, PortDescription> portDescriptions;
126 + private EventuallyConsistentMap<DeviceId, Map<PortNumber, PortStatistics>> devicePortStats;
127 + private EventuallyConsistentMap<DeviceId, Map<PortNumber, PortStatistics>> devicePortDeltaStats;
128 +
129 + private DistributedSet<DeviceId> availableDevices;
130 +
131 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
132 + protected StorageService storageService;
133 +
134 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
135 + protected MastershipService mastershipService;
136 +
137 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
138 + protected MastershipTermService mastershipTermService;
139 +
140 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
141 + protected DeviceClockService deviceClockService;
142 +
143 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
144 + protected ClusterCommunicationService clusterCommunicator;
145 +
146 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
147 + protected ClusterService clusterService;
148 +
149 + private NodeId localNodeId;
150 + private EventuallyConsistentMapListener<DeviceKey, DeviceDescription> deviceUpdateListener =
151 + new InternalDeviceChangeEventListener();
152 + private EventuallyConsistentMapListener<PortKey, PortDescription> portUpdateListener =
153 + new InternalPortChangeEventListener();
154 + private final EventuallyConsistentMapListener<DeviceId, Map<PortNumber, PortStatistics>> portStatsListener =
155 + new InternalPortStatsListener();
156 + private final SetEventListener<DeviceId> deviceStatusTracker =
157 + new InternalDeviceStatusTracker();
158 +
159 + protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
160 + @Override
161 + protected void setupKryoPool() {
162 + serializerPool = KryoNamespace.newBuilder()
163 + .register(DistributedStoreSerializers.STORE_COMMON)
164 + .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
165 + .register(DeviceInjectedEvent.class)
166 + .register(PortInjectedEvent.class)
167 + .build();
168 + }
169 + };
170 +
171 + protected static final KryoNamespace.Builder SERIALIZER_BUILDER = KryoNamespace.newBuilder()
172 + .register(KryoNamespaces.API)
173 + .register(DeviceKey.class)
174 + .register(PortKey.class)
175 + .register(DeviceKey.class)
176 + .register(PortKey.class)
177 + .register(MastershipBasedTimestamp.class);
178 +
179 + @Activate
180 + public void activate() {
181 + localNodeId = clusterService.getLocalNode().id();
182 +
183 + deviceDescriptions = storageService.<DeviceKey, DeviceDescription>eventuallyConsistentMapBuilder()
184 + .withName("onos-device-descriptions")
185 + .withSerializer(SERIALIZER_BUILDER)
186 + .withTimestampProvider((k, v) -> {
187 + try {
188 + return deviceClockService.getTimestamp(k.deviceId());
189 + } catch (IllegalStateException e) {
190 + return null;
191 + }
192 + }).build();
193 +
194 + portDescriptions = storageService.<PortKey, PortDescription>eventuallyConsistentMapBuilder()
195 + .withName("onos-port-descriptions")
196 + .withSerializer(SERIALIZER_BUILDER)
197 + .withTimestampProvider((k, v) -> {
198 + try {
199 + return deviceClockService.getTimestamp(k.deviceId());
200 + } catch (IllegalStateException e) {
201 + return null;
202 + }
203 + }).build();
204 +
205 + devicePortStats = storageService.<DeviceId, Map<PortNumber, PortStatistics>>eventuallyConsistentMapBuilder()
206 + .withName("onos-port-stats")
207 + .withSerializer(SERIALIZER_BUILDER)
208 + .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
209 + .withTimestampProvider((k, v) -> new WallClockTimestamp())
210 + .withTombstonesDisabled()
211 + .build();
212 +
213 + devicePortDeltaStats = storageService.<DeviceId, Map<PortNumber, PortStatistics>>
214 + eventuallyConsistentMapBuilder()
215 + .withName("onos-port-stats-delta")
216 + .withSerializer(SERIALIZER_BUILDER)
217 + .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
218 + .withTimestampProvider((k, v) -> new WallClockTimestamp())
219 + .withTombstonesDisabled()
220 + .build();
221 +
222 + clusterCommunicator.addSubscriber(DEVICE_INJECTED,
223 + SERIALIZER::decode,
224 + this::injectDevice,
225 + SERIALIZER::encode,
226 + SharedExecutors.getPoolThreadExecutor());
227 +
228 + clusterCommunicator.addSubscriber(PORT_INJECTED,
229 + SERIALIZER::decode,
230 + this::injectPort,
231 + SERIALIZER::encode,
232 + SharedExecutors.getPoolThreadExecutor());
233 +
234 + availableDevices = storageService.<DeviceId>setBuilder()
235 + .withName("onos-online-devices")
236 + .withSerializer(Serializer.using(KryoNamespaces.API))
237 + .withPartitionsDisabled()
238 + .withRelaxedReadConsistency()
239 + .build();
240 +
241 + deviceDescriptions.addListener(deviceUpdateListener);
242 + portDescriptions.addListener(portUpdateListener);
243 + devicePortStats.addListener(portStatsListener);
244 + availableDevices.addListener(deviceStatusTracker);
245 + log.info("Started");
246 + }
247 +
248 + @Deactivate
249 + public void deactivate() {
250 + devicePortStats.removeListener(portStatsListener);
251 + deviceDescriptions.removeListener(deviceUpdateListener);
252 + portDescriptions.removeListener(portUpdateListener);
253 + availableDevices.removeListener(deviceStatusTracker);
254 + devicePortStats.destroy();
255 + devicePortDeltaStats.destroy();
256 + deviceDescriptions.destroy();
257 + portDescriptions.destroy();
258 + devices.clear();
259 + devicePorts.clear();
260 + clusterCommunicator.removeSubscriber(DEVICE_INJECTED);
261 + clusterCommunicator.removeSubscriber(PORT_INJECTED);
262 + log.info("Stopped");
263 + }
264 +
265 + @Override
266 + public Iterable<Device> getDevices() {
267 + return devices.values();
268 + }
269 +
270 + @Override
271 + public int getDeviceCount() {
272 + return devices.size();
273 + }
274 +
275 + @Override
276 + public Device getDevice(DeviceId deviceId) {
277 + return devices.get(deviceId);
278 + }
279 +
280 + @Override
281 + public DeviceEvent createOrUpdateDevice(ProviderId providerId,
282 + DeviceId deviceId,
283 + DeviceDescription deviceDescription) {
284 + NodeId master = mastershipService.getMasterFor(deviceId);
285 + if (localNodeId.equals(master)) {
286 + deviceDescriptions.put(new DeviceKey(providerId, deviceId), deviceDescription);
287 + return refreshDeviceCache(providerId, deviceId);
288 + } else {
289 + DeviceInjectedEvent deviceInjectedEvent = new DeviceInjectedEvent(providerId, deviceId, deviceDescription);
290 + return Futures.getUnchecked(
291 + clusterCommunicator.sendAndReceive(deviceInjectedEvent,
292 + DEVICE_INJECTED,
293 + SERIALIZER::encode,
294 + SERIALIZER::decode,
295 + master));
296 + }
297 + }
298 +
299 + private DeviceEvent refreshDeviceCache(ProviderId providerId, DeviceId deviceId) {
300 + AtomicReference<DeviceEvent.Type> eventType = new AtomicReference<>();
301 + Device device = devices.compute(deviceId, (k, existingDevice) -> {
302 + Device newDevice = composeDevice(deviceId);
303 + if (existingDevice == null) {
304 + eventType.set(DEVICE_ADDED);
305 + } else {
306 + // We allow only certain attributes to trigger update
307 + boolean propertiesChanged =
308 + !Objects.equals(existingDevice.hwVersion(), newDevice.hwVersion()) ||
309 + !Objects.equals(existingDevice.swVersion(), newDevice.swVersion()) ||
310 + !Objects.equals(existingDevice.providerId(), newDevice.providerId());
311 + boolean annotationsChanged =
312 + !AnnotationsUtil.isEqual(existingDevice.annotations(), newDevice.annotations());
313 +
314 + // Primary providers can respond to all changes, but ancillary ones
315 + // should respond only to annotation changes.
316 + if ((providerId.isAncillary() && annotationsChanged) ||
317 + (!providerId.isAncillary() && (propertiesChanged || annotationsChanged))) {
318 + boolean replaced = devices.replace(deviceId, existingDevice, newDevice);
319 + verify(replaced, "Replacing devices cache failed. PID:%s [expected:%s, found:%s, new=%s]",
320 + providerId, existingDevice, devices.get(deviceId), newDevice);
321 + eventType.set(DEVICE_UPDATED);
322 + }
323 + }
324 + return newDevice;
325 + });
326 + if (eventType.get() != null && !providerId.isAncillary()) {
327 + markOnline(deviceId);
328 + }
329 + return eventType.get() != null ? new DeviceEvent(eventType.get(), device) : null;
330 + }
331 +
332 + /**
333 + * Returns the primary providerId for a device.
334 + * @param deviceId device identifier
335 + * @return primary providerId
336 + */
337 + private Set<ProviderId> getAllProviders(DeviceId deviceId) {
338 + return deviceDescriptions.keySet()
339 + .stream()
340 + .filter(deviceKey -> deviceKey.deviceId().equals(deviceId))
341 + .map(deviceKey -> deviceKey.providerId())
342 + .collect(Collectors.toSet());
343 + }
344 +
345 + /**
346 + * Returns the identifier for all providers for a device.
347 + * @param deviceId device identifier
348 + * @return set of provider identifiers
349 + */
350 + private ProviderId getPrimaryProviderId(DeviceId deviceId) {
351 + Set<ProviderId> allProviderIds = getAllProviders(deviceId);
352 + return allProviderIds.stream()
353 + .filter(p -> !p.isAncillary())
354 + .findFirst()
355 + .orElse(Iterables.getFirst(allProviderIds, null));
356 + }
357 +
358 + /**
359 + * Returns a Device, merging descriptions from multiple Providers.
360 + *
361 + * @param deviceId device identifier
362 + * @return Device instance
363 + */
364 + private Device composeDevice(DeviceId deviceId) {
365 +
366 + ProviderId primaryProviderId = getPrimaryProviderId(deviceId);
367 + DeviceDescription primaryDeviceDescription =
368 + deviceDescriptions.get(new DeviceKey(primaryProviderId, deviceId));
369 +
370 + Type type = primaryDeviceDescription.type();
371 + String manufacturer = primaryDeviceDescription.manufacturer();
372 + String hwVersion = primaryDeviceDescription.hwVersion();
373 + String swVersion = primaryDeviceDescription.swVersion();
374 + String serialNumber = primaryDeviceDescription.serialNumber();
375 + ChassisId chassisId = primaryDeviceDescription.chassisId();
376 + DefaultAnnotations annotations = mergeAnnotations(deviceId);
377 +
378 + return new DefaultDevice(primaryProviderId, deviceId, type, manufacturer,
379 + hwVersion, swVersion, serialNumber,
380 + chassisId, annotations);
381 + }
382 +
383 + private DeviceEvent purgeDeviceCache(DeviceId deviceId) {
384 + Device removedDevice = devices.remove(deviceId);
385 + if (removedDevice != null) {
386 + getAllProviders(deviceId).forEach(p -> deviceDescriptions.remove(new DeviceKey(p, deviceId)));
387 + return new DeviceEvent(DEVICE_REMOVED, removedDevice);
388 + }
389 + return null;
390 + }
391 +
392 + private boolean markOnline(DeviceId deviceId) {
393 + return availableDevices.add(deviceId);
394 + }
395 +
396 + @Override
397 + public DeviceEvent markOffline(DeviceId deviceId) {
398 + availableDevices.remove(deviceId);
399 + // set update listener will raise the event.
400 + return null;
401 + }
402 +
403 + @Override
404 + public List<DeviceEvent> updatePorts(ProviderId providerId,
405 + DeviceId deviceId,
406 + List<PortDescription> descriptions) {
407 + NodeId master = mastershipService.getMasterFor(deviceId);
408 + List<DeviceEvent> deviceEvents = null;
409 + if (localNodeId.equals(master)) {
410 + descriptions.forEach(description -> {
411 + PortKey portKey = new PortKey(providerId, deviceId, description.portNumber());
412 + portDescriptions.put(portKey, description);
413 + });
414 + deviceEvents = refreshDevicePortCache(providerId, deviceId, Optional.empty());
415 + } else {
416 + if (master == null) {
417 + return Collections.emptyList();
418 + }
419 + PortInjectedEvent portInjectedEvent = new PortInjectedEvent(providerId, deviceId, descriptions);
420 + deviceEvents = Futures.getUnchecked(
421 + clusterCommunicator.sendAndReceive(portInjectedEvent,
422 + PORT_INJECTED,
423 + SERIALIZER::encode,
424 + SERIALIZER::decode,
425 + master));
426 + }
427 + return deviceEvents == null ? Collections.emptyList() : deviceEvents;
428 + }
429 +
430 + private List<DeviceEvent> refreshDevicePortCache(ProviderId providerId,
431 + DeviceId deviceId,
432 + Optional<PortNumber> portNumber) {
433 + Device device = devices.get(deviceId);
434 + checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
435 + List<DeviceEvent> events = Lists.newArrayList();
436 +
437 + Map<PortNumber, Port> ports = devicePorts.computeIfAbsent(deviceId, key -> Maps.newConcurrentMap());
438 + List<PortDescription> descriptions = Lists.newArrayList();
439 + portDescriptions.entrySet().forEach(e -> {
440 + PortKey key = e.getKey();
441 + PortDescription value = e.getValue();
442 + if (key.deviceId().equals(deviceId) && key.providerId().equals(providerId)) {
443 + if (portNumber.isPresent()) {
444 + if (portNumber.get().equals(key.portNumber())) {
445 + descriptions.add(value);
446 + }
447 + } else {
448 + descriptions.add(value);
449 + }
450 + }
451 + });
452 +
453 + for (PortDescription description : descriptions) {
454 + final PortNumber number = description.portNumber();
455 + ports.compute(number, (k, existingPort) -> {
456 + Port newPort = composePort(device, number);
457 + if (existingPort == null) {
458 + events.add(new DeviceEvent(PORT_ADDED, device, newPort));
459 + } else {
460 + if (existingPort.isEnabled() != newPort.isEnabled() ||
461 + existingPort.type() != newPort.type() ||
462 + existingPort.portSpeed() != newPort.portSpeed() ||
463 + !AnnotationsUtil.isEqual(existingPort.annotations(), newPort.annotations())) {
464 + events.add(new DeviceEvent(PORT_UPDATED, device, newPort));
465 + }
466 + }
467 + return newPort;
468 + });
469 + }
470 +
471 + return events;
472 + }
473 +
474 + /**
475 + * Returns a Port, merging descriptions from multiple Providers.
476 + *
477 + * @param device device the port is on
478 + * @param number port number
479 + * @return Port instance
480 + */
481 + private Port composePort(Device device, PortNumber number) {
482 +
483 + Map<ProviderId, PortDescription> descriptions = Maps.newHashMap();
484 + portDescriptions.entrySet().forEach(entry -> {
485 + PortKey portKey = entry.getKey();
486 + if (portKey.deviceId().equals(device.id()) && portKey.portNumber().equals(number)) {
487 + descriptions.put(portKey.providerId(), entry.getValue());
488 + }
489 + });
490 + ProviderId primary = getPrimaryProviderId(device.id());
491 + PortDescription primaryDescription = descriptions.get(primary);
492 +
493 + // if no primary, assume not enabled
494 + boolean isEnabled = false;
495 + DefaultAnnotations annotations = DefaultAnnotations.builder().build();
496 + if (primaryDescription != null) {
497 + isEnabled = primaryDescription.isEnabled();
498 + annotations = merge(annotations, primaryDescription.annotations());
499 + }
500 + Port updated = null;
501 + for (Entry<ProviderId, PortDescription> e : descriptions.entrySet()) {
502 + if (e.getKey().equals(primary)) {
503 + continue;
504 + }
505 + annotations = merge(annotations, e.getValue().annotations());
506 + updated = buildTypedPort(device, number, isEnabled, e.getValue(), annotations);
507 + }
508 + if (primaryDescription == null) {
509 + return updated == null ? new DefaultPort(device, number, false, annotations) : updated;
510 + }
511 + return updated == null
512 + ? buildTypedPort(device, number, isEnabled, primaryDescription, annotations)
513 + : updated;
514 + }
515 +
516 + private Port buildTypedPort(Device device, PortNumber number, boolean isEnabled,
517 + PortDescription description, Annotations annotations) {
518 + switch (description.type()) {
519 + case OMS:
520 + OmsPortDescription omsDesc = (OmsPortDescription) description;
521 + return new OmsPort(device, number, isEnabled, omsDesc.minFrequency(),
522 + omsDesc.maxFrequency(), omsDesc.grid(), annotations);
523 + case OCH:
524 + OchPortDescription ochDesc = (OchPortDescription) description;
525 + return new OchPort(device, number, isEnabled, ochDesc.signalType(),
526 + ochDesc.isTunable(), ochDesc.lambda(), annotations);
527 + case ODUCLT:
528 + OduCltPortDescription oduDesc = (OduCltPortDescription) description;
529 + return new OduCltPort(device, number, isEnabled, oduDesc.signalType(), annotations);
530 + default:
531 + return new DefaultPort(device, number, isEnabled, description.type(),
532 + description.portSpeed(), annotations);
533 + }
534 + }
535 +
536 + @Override
537 + public DeviceEvent updatePortStatus(ProviderId providerId,
538 + DeviceId deviceId,
539 + PortDescription portDescription) {
540 + portDescriptions.put(new PortKey(providerId, deviceId, portDescription.portNumber()), portDescription);
541 + List<DeviceEvent> events =
542 + refreshDevicePortCache(providerId, deviceId, Optional.of(portDescription.portNumber()));
543 + return Iterables.getFirst(events, null);
544 + }
545 +
546 + @Override
547 + public List<Port> getPorts(DeviceId deviceId) {
548 + return ImmutableList.copyOf(devicePorts.getOrDefault(deviceId, Maps.newHashMap()).values());
549 + }
550 +
551 + @Override
552 + public Port getPort(DeviceId deviceId, PortNumber portNumber) {
553 + return devicePorts.getOrDefault(deviceId, Maps.newHashMap()).get(portNumber);
554 + }
555 +
556 + @Override
557 + public DeviceEvent updatePortStatistics(ProviderId providerId,
558 + DeviceId deviceId,
559 + Collection<PortStatistics> newStatsCollection) {
560 +
561 + Map<PortNumber, PortStatistics> prvStatsMap = devicePortStats.get(deviceId);
562 + Map<PortNumber, PortStatistics> newStatsMap = Maps.newHashMap();
563 + Map<PortNumber, PortStatistics> deltaStatsMap = Maps.newHashMap();
564 +
565 + if (prvStatsMap != null) {
566 + for (PortStatistics newStats : newStatsCollection) {
567 + PortNumber port = PortNumber.portNumber(newStats.port());
568 + PortStatistics prvStats = prvStatsMap.get(port);
569 + DefaultPortStatistics.Builder builder = DefaultPortStatistics.builder();
570 + PortStatistics deltaStats = builder.build();
571 + if (prvStats != null) {
572 + deltaStats = calcDeltaStats(deviceId, prvStats, newStats);
573 + }
574 + deltaStatsMap.put(port, deltaStats);
575 + newStatsMap.put(port, newStats);
576 + }
577 + } else {
578 + for (PortStatistics newStats : newStatsCollection) {
579 + PortNumber port = PortNumber.portNumber(newStats.port());
580 + newStatsMap.put(port, newStats);
581 + }
582 + }
583 + devicePortDeltaStats.put(deviceId, deltaStatsMap);
584 + devicePortStats.put(deviceId, newStatsMap);
585 + // DeviceEvent returns null because of InternalPortStatsListener usage
586 + return null;
587 + }
588 +
589 + /**
590 + * Calculate delta statistics by subtracting previous from new statistics.
591 + *
592 + * @param deviceId
593 + * @param prvStats
594 + * @param newStats
595 + * @return PortStatistics
596 + */
597 + public PortStatistics calcDeltaStats(DeviceId deviceId, PortStatistics prvStats, PortStatistics newStats) {
598 + // calculate time difference
599 + long deltaStatsSec, deltaStatsNano;
600 + if (newStats.durationNano() < prvStats.durationNano()) {
601 + deltaStatsNano = newStats.durationNano() - prvStats.durationNano() + TimeUnit.SECONDS.toNanos(1);
602 + deltaStatsSec = newStats.durationSec() - prvStats.durationSec() - 1L;
603 + } else {
604 + deltaStatsNano = newStats.durationNano() - prvStats.durationNano();
605 + deltaStatsSec = newStats.durationSec() - prvStats.durationSec();
606 + }
607 + DefaultPortStatistics.Builder builder = DefaultPortStatistics.builder();
608 + DefaultPortStatistics deltaStats = builder.setDeviceId(deviceId)
609 + .setPort(newStats.port())
610 + .setPacketsReceived(newStats.packetsReceived() - prvStats.packetsReceived())
611 + .setPacketsSent(newStats.packetsSent() - prvStats.packetsSent())
612 + .setBytesReceived(newStats.bytesReceived() - prvStats.bytesReceived())
613 + .setBytesSent(newStats.bytesSent() - prvStats.bytesSent())
614 + .setPacketsRxDropped(newStats.packetsRxDropped() - prvStats.packetsRxDropped())
615 + .setPacketsTxDropped(newStats.packetsTxDropped() - prvStats.packetsTxDropped())
616 + .setPacketsRxErrors(newStats.packetsRxErrors() - prvStats.packetsRxErrors())
617 + .setPacketsTxErrors(newStats.packetsTxErrors() - prvStats.packetsTxErrors())
618 + .setDurationSec(deltaStatsSec)
619 + .setDurationNano(deltaStatsNano)
620 + .build();
621 + return deltaStats;
622 + }
623 +
624 + @Override
625 + public List<PortStatistics> getPortStatistics(DeviceId deviceId) {
626 + Map<PortNumber, PortStatistics> portStats = devicePortStats.get(deviceId);
627 + if (portStats == null) {
628 + return Collections.emptyList();
629 + }
630 + return ImmutableList.copyOf(portStats.values());
631 + }
632 +
633 + @Override
634 + public List<PortStatistics> getPortDeltaStatistics(DeviceId deviceId) {
635 + Map<PortNumber, PortStatistics> portStats = devicePortDeltaStats.get(deviceId);
636 + if (portStats == null) {
637 + return Collections.emptyList();
638 + }
639 + return ImmutableList.copyOf(portStats.values());
640 + }
641 +
642 + @Override
643 + public boolean isAvailable(DeviceId deviceId) {
644 + return availableDevices.contains(deviceId);
645 + }
646 +
647 + @Override
648 + public Iterable<Device> getAvailableDevices() {
649 + return Iterables.filter(Iterables.transform(availableDevices, devices::get), d -> d != null);
650 + }
651 +
652 + @Override
653 + public DeviceEvent removeDevice(DeviceId deviceId) {
654 + NodeId master = mastershipService.getMasterFor(deviceId);
655 + // if there exist a master, forward
656 + // if there is no master, try to become one and process
657 + boolean relinquishAtEnd = false;
658 + if (master == null) {
659 + final MastershipRole myRole = mastershipService.getLocalRole(deviceId);
660 + if (myRole != MastershipRole.NONE) {
661 + relinquishAtEnd = true;
662 + }
663 + log.debug("Temporarily requesting role for {} to remove", deviceId);
664 + MastershipRole role = Futures.getUnchecked(mastershipService.requestRoleFor(deviceId));
665 + if (role == MastershipRole.MASTER) {
666 + master = localNodeId;
667 + }
668 + }
669 +
670 + if (!localNodeId.equals(master)) {
671 + log.debug("{} has control of {}, forwarding remove request",
672 + master, deviceId);
673 +
674 + clusterCommunicator.unicast(deviceId, DEVICE_REMOVE_REQ, SERIALIZER::encode, master)
675 + .whenComplete((r, e) -> {
676 + if (e != null) {
677 + log.error("Failed to forward {} remove request to its master", deviceId, e);
678 + }
679 + });
680 + return null;
681 + }
682 +
683 + // I have control..
684 + DeviceEvent event = null;
685 + final DeviceKey deviceKey = new DeviceKey(getPrimaryProviderId(deviceId), deviceId);
686 + DeviceDescription removedDeviceDescription =
687 + deviceDescriptions.remove(deviceKey);
688 + if (removedDeviceDescription != null) {
689 + event = purgeDeviceCache(deviceId);
690 + }
691 +
692 + if (relinquishAtEnd) {
693 + log.debug("Relinquishing temporary role acquired for {}", deviceId);
694 + mastershipService.relinquishMastership(deviceId);
695 + }
696 + return event;
697 + }
698 +
699 + private DeviceEvent injectDevice(DeviceInjectedEvent event) {
700 + return createOrUpdateDevice(event.providerId(), event.deviceId(), event.deviceDescription());
701 + }
702 +
703 + private List<DeviceEvent> injectPort(PortInjectedEvent event) {
704 + return updatePorts(event.providerId(), event.deviceId(), event.portDescriptions());
705 + }
706 +
707 + private DefaultAnnotations mergeAnnotations(DeviceId deviceId) {
708 + ProviderId primaryProviderId = getPrimaryProviderId(deviceId);
709 + DeviceDescription primaryDeviceDescription =
710 + deviceDescriptions.get(new DeviceKey(primaryProviderId, deviceId));
711 + DefaultAnnotations annotations = DefaultAnnotations.builder().build();
712 + annotations = merge(annotations, primaryDeviceDescription.annotations());
713 + for (ProviderId providerId : getAllProviders(deviceId)) {
714 + if (!providerId.equals(primaryProviderId)) {
715 + annotations = merge(annotations,
716 + deviceDescriptions.get(new DeviceKey(providerId, deviceId)).annotations());
717 + }
718 + }
719 + return annotations;
720 + }
721 +
722 + private class InternalDeviceStatusTracker implements SetEventListener<DeviceId> {
723 + @Override
724 + public void event(SetEvent<DeviceId> event) {
725 + final DeviceId deviceId = event.entry();
726 + final Device device = devices.get(deviceId);
727 + if (device != null) {
728 + notifyDelegate(new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device));
729 + } else {
730 + pendingAvailableChangeUpdates.add(deviceId);
731 + }
732 + }
733 + }
734 +
735 + private class InternalDeviceChangeEventListener
736 + implements EventuallyConsistentMapListener<DeviceKey, DeviceDescription> {
737 + @Override
738 + public void event(EventuallyConsistentMapEvent<DeviceKey, DeviceDescription> event) {
739 + DeviceId deviceId = event.key().deviceId();
740 + ProviderId providerId = event.key().providerId();
741 + if (event.type() == PUT) {
742 + notifyDelegate(refreshDeviceCache(providerId, deviceId));
743 + if (pendingAvailableChangeUpdates.remove(deviceId)) {
744 + notifyDelegate(new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, devices.get(deviceId)));
745 + }
746 + } else if (event.type() == REMOVE) {
747 + notifyDelegate(purgeDeviceCache(deviceId));
748 + }
749 + }
750 + }
751 +
752 + private class InternalPortChangeEventListener
753 + implements EventuallyConsistentMapListener<PortKey, PortDescription> {
754 + @Override
755 + public void event(EventuallyConsistentMapEvent<PortKey, PortDescription> event) {
756 + DeviceId deviceId = event.key().deviceId();
757 + ProviderId providerId = event.key().providerId();
758 + PortNumber portNumber = event.key().portNumber();
759 + if (event.type() == PUT) {
760 + if (devices.containsKey(deviceId)) {
761 + List<DeviceEvent> events = refreshDevicePortCache(providerId, deviceId, Optional.of(portNumber));
762 + for (DeviceEvent deviceEvent : events) {
763 + notifyDelegate(deviceEvent);
764 + }
765 + }
766 + } else if (event.type() == REMOVE) {
767 + log.warn("Unexpected port removed event");
768 + }
769 + }
770 + }
771 +
772 + private class InternalPortStatsListener
773 + implements EventuallyConsistentMapListener<DeviceId, Map<PortNumber, PortStatistics>> {
774 + @Override
775 + public void event(EventuallyConsistentMapEvent<DeviceId, Map<PortNumber, PortStatistics>> event) {
776 + if (event.type() == PUT) {
777 + Device device = devices.get(event.key());
778 + if (device != null) {
779 + delegate.notify(new DeviceEvent(PORT_STATS_UPDATED, device));
780 + }
781 + }
782 + }
783 + }
784 +}
...\ No newline at end of file ...\ No newline at end of file
1 +/*
2 + * Copyright 2014-2015 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +package org.onosproject.store.device.impl;
17 +
18 +import java.util.Objects;
19 +
20 +import org.onosproject.net.DeviceId;
21 +import org.onosproject.net.PortNumber;
22 +import org.onosproject.net.provider.ProviderId;
23 +
24 +import com.google.common.base.MoreObjects;
25 +
26 +/**
27 + * Key for PortDescriptions in ECDeviceStore.
28 + */
29 +public class PortKey {
30 + private final ProviderId providerId;
31 + private final DeviceId deviceId;
32 + private final PortNumber portNumber;
33 +
34 + public PortKey(ProviderId providerId, DeviceId deviceId, PortNumber portNumber) {
35 + this.providerId = providerId;
36 + this.deviceId = deviceId;
37 + this.portNumber = portNumber;
38 + }
39 +
40 + public ProviderId providerId() {
41 + return providerId;
42 + }
43 +
44 + public DeviceId deviceId() {
45 + return deviceId;
46 + }
47 +
48 + public PortNumber portNumber() {
49 + return portNumber;
50 + }
51 +
52 + @Override
53 + public int hashCode() {
54 + return Objects.hash(providerId, deviceId, portNumber);
55 + }
56 +
57 + @Override
58 + public boolean equals(Object obj) {
59 + if (this == obj) {
60 + return true;
61 + }
62 + if (!(obj instanceof PortKey)) {
63 + return false;
64 + }
65 + PortKey that = (PortKey) obj;
66 + return Objects.equals(this.deviceId, that.deviceId) &&
67 + Objects.equals(this.providerId, that.providerId) &&
68 + Objects.equals(this.portNumber, that.portNumber);
69 + }
70 +
71 + @Override
72 + public String toString() {
73 + return MoreObjects.toStringHelper(getClass())
74 + .add("providerId", providerId)
75 + .add("deviceId", deviceId)
76 + .add("portNumber", portNumber)
77 + .toString();
78 + }
79 +}