GossipDeviceStore: switch inner Map to non-Concurrent
Change-Id: I3f3a36f2249c1ba36175ad81fa374f7d8ab89af1
Showing
1 changed file
with
66 additions
and
40 deletions
... | @@ -86,14 +86,11 @@ public class GossipDeviceStore | ... | @@ -86,14 +86,11 @@ public class GossipDeviceStore |
86 | 86 | ||
87 | private final Logger log = getLogger(getClass()); | 87 | private final Logger log = getLogger(getClass()); |
88 | 88 | ||
89 | - public static final String DEVICE_NOT_FOUND = "Device with ID %s not found"; | 89 | + private static final String DEVICE_NOT_FOUND = "Device with ID %s not found"; |
90 | 90 | ||
91 | - // TODO: Check if inner Map can be replaced with plain Map. | ||
92 | // innerMap is used to lock a Device, thus instance should never be replaced. | 91 | // innerMap is used to lock a Device, thus instance should never be replaced. |
93 | - | ||
94 | // collection of Description given from various providers | 92 | // collection of Description given from various providers |
95 | - private final ConcurrentMap<DeviceId, | 93 | + private final ConcurrentMap<DeviceId, Map<ProviderId, DeviceDescriptions>> |
96 | - ConcurrentMap<ProviderId, DeviceDescriptions>> | ||
97 | deviceDescs = Maps.newConcurrentMap(); | 94 | deviceDescs = Maps.newConcurrentMap(); |
98 | 95 | ||
99 | // cache of Device and Ports generated by compositing descriptions from providers | 96 | // cache of Device and Ports generated by compositing descriptions from providers |
... | @@ -208,9 +205,9 @@ public class GossipDeviceStore | ... | @@ -208,9 +205,9 @@ public class GossipDeviceStore |
208 | final Timestamped<DeviceDescription> deltaDesc = new Timestamped<>(deviceDescription, newTimestamp); | 205 | final Timestamped<DeviceDescription> deltaDesc = new Timestamped<>(deviceDescription, newTimestamp); |
209 | final DeviceEvent event; | 206 | final DeviceEvent event; |
210 | final Timestamped<DeviceDescription> mergedDesc; | 207 | final Timestamped<DeviceDescription> mergedDesc; |
211 | - synchronized (getDeviceDescriptions(deviceId)) { | 208 | + synchronized (getOrCreateDeviceDescriptionsMap(deviceId)) { |
212 | event = createOrUpdateDeviceInternal(providerId, deviceId, deltaDesc); | 209 | event = createOrUpdateDeviceInternal(providerId, deviceId, deltaDesc); |
213 | - mergedDesc = getDeviceDescriptions(deviceId).get(providerId).getDeviceDesc(); | 210 | + mergedDesc = getOrCreateDeviceDescriptionsMap(deviceId).get(providerId).getDeviceDesc(); |
214 | } | 211 | } |
215 | if (event != null) { | 212 | if (event != null) { |
216 | log.info("Notifying peers of a device update topology event for providerId: {} and deviceId: {}", | 213 | log.info("Notifying peers of a device update topology event for providerId: {} and deviceId: {}", |
... | @@ -230,8 +227,8 @@ public class GossipDeviceStore | ... | @@ -230,8 +227,8 @@ public class GossipDeviceStore |
230 | Timestamped<DeviceDescription> deltaDesc) { | 227 | Timestamped<DeviceDescription> deltaDesc) { |
231 | 228 | ||
232 | // Collection of DeviceDescriptions for a Device | 229 | // Collection of DeviceDescriptions for a Device |
233 | - ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs | 230 | + Map<ProviderId, DeviceDescriptions> providerDescs |
234 | - = getDeviceDescriptions(deviceId); | 231 | + = getOrCreateDeviceDescriptionsMap(deviceId); |
235 | 232 | ||
236 | synchronized (providerDescs) { | 233 | synchronized (providerDescs) { |
237 | // locking per device | 234 | // locking per device |
... | @@ -241,9 +238,7 @@ public class GossipDeviceStore | ... | @@ -241,9 +238,7 @@ public class GossipDeviceStore |
241 | return null; | 238 | return null; |
242 | } | 239 | } |
243 | 240 | ||
244 | - DeviceDescriptions descs | 241 | + DeviceDescriptions descs = getOrCreateProviderDeviceDescriptions(providerDescs, providerId, deltaDesc); |
245 | - = createIfAbsentUnchecked(providerDescs, providerId, | ||
246 | - new InitDeviceDescs(deltaDesc)); | ||
247 | 242 | ||
248 | final Device oldDevice = devices.get(deviceId); | 243 | final Device oldDevice = devices.get(deviceId); |
249 | final Device newDevice; | 244 | final Device newDevice; |
... | @@ -338,7 +333,7 @@ public class GossipDeviceStore | ... | @@ -338,7 +333,7 @@ public class GossipDeviceStore |
338 | private DeviceEvent markOfflineInternal(DeviceId deviceId, Timestamp timestamp) { | 333 | private DeviceEvent markOfflineInternal(DeviceId deviceId, Timestamp timestamp) { |
339 | 334 | ||
340 | Map<ProviderId, DeviceDescriptions> providerDescs | 335 | Map<ProviderId, DeviceDescriptions> providerDescs |
341 | - = getDeviceDescriptions(deviceId); | 336 | + = getOrCreateDeviceDescriptionsMap(deviceId); |
342 | 337 | ||
343 | // locking device | 338 | // locking device |
344 | synchronized (providerDescs) { | 339 | synchronized (providerDescs) { |
... | @@ -401,9 +396,9 @@ public class GossipDeviceStore | ... | @@ -401,9 +396,9 @@ public class GossipDeviceStore |
401 | final List<DeviceEvent> events; | 396 | final List<DeviceEvent> events; |
402 | final Timestamped<List<PortDescription>> merged; | 397 | final Timestamped<List<PortDescription>> merged; |
403 | 398 | ||
404 | - synchronized (getDeviceDescriptions(deviceId)) { | 399 | + synchronized (getOrCreateDeviceDescriptionsMap(deviceId)) { |
405 | events = updatePortsInternal(providerId, deviceId, timestampedInput); | 400 | events = updatePortsInternal(providerId, deviceId, timestampedInput); |
406 | - final DeviceDescriptions descs = getDeviceDescriptions(deviceId).get(providerId); | 401 | + final DeviceDescriptions descs = getOrCreateDeviceDescriptionsMap(deviceId).get(providerId); |
407 | List<PortDescription> mergedList = | 402 | List<PortDescription> mergedList = |
408 | FluentIterable.from(portDescriptions) | 403 | FluentIterable.from(portDescriptions) |
409 | .transform(new Function<PortDescription, PortDescription>() { | 404 | .transform(new Function<PortDescription, PortDescription>() { |
... | @@ -435,7 +430,7 @@ public class GossipDeviceStore | ... | @@ -435,7 +430,7 @@ public class GossipDeviceStore |
435 | Device device = devices.get(deviceId); | 430 | Device device = devices.get(deviceId); |
436 | checkArgument(device != null, DEVICE_NOT_FOUND, deviceId); | 431 | checkArgument(device != null, DEVICE_NOT_FOUND, deviceId); |
437 | 432 | ||
438 | - ConcurrentMap<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId); | 433 | + Map<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId); |
439 | checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId); | 434 | checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId); |
440 | 435 | ||
441 | List<DeviceEvent> events = new ArrayList<>(); | 436 | List<DeviceEvent> events = new ArrayList<>(); |
... | @@ -539,10 +534,34 @@ public class GossipDeviceStore | ... | @@ -539,10 +534,34 @@ public class GossipDeviceStore |
539 | NewConcurrentHashMap.<PortNumber, Port>ifNeeded()); | 534 | NewConcurrentHashMap.<PortNumber, Port>ifNeeded()); |
540 | } | 535 | } |
541 | 536 | ||
542 | - private ConcurrentMap<ProviderId, DeviceDescriptions> getDeviceDescriptions( | 537 | + private Map<ProviderId, DeviceDescriptions> getOrCreateDeviceDescriptionsMap( |
543 | DeviceId deviceId) { | 538 | DeviceId deviceId) { |
544 | - return createIfAbsentUnchecked(deviceDescs, deviceId, | 539 | + Map<ProviderId, DeviceDescriptions> r; |
545 | - NewConcurrentHashMap.<ProviderId, DeviceDescriptions>ifNeeded()); | 540 | + r = deviceDescs.get(deviceId); |
541 | + if (r == null) { | ||
542 | + r = new HashMap<ProviderId, DeviceDescriptions>(); | ||
543 | + final Map<ProviderId, DeviceDescriptions> concurrentlyAdded; | ||
544 | + concurrentlyAdded = deviceDescs.putIfAbsent(deviceId, r); | ||
545 | + if (concurrentlyAdded != null) { | ||
546 | + r = concurrentlyAdded; | ||
547 | + } | ||
548 | + } | ||
549 | + return r; | ||
550 | + } | ||
551 | + | ||
552 | + // Guarded by deviceDescs value (=Device lock) | ||
553 | + private DeviceDescriptions getOrCreateProviderDeviceDescriptions( | ||
554 | + Map<ProviderId, DeviceDescriptions> device, | ||
555 | + ProviderId providerId, Timestamped<DeviceDescription> deltaDesc) { | ||
556 | + | ||
557 | + synchronized (device) { | ||
558 | + DeviceDescriptions r = device.get(providerId); | ||
559 | + if (r == null) { | ||
560 | + r = new DeviceDescriptions(deltaDesc); | ||
561 | + device.put(providerId, r); | ||
562 | + } | ||
563 | + return r; | ||
564 | + } | ||
546 | } | 565 | } |
547 | 566 | ||
548 | @Override | 567 | @Override |
... | @@ -555,9 +574,9 @@ public class GossipDeviceStore | ... | @@ -555,9 +574,9 @@ public class GossipDeviceStore |
555 | = new Timestamped<>(portDescription, newTimestamp); | 574 | = new Timestamped<>(portDescription, newTimestamp); |
556 | final DeviceEvent event; | 575 | final DeviceEvent event; |
557 | final Timestamped<PortDescription> mergedDesc; | 576 | final Timestamped<PortDescription> mergedDesc; |
558 | - synchronized (getDeviceDescriptions(deviceId)) { | 577 | + synchronized (getOrCreateDeviceDescriptionsMap(deviceId)) { |
559 | event = updatePortStatusInternal(providerId, deviceId, deltaDesc); | 578 | event = updatePortStatusInternal(providerId, deviceId, deltaDesc); |
560 | - mergedDesc = getDeviceDescriptions(deviceId).get(providerId) | 579 | + mergedDesc = getOrCreateDeviceDescriptionsMap(deviceId).get(providerId) |
561 | .getPortDesc(portDescription.portNumber()); | 580 | .getPortDesc(portDescription.portNumber()); |
562 | } | 581 | } |
563 | if (event != null) { | 582 | if (event != null) { |
... | @@ -579,7 +598,7 @@ public class GossipDeviceStore | ... | @@ -579,7 +598,7 @@ public class GossipDeviceStore |
579 | Device device = devices.get(deviceId); | 598 | Device device = devices.get(deviceId); |
580 | checkArgument(device != null, DEVICE_NOT_FOUND, deviceId); | 599 | checkArgument(device != null, DEVICE_NOT_FOUND, deviceId); |
581 | 600 | ||
582 | - ConcurrentMap<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId); | 601 | + Map<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId); |
583 | checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId); | 602 | checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId); |
584 | 603 | ||
585 | synchronized (descsMap) { | 604 | synchronized (descsMap) { |
... | @@ -591,7 +610,7 @@ public class GossipDeviceStore | ... | @@ -591,7 +610,7 @@ public class GossipDeviceStore |
591 | 610 | ||
592 | DeviceDescriptions descs = descsMap.get(providerId); | 611 | DeviceDescriptions descs = descsMap.get(providerId); |
593 | // assuming all providers must to give DeviceDescription | 612 | // assuming all providers must to give DeviceDescription |
594 | - checkArgument(descs != null, | 613 | + verify(descs != null, |
595 | "Device description for Device ID %s from Provider %s was not found", | 614 | "Device description for Device ID %s from Provider %s was not found", |
596 | deviceId, providerId); | 615 | deviceId, providerId); |
597 | 616 | ||
... | @@ -661,7 +680,7 @@ public class GossipDeviceStore | ... | @@ -661,7 +680,7 @@ public class GossipDeviceStore |
661 | private DeviceEvent removeDeviceInternal(DeviceId deviceId, | 680 | private DeviceEvent removeDeviceInternal(DeviceId deviceId, |
662 | Timestamp timestamp) { | 681 | Timestamp timestamp) { |
663 | 682 | ||
664 | - Map<ProviderId, DeviceDescriptions> descs = getDeviceDescriptions(deviceId); | 683 | + Map<ProviderId, DeviceDescriptions> descs = getOrCreateDeviceDescriptionsMap(deviceId); |
665 | synchronized (descs) { | 684 | synchronized (descs) { |
666 | // accept removal request if given timestamp is newer than | 685 | // accept removal request if given timestamp is newer than |
667 | // the latest Timestamp from Primary provider | 686 | // the latest Timestamp from Primary provider |
... | @@ -751,14 +770,14 @@ public class GossipDeviceStore | ... | @@ -751,14 +770,14 @@ public class GossipDeviceStore |
751 | * | 770 | * |
752 | * @param device device the port is on | 771 | * @param device device the port is on |
753 | * @param number port number | 772 | * @param number port number |
754 | - * @param providerDescs Collection of Descriptions from multiple providers | 773 | + * @param descsMap Collection of Descriptions from multiple providers |
755 | * @return Port instance | 774 | * @return Port instance |
756 | */ | 775 | */ |
757 | private Port composePort(Device device, PortNumber number, | 776 | private Port composePort(Device device, PortNumber number, |
758 | - ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs) { | 777 | + Map<ProviderId, DeviceDescriptions> descsMap) { |
759 | 778 | ||
760 | - ProviderId primary = pickPrimaryPID(providerDescs); | 779 | + ProviderId primary = pickPrimaryPID(descsMap); |
761 | - DeviceDescriptions primDescs = providerDescs.get(primary); | 780 | + DeviceDescriptions primDescs = descsMap.get(primary); |
762 | // if no primary, assume not enabled | 781 | // if no primary, assume not enabled |
763 | // TODO: revisit this default port enabled/disabled behavior | 782 | // TODO: revisit this default port enabled/disabled behavior |
764 | boolean isEnabled = false; | 783 | boolean isEnabled = false; |
... | @@ -770,7 +789,7 @@ public class GossipDeviceStore | ... | @@ -770,7 +789,7 @@ public class GossipDeviceStore |
770 | annotations = merge(annotations, portDesc.value().annotations()); | 789 | annotations = merge(annotations, portDesc.value().annotations()); |
771 | } | 790 | } |
772 | 791 | ||
773 | - for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) { | 792 | + for (Entry<ProviderId, DeviceDescriptions> e : descsMap.entrySet()) { |
774 | if (e.getKey().equals(primary)) { | 793 | if (e.getKey().equals(primary)) { |
775 | continue; | 794 | continue; |
776 | } | 795 | } |
... | @@ -893,41 +912,48 @@ public class GossipDeviceStore | ... | @@ -893,41 +912,48 @@ public class GossipDeviceStore |
893 | private DeviceAntiEntropyAdvertisement createAdvertisement() { | 912 | private DeviceAntiEntropyAdvertisement createAdvertisement() { |
894 | final NodeId self = clusterService.getLocalNode().id(); | 913 | final NodeId self = clusterService.getLocalNode().id(); |
895 | 914 | ||
896 | - Map<DeviceFragmentId, Timestamp> devices = new HashMap<>(deviceDescs.size()); | 915 | + final int numDevices = deviceDescs.size(); |
897 | - final int portsPerDevice = 8; // random guess to minimize reallocation | 916 | + Map<DeviceFragmentId, Timestamp> adDevices = new HashMap<>(numDevices); |
898 | - Map<PortFragmentId, Timestamp> ports = new HashMap<>(devices.size() * portsPerDevice); | 917 | + final int portsPerDevice = 8; // random factor to minimize reallocation |
899 | - Map<DeviceId, Timestamp> offline = new HashMap<>(devices.size()); | 918 | + Map<PortFragmentId, Timestamp> adPorts = new HashMap<>(numDevices * portsPerDevice); |
919 | + Map<DeviceId, Timestamp> adOffline = new HashMap<>(numDevices); | ||
900 | 920 | ||
901 | - for (Entry<DeviceId, ConcurrentMap<ProviderId, DeviceDescriptions>> | 921 | + for (Entry<DeviceId, Map<ProviderId, DeviceDescriptions>> |
902 | provs : deviceDescs.entrySet()) { | 922 | provs : deviceDescs.entrySet()) { |
903 | 923 | ||
924 | + // for each Device... | ||
904 | final DeviceId deviceId = provs.getKey(); | 925 | final DeviceId deviceId = provs.getKey(); |
905 | - final ConcurrentMap<ProviderId, DeviceDescriptions> devDescs = provs.getValue(); | 926 | + final Map<ProviderId, DeviceDescriptions> devDescs = provs.getValue(); |
906 | synchronized (devDescs) { | 927 | synchronized (devDescs) { |
907 | 928 | ||
908 | - offline.put(deviceId, this.offline.get(deviceId)); | 929 | + // send device offline timestamp |
930 | + Timestamp lOffline = this.offline.get(deviceId); | ||
931 | + if (lOffline != null) { | ||
932 | + adOffline.put(deviceId, lOffline); | ||
933 | + } | ||
909 | 934 | ||
910 | for (Entry<ProviderId, DeviceDescriptions> | 935 | for (Entry<ProviderId, DeviceDescriptions> |
911 | prov : devDescs.entrySet()) { | 936 | prov : devDescs.entrySet()) { |
912 | 937 | ||
938 | + // for each Provider Descriptions... | ||
913 | final ProviderId provId = prov.getKey(); | 939 | final ProviderId provId = prov.getKey(); |
914 | final DeviceDescriptions descs = prov.getValue(); | 940 | final DeviceDescriptions descs = prov.getValue(); |
915 | 941 | ||
916 | - devices.put(new DeviceFragmentId(deviceId, provId), | 942 | + adDevices.put(new DeviceFragmentId(deviceId, provId), |
917 | descs.getDeviceDesc().timestamp()); | 943 | descs.getDeviceDesc().timestamp()); |
918 | 944 | ||
919 | for (Entry<PortNumber, Timestamped<PortDescription>> | 945 | for (Entry<PortNumber, Timestamped<PortDescription>> |
920 | portDesc : descs.getPortDescs().entrySet()) { | 946 | portDesc : descs.getPortDescs().entrySet()) { |
921 | 947 | ||
922 | final PortNumber number = portDesc.getKey(); | 948 | final PortNumber number = portDesc.getKey(); |
923 | - ports.put(new PortFragmentId(deviceId, provId, number), | 949 | + adPorts.put(new PortFragmentId(deviceId, provId, number), |
924 | portDesc.getValue().timestamp()); | 950 | portDesc.getValue().timestamp()); |
925 | } | 951 | } |
926 | } | 952 | } |
927 | } | 953 | } |
928 | } | 954 | } |
929 | 955 | ||
930 | - return new DeviceAntiEntropyAdvertisement(self, devices, ports, offline); | 956 | + return new DeviceAntiEntropyAdvertisement(self, adDevices, adPorts, adOffline); |
931 | } | 957 | } |
932 | 958 | ||
933 | /** | 959 | /** |
... | @@ -950,7 +976,7 @@ public class GossipDeviceStore | ... | @@ -950,7 +976,7 @@ public class GossipDeviceStore |
950 | Collection<DeviceFragmentId> reqDevices = new ArrayList<>(); | 976 | Collection<DeviceFragmentId> reqDevices = new ArrayList<>(); |
951 | Collection<PortFragmentId> reqPorts = new ArrayList<>(); | 977 | Collection<PortFragmentId> reqPorts = new ArrayList<>(); |
952 | 978 | ||
953 | - for (Entry<DeviceId, ConcurrentMap<ProviderId, DeviceDescriptions>> de : deviceDescs.entrySet()) { | 979 | + for (Entry<DeviceId, Map<ProviderId, DeviceDescriptions>> de : deviceDescs.entrySet()) { |
954 | final DeviceId deviceId = de.getKey(); | 980 | final DeviceId deviceId = de.getKey(); |
955 | final Map<ProviderId, DeviceDescriptions> lDevice = de.getValue(); | 981 | final Map<ProviderId, DeviceDescriptions> lDevice = de.getValue(); |
956 | 982 | ... | ... |
-
Please register or login to post a comment