Committed by
Gerrit Code Review
Separate the child map into two child maps for each resource type
Now, we have a child map for discrete type resources and a child map for continuous type resources. This is a refactoring for ONOS-4281. Change-Id: I1fc931d6b6599885573908606600a61686073bee
Showing
1 changed file
with
170 additions
and
51 deletions
... | @@ -79,8 +79,9 @@ public class ConsistentResourceStore extends AbstractStore<ResourceEvent, Resour | ... | @@ -79,8 +79,9 @@ public class ConsistentResourceStore extends AbstractStore<ResourceEvent, Resour |
79 | private static final Logger log = LoggerFactory.getLogger(ConsistentResourceStore.class); | 79 | private static final Logger log = LoggerFactory.getLogger(ConsistentResourceStore.class); |
80 | 80 | ||
81 | private static final String DISCRETE_CONSUMER_MAP = "onos-discrete-consumers"; | 81 | private static final String DISCRETE_CONSUMER_MAP = "onos-discrete-consumers"; |
82 | + private static final String DISCRETE_CHILD_MAP = "onos-resource-discrete-children"; | ||
82 | private static final String CONTINUOUS_CONSUMER_MAP = "onos-continuous-consumers"; | 83 | private static final String CONTINUOUS_CONSUMER_MAP = "onos-continuous-consumers"; |
83 | - private static final String CHILD_MAP = "onos-resource-children"; | 84 | + private static final String CONTINUOUS_CHILD_MAP = "onos-resource-continuous-children"; |
84 | private static final Serializer SERIALIZER = Serializer.using( | 85 | private static final Serializer SERIALIZER = Serializer.using( |
85 | Arrays.asList(KryoNamespaces.API), | 86 | Arrays.asList(KryoNamespaces.API), |
86 | ContinuousResourceAllocation.class); | 87 | ContinuousResourceAllocation.class); |
... | @@ -93,8 +94,9 @@ public class ConsistentResourceStore extends AbstractStore<ResourceEvent, Resour | ... | @@ -93,8 +94,9 @@ public class ConsistentResourceStore extends AbstractStore<ResourceEvent, Resour |
93 | protected StorageService service; | 94 | protected StorageService service; |
94 | 95 | ||
95 | private ConsistentMap<DiscreteResourceId, ResourceConsumer> discreteConsumers; | 96 | private ConsistentMap<DiscreteResourceId, ResourceConsumer> discreteConsumers; |
97 | + private ConsistentMap<DiscreteResourceId, Set<DiscreteResource>> discreteChildMap; | ||
96 | private ConsistentMap<ContinuousResourceId, ContinuousResourceAllocation> continuousConsumers; | 98 | private ConsistentMap<ContinuousResourceId, ContinuousResourceAllocation> continuousConsumers; |
97 | - private ConsistentMap<DiscreteResourceId, Set<Resource>> childMap; | 99 | + private ConsistentMap<DiscreteResourceId, Set<ContinuousResource>> continuousChildMap; |
98 | 100 | ||
99 | @Activate | 101 | @Activate |
100 | public void activate() { | 102 | public void activate() { |
... | @@ -102,16 +104,22 @@ public class ConsistentResourceStore extends AbstractStore<ResourceEvent, Resour | ... | @@ -102,16 +104,22 @@ public class ConsistentResourceStore extends AbstractStore<ResourceEvent, Resour |
102 | .withName(DISCRETE_CONSUMER_MAP) | 104 | .withName(DISCRETE_CONSUMER_MAP) |
103 | .withSerializer(SERIALIZER) | 105 | .withSerializer(SERIALIZER) |
104 | .build(); | 106 | .build(); |
107 | + discreteChildMap = service.<DiscreteResourceId, Set<DiscreteResource>>consistentMapBuilder() | ||
108 | + .withName(DISCRETE_CHILD_MAP) | ||
109 | + .withSerializer(SERIALIZER) | ||
110 | + .build(); | ||
105 | continuousConsumers = service.<ContinuousResourceId, ContinuousResourceAllocation>consistentMapBuilder() | 111 | continuousConsumers = service.<ContinuousResourceId, ContinuousResourceAllocation>consistentMapBuilder() |
106 | .withName(CONTINUOUS_CONSUMER_MAP) | 112 | .withName(CONTINUOUS_CONSUMER_MAP) |
107 | .withSerializer(SERIALIZER) | 113 | .withSerializer(SERIALIZER) |
108 | .build(); | 114 | .build(); |
109 | - childMap = service.<DiscreteResourceId, Set<Resource>>consistentMapBuilder() | 115 | + continuousChildMap = service.<DiscreteResourceId, Set<ContinuousResource>>consistentMapBuilder() |
110 | - .withName(CHILD_MAP) | 116 | + .withName(CONTINUOUS_CHILD_MAP) |
111 | .withSerializer(SERIALIZER) | 117 | .withSerializer(SERIALIZER) |
112 | .build(); | 118 | .build(); |
113 | 119 | ||
114 | - Tools.retryable(() -> childMap.put(Resource.ROOT.id(), new LinkedHashSet<>()), | 120 | + Tools.retryable(() -> discreteChildMap.put(Resource.ROOT.id(), new LinkedHashSet<>()), |
121 | + ConsistentMapException.class, MAX_RETRIES, RETRY_DELAY); | ||
122 | + Tools.retryable(() -> continuousChildMap.put(Resource.ROOT.id(), new LinkedHashSet<>()), | ||
115 | ConsistentMapException.class, MAX_RETRIES, RETRY_DELAY); | 123 | ConsistentMapException.class, MAX_RETRIES, RETRY_DELAY); |
116 | log.info("Started"); | 124 | log.info("Started"); |
117 | } | 125 | } |
... | @@ -162,8 +170,10 @@ public class ConsistentResourceStore extends AbstractStore<ResourceEvent, Resour | ... | @@ -162,8 +170,10 @@ public class ConsistentResourceStore extends AbstractStore<ResourceEvent, Resour |
162 | TransactionContext tx = service.transactionContextBuilder().build(); | 170 | TransactionContext tx = service.transactionContextBuilder().build(); |
163 | tx.begin(); | 171 | tx.begin(); |
164 | 172 | ||
165 | - TransactionalMap<DiscreteResourceId, Set<Resource>> childTxMap = | 173 | + TransactionalMap<DiscreteResourceId, Set<DiscreteResource>> discreteChildTxMap = |
166 | - tx.getTransactionalMap(CHILD_MAP, SERIALIZER); | 174 | + tx.getTransactionalMap(DISCRETE_CHILD_MAP, SERIALIZER); |
175 | + TransactionalMap<DiscreteResourceId, Set<ContinuousResource>> continuousChildTxMap = | ||
176 | + tx.getTransactionalMap(CONTINUOUS_CHILD_MAP, SERIALIZER); | ||
167 | 177 | ||
168 | // the order is preserved by LinkedHashMap | 178 | // the order is preserved by LinkedHashMap |
169 | Map<DiscreteResource, List<Resource>> resourceMap = resources.stream() | 179 | Map<DiscreteResource, List<Resource>> resourceMap = resources.stream() |
... | @@ -171,11 +181,11 @@ public class ConsistentResourceStore extends AbstractStore<ResourceEvent, Resour | ... | @@ -171,11 +181,11 @@ public class ConsistentResourceStore extends AbstractStore<ResourceEvent, Resour |
171 | .collect(Collectors.groupingBy(x -> x.parent().get(), LinkedHashMap::new, Collectors.toList())); | 181 | .collect(Collectors.groupingBy(x -> x.parent().get(), LinkedHashMap::new, Collectors.toList())); |
172 | 182 | ||
173 | for (Map.Entry<DiscreteResource, List<Resource>> entry: resourceMap.entrySet()) { | 183 | for (Map.Entry<DiscreteResource, List<Resource>> entry: resourceMap.entrySet()) { |
174 | - if (!lookup(childTxMap, entry.getKey().id()).isPresent()) { | 184 | + if (!lookup(discreteChildTxMap, continuousChildTxMap, entry.getKey().id()).isPresent()) { |
175 | return abortTransaction(tx); | 185 | return abortTransaction(tx); |
176 | } | 186 | } |
177 | 187 | ||
178 | - if (!appendValues(childTxMap, entry.getKey().id(), entry.getValue())) { | 188 | + if (!appendValues(discreteChildTxMap, continuousChildTxMap, entry.getKey().id(), entry.getValue())) { |
179 | return abortTransaction(tx); | 189 | return abortTransaction(tx); |
180 | } | 190 | } |
181 | } | 191 | } |
... | @@ -201,12 +211,14 @@ public class ConsistentResourceStore extends AbstractStore<ResourceEvent, Resour | ... | @@ -201,12 +211,14 @@ public class ConsistentResourceStore extends AbstractStore<ResourceEvent, Resour |
201 | TransactionContext tx = service.transactionContextBuilder().build(); | 211 | TransactionContext tx = service.transactionContextBuilder().build(); |
202 | tx.begin(); | 212 | tx.begin(); |
203 | 213 | ||
204 | - TransactionalMap<DiscreteResourceId, Set<Resource>> childTxMap = | ||
205 | - tx.getTransactionalMap(CHILD_MAP, SERIALIZER); | ||
206 | TransactionalMap<DiscreteResourceId, ResourceConsumer> discreteConsumerTxMap = | 214 | TransactionalMap<DiscreteResourceId, ResourceConsumer> discreteConsumerTxMap = |
207 | tx.getTransactionalMap(DISCRETE_CONSUMER_MAP, SERIALIZER); | 215 | tx.getTransactionalMap(DISCRETE_CONSUMER_MAP, SERIALIZER); |
216 | + TransactionalMap<DiscreteResourceId, Set<DiscreteResource>> discreteChildTxMap = | ||
217 | + tx.getTransactionalMap(DISCRETE_CHILD_MAP, SERIALIZER); | ||
208 | TransactionalMap<ContinuousResourceId, ContinuousResourceAllocation> continuousConsumerTxMap = | 218 | TransactionalMap<ContinuousResourceId, ContinuousResourceAllocation> continuousConsumerTxMap = |
209 | tx.getTransactionalMap(CONTINUOUS_CONSUMER_MAP, SERIALIZER); | 219 | tx.getTransactionalMap(CONTINUOUS_CONSUMER_MAP, SERIALIZER); |
220 | + TransactionalMap<DiscreteResourceId, Set<ContinuousResource>> continuousChildTxMap = | ||
221 | + tx.getTransactionalMap(CONTINUOUS_CHILD_MAP, SERIALIZER); | ||
210 | 222 | ||
211 | // Look up resources by resource IDs | 223 | // Look up resources by resource IDs |
212 | List<Resource> resources = ids.stream() | 224 | List<Resource> resources = ids.stream() |
... | @@ -216,7 +228,7 @@ public class ConsistentResourceStore extends AbstractStore<ResourceEvent, Resour | ... | @@ -216,7 +228,7 @@ public class ConsistentResourceStore extends AbstractStore<ResourceEvent, Resour |
216 | if (x instanceof DiscreteResourceId) { | 228 | if (x instanceof DiscreteResourceId) { |
217 | return Optional.of(Resources.discrete((DiscreteResourceId) x).resource()); | 229 | return Optional.of(Resources.discrete((DiscreteResourceId) x).resource()); |
218 | } else { | 230 | } else { |
219 | - return lookup(childTxMap, x); | 231 | + return lookup(continuousChildTxMap, (ContinuousResourceId) x); |
220 | } | 232 | } |
221 | }) | 233 | }) |
222 | .filter(Optional::isPresent) | 234 | .filter(Optional::isPresent) |
... | @@ -245,7 +257,7 @@ public class ConsistentResourceStore extends AbstractStore<ResourceEvent, Resour | ... | @@ -245,7 +257,7 @@ public class ConsistentResourceStore extends AbstractStore<ResourceEvent, Resour |
245 | return abortTransaction(tx); | 257 | return abortTransaction(tx); |
246 | } | 258 | } |
247 | 259 | ||
248 | - if (!removeValues(childTxMap, entry.getKey(), entry.getValue())) { | 260 | + if (!removeValues(discreteChildTxMap, continuousChildTxMap, entry.getKey(), entry.getValue())) { |
249 | log.warn("Failed to unregister {}: Failed to remove {} values.", | 261 | log.warn("Failed to unregister {}: Failed to remove {} values.", |
250 | entry.getKey(), entry.getValue().size()); | 262 | entry.getKey(), entry.getValue().size()); |
251 | log.debug("Failed to unregister {}: Failed to remove values: {}", | 263 | log.debug("Failed to unregister {}: Failed to remove values: {}", |
... | @@ -275,16 +287,18 @@ public class ConsistentResourceStore extends AbstractStore<ResourceEvent, Resour | ... | @@ -275,16 +287,18 @@ public class ConsistentResourceStore extends AbstractStore<ResourceEvent, Resour |
275 | TransactionContext tx = service.transactionContextBuilder().build(); | 287 | TransactionContext tx = service.transactionContextBuilder().build(); |
276 | tx.begin(); | 288 | tx.begin(); |
277 | 289 | ||
278 | - TransactionalMap<DiscreteResourceId, Set<Resource>> childTxMap = | ||
279 | - tx.getTransactionalMap(CHILD_MAP, SERIALIZER); | ||
280 | TransactionalMap<DiscreteResourceId, ResourceConsumer> discreteConsumerTxMap = | 290 | TransactionalMap<DiscreteResourceId, ResourceConsumer> discreteConsumerTxMap = |
281 | tx.getTransactionalMap(DISCRETE_CONSUMER_MAP, SERIALIZER); | 291 | tx.getTransactionalMap(DISCRETE_CONSUMER_MAP, SERIALIZER); |
292 | + TransactionalMap<DiscreteResourceId, Set<DiscreteResource>> discreteChildTxMap = | ||
293 | + tx.getTransactionalMap(DISCRETE_CHILD_MAP, SERIALIZER); | ||
282 | TransactionalMap<ContinuousResourceId, ContinuousResourceAllocation> continuousConsumerTxMap = | 294 | TransactionalMap<ContinuousResourceId, ContinuousResourceAllocation> continuousConsumerTxMap = |
283 | tx.getTransactionalMap(CONTINUOUS_CONSUMER_MAP, SERIALIZER); | 295 | tx.getTransactionalMap(CONTINUOUS_CONSUMER_MAP, SERIALIZER); |
296 | + TransactionalMap<DiscreteResourceId, Set<ContinuousResource>> continuousChildTxMap = | ||
297 | + tx.getTransactionalMap(CONTINUOUS_CHILD_MAP, SERIALIZER); | ||
284 | 298 | ||
285 | for (Resource resource: resources) { | 299 | for (Resource resource: resources) { |
286 | // if the resource is not registered, then abort | 300 | // if the resource is not registered, then abort |
287 | - Optional<Resource> lookedUp = lookup(childTxMap, resource.id()); | 301 | + Optional<Resource> lookedUp = lookup(discreteChildTxMap, continuousChildTxMap, resource.id()); |
288 | if (!lookedUp.isPresent()) { | 302 | if (!lookedUp.isPresent()) { |
289 | return abortTransaction(tx); | 303 | return abortTransaction(tx); |
290 | } | 304 | } |
... | @@ -372,7 +386,7 @@ public class ConsistentResourceStore extends AbstractStore<ResourceEvent, Resour | ... | @@ -372,7 +386,7 @@ public class ConsistentResourceStore extends AbstractStore<ResourceEvent, Resour |
372 | // computational complexity: O(n) where n is the number of existing allocations for the resource | 386 | // computational complexity: O(n) where n is the number of existing allocations for the resource |
373 | private boolean isAvailable(ContinuousResource resource) { | 387 | private boolean isAvailable(ContinuousResource resource) { |
374 | // check if it's registered or not. | 388 | // check if it's registered or not. |
375 | - Versioned<Set<Resource>> children = childMap.get(resource.parent().get().id()); | 389 | + Versioned<Set<ContinuousResource>> children = continuousChildMap.get(resource.parent().get().id()); |
376 | if (children == null) { | 390 | if (children == null) { |
377 | return false; | 391 | return false; |
378 | } | 392 | } |
... | @@ -380,7 +394,6 @@ public class ConsistentResourceStore extends AbstractStore<ResourceEvent, Resour | ... | @@ -380,7 +394,6 @@ public class ConsistentResourceStore extends AbstractStore<ResourceEvent, Resour |
380 | ContinuousResource registered = children.value().stream() | 394 | ContinuousResource registered = children.value().stream() |
381 | .filter(c -> c.id().equals(resource.id())) | 395 | .filter(c -> c.id().equals(resource.id())) |
382 | .findFirst() | 396 | .findFirst() |
383 | - .map(c -> (ContinuousResource) c) | ||
384 | .get(); | 397 | .get(); |
385 | if (registered.value() < resource.value()) { | 398 | if (registered.value() < resource.value()) { |
386 | // Capacity < requested, can never satisfy | 399 | // Capacity < requested, can never satisfy |
... | @@ -424,12 +437,21 @@ public class ConsistentResourceStore extends AbstractStore<ResourceEvent, Resour | ... | @@ -424,12 +437,21 @@ public class ConsistentResourceStore extends AbstractStore<ResourceEvent, Resour |
424 | public Set<Resource> getChildResources(DiscreteResourceId parent) { | 437 | public Set<Resource> getChildResources(DiscreteResourceId parent) { |
425 | checkNotNull(parent); | 438 | checkNotNull(parent); |
426 | 439 | ||
427 | - Versioned<Set<Resource>> children = childMap.get(parent); | 440 | + Versioned<Set<DiscreteResource>> discreteChildren = discreteChildMap.get(parent); |
428 | - if (children == null) { | 441 | + Versioned<Set<ContinuousResource>> continuousChildren = continuousChildMap.get(parent); |
442 | + | ||
443 | + if (discreteChildren == null && continuousChildren == null) { | ||
429 | return ImmutableSet.of(); | 444 | return ImmutableSet.of(); |
445 | + } else if (discreteChildren == null) { | ||
446 | + return ImmutableSet.copyOf(continuousChildren.value()); | ||
447 | + } else if (continuousChildren == null) { | ||
448 | + return ImmutableSet.copyOf(discreteChildren.value()); | ||
449 | + } else { | ||
450 | + return ImmutableSet.<Resource>builder() | ||
451 | + .addAll(discreteChildren.value()) | ||
452 | + .addAll(continuousChildren.value()) | ||
453 | + .build(); | ||
430 | } | 454 | } |
431 | - | ||
432 | - return children.value(); | ||
433 | } | 455 | } |
434 | 456 | ||
435 | // computational complexity: O(n) where n is the number of the children of the parent | 457 | // computational complexity: O(n) where n is the number of the children of the parent |
... | @@ -438,18 +460,18 @@ public class ConsistentResourceStore extends AbstractStore<ResourceEvent, Resour | ... | @@ -438,18 +460,18 @@ public class ConsistentResourceStore extends AbstractStore<ResourceEvent, Resour |
438 | checkNotNull(parent); | 460 | checkNotNull(parent); |
439 | checkNotNull(cls); | 461 | checkNotNull(cls); |
440 | 462 | ||
441 | - Versioned<Set<Resource>> children = childMap.get(parent); | 463 | + Set<Resource> children = getChildResources(parent); |
442 | - if (children == null) { | 464 | + if (children.isEmpty()) { |
443 | - return ImmutableList.of(); | 465 | + return children; |
444 | } | 466 | } |
445 | 467 | ||
446 | - Stream<DiscreteResource> discrete = children.value().stream() | 468 | + Stream<DiscreteResource> discrete = children.stream() |
447 | .filter(x -> x.isTypeOf(cls)) | 469 | .filter(x -> x.isTypeOf(cls)) |
448 | .filter(x -> x instanceof DiscreteResource) | 470 | .filter(x -> x instanceof DiscreteResource) |
449 | .map(x -> ((DiscreteResource) x)) | 471 | .map(x -> ((DiscreteResource) x)) |
450 | .filter(x -> discreteConsumers.containsKey(x.id())); | 472 | .filter(x -> discreteConsumers.containsKey(x.id())); |
451 | 473 | ||
452 | - Stream<ContinuousResource> continuous = children.value().stream() | 474 | + Stream<ContinuousResource> continuous = children.stream() |
453 | .filter(x -> x.id().equals(parent.child(cls))) | 475 | .filter(x -> x.id().equals(parent.child(cls))) |
454 | .filter(x -> x instanceof ContinuousResource) | 476 | .filter(x -> x instanceof ContinuousResource) |
455 | .map(x -> (ContinuousResource) x) | 477 | .map(x -> (ContinuousResource) x) |
... | @@ -505,29 +527,77 @@ public class ConsistentResourceStore extends AbstractStore<ResourceEvent, Resour | ... | @@ -505,29 +527,77 @@ public class ConsistentResourceStore extends AbstractStore<ResourceEvent, Resour |
505 | * Appends the values to the existing values associated with the specified key. | 527 | * Appends the values to the existing values associated with the specified key. |
506 | * If the map already has all the given values, appending will not happen. | 528 | * If the map already has all the given values, appending will not happen. |
507 | * | 529 | * |
508 | - * @param map map holding multiple values for a key | 530 | + * @param discreteTxMap map holding multiple discrete resources for a key |
531 | + * @param continuousTxMap map holding multiple continuous resources for a key | ||
509 | * @param key key specifying values | 532 | * @param key key specifying values |
510 | * @param values values to be appended | 533 | * @param values values to be appended |
511 | * @return true if the operation succeeds, false otherwise. | 534 | * @return true if the operation succeeds, false otherwise. |
512 | */ | 535 | */ |
513 | // computational complexity: O(n) where n is the number of the specified value | 536 | // computational complexity: O(n) where n is the number of the specified value |
514 | - private boolean appendValues(TransactionalMap<DiscreteResourceId, Set<Resource>> map, | 537 | + private boolean appendValues(TransactionalMap<DiscreteResourceId, Set<DiscreteResource>> discreteTxMap, |
538 | + TransactionalMap<DiscreteResourceId, Set<ContinuousResource>> continuousTxMap, | ||
515 | DiscreteResourceId key, List<Resource> values) { | 539 | DiscreteResourceId key, List<Resource> values) { |
516 | - Set<Resource> requested = new LinkedHashSet<>(values); | 540 | + // it's assumed that the passed "values" is non-empty |
517 | - Set<Resource> oldValues = map.putIfAbsent(key, requested); | 541 | + |
542 | + // This is 2-pass scan. Nicer to have 1-pass scan | ||
543 | + List<DiscreteResource> discreteValues = values.stream() | ||
544 | + .filter(x -> x instanceof DiscreteResource) | ||
545 | + .map(x -> (DiscreteResource) x) | ||
546 | + .collect(Collectors.toList()); | ||
547 | + List<ContinuousResource> continuousValues = values.stream() | ||
548 | + .filter(x -> x instanceof ContinuousResource) | ||
549 | + .map(x -> (ContinuousResource) x) | ||
550 | + .collect(Collectors.toList()); | ||
551 | + | ||
552 | + // short-circuit decision avoiding unnecessary distributed map operations | ||
553 | + if (continuousValues.isEmpty()) { | ||
554 | + return appendValues(discreteTxMap, key, discreteValues, null); | ||
555 | + } | ||
556 | + if (discreteValues.isEmpty()) { | ||
557 | + return appendValues(continuousTxMap, key, continuousValues, null); | ||
558 | + } | ||
559 | + | ||
560 | + return appendValues(discreteTxMap, key, discreteValues, null) | ||
561 | + && appendValues(continuousTxMap, key, continuousValues, null); | ||
562 | + } | ||
563 | + | ||
564 | + private boolean appendValues(TransactionalMap<DiscreteResourceId, Set<DiscreteResource>> map, | ||
565 | + DiscreteResourceId key, List<DiscreteResource> values, DiscreteResource dummy) { | ||
566 | + Set<DiscreteResource> requested = new LinkedHashSet<>(values); | ||
567 | + Set<DiscreteResource> oldValues = map.putIfAbsent(key, requested); | ||
518 | if (oldValues == null) { | 568 | if (oldValues == null) { |
519 | return true; | 569 | return true; |
520 | } | 570 | } |
521 | 571 | ||
522 | - Set<Resource> addedValues = Sets.difference(requested, oldValues); | 572 | + Set<DiscreteResource> addedValues = Sets.difference(requested, oldValues); |
523 | // no new value, then no-op | 573 | // no new value, then no-op |
524 | if (addedValues.isEmpty()) { | 574 | if (addedValues.isEmpty()) { |
525 | // don't write to map because all values are already stored | 575 | // don't write to map because all values are already stored |
526 | return true; | 576 | return true; |
527 | } | 577 | } |
528 | 578 | ||
529 | - Set<ResourceId> addedIds = addedValues.stream() | 579 | + Set<DiscreteResource> newValues = new LinkedHashSet<>(oldValues); |
530 | - .map(Resource::id) | 580 | + newValues.addAll(addedValues); |
581 | + return map.replace(key, oldValues, newValues); | ||
582 | + } | ||
583 | + | ||
584 | + private boolean appendValues(TransactionalMap<DiscreteResourceId, Set<ContinuousResource>> map, | ||
585 | + DiscreteResourceId key, List<ContinuousResource> values, ContinuousResource dummy) { | ||
586 | + Set<ContinuousResource> requested = new LinkedHashSet<>(values); | ||
587 | + Set<ContinuousResource> oldValues = map.putIfAbsent(key, requested); | ||
588 | + if (oldValues == null) { | ||
589 | + return true; | ||
590 | + } | ||
591 | + | ||
592 | + Set<ContinuousResource> addedValues = Sets.difference(requested, oldValues); | ||
593 | + // no new value, then no-op | ||
594 | + if (addedValues.isEmpty()) { | ||
595 | + // don't write to map because all values are already stored | ||
596 | + return true; | ||
597 | + } | ||
598 | + | ||
599 | + Set<ContinuousResourceId> addedIds = addedValues.stream() | ||
600 | + .map(ContinuousResource::id) | ||
531 | .collect(Collectors.toSet()); | 601 | .collect(Collectors.toSet()); |
532 | // if the value is not found but the same ID is found | 602 | // if the value is not found but the same ID is found |
533 | // (this happens only when being continuous resource) | 603 | // (this happens only when being continuous resource) |
... | @@ -535,7 +605,7 @@ public class ConsistentResourceStore extends AbstractStore<ResourceEvent, Resour | ... | @@ -535,7 +605,7 @@ public class ConsistentResourceStore extends AbstractStore<ResourceEvent, Resour |
535 | // no-op, but indicating failure (reject the request) | 605 | // no-op, but indicating failure (reject the request) |
536 | return false; | 606 | return false; |
537 | } | 607 | } |
538 | - Set<Resource> newValues = new LinkedHashSet<>(oldValues); | 608 | + Set<ContinuousResource> newValues = new LinkedHashSet<>(oldValues); |
539 | newValues.addAll(addedValues); | 609 | newValues.addAll(addedValues); |
540 | return map.replace(key, oldValues, newValues); | 610 | return map.replace(key, oldValues, newValues); |
541 | } | 611 | } |
... | @@ -544,15 +614,41 @@ public class ConsistentResourceStore extends AbstractStore<ResourceEvent, Resour | ... | @@ -544,15 +614,41 @@ public class ConsistentResourceStore extends AbstractStore<ResourceEvent, Resour |
544 | * Removes the values from the existing values associated with the specified key. | 614 | * Removes the values from the existing values associated with the specified key. |
545 | * If the map doesn't contain the given values, removal will not happen. | 615 | * If the map doesn't contain the given values, removal will not happen. |
546 | * | 616 | * |
547 | - * @param map map holding multiple values for a key | 617 | + * @param discreteTxMap map holding multiple discrete resources for a key |
618 | + * @param continuousTxMap map holding multiple continuous resources for a key | ||
548 | * @param key key specifying values | 619 | * @param key key specifying values |
549 | * @param values values to be removed | 620 | * @param values values to be removed |
550 | * @return true if the operation succeeds, false otherwise | 621 | * @return true if the operation succeeds, false otherwise |
551 | */ | 622 | */ |
552 | - // computational complexity: O(n) where n is the number of the specified values | 623 | + private boolean removeValues(TransactionalMap<DiscreteResourceId, Set<DiscreteResource>> discreteTxMap, |
553 | - private boolean removeValues(TransactionalMap<DiscreteResourceId, Set<Resource>> map, | 624 | + TransactionalMap<DiscreteResourceId, Set<ContinuousResource>> continuousTxMap, |
554 | DiscreteResourceId key, List<Resource> values) { | 625 | DiscreteResourceId key, List<Resource> values) { |
555 | - Set<Resource> oldValues = map.putIfAbsent(key, new LinkedHashSet<>()); | 626 | + // it's assumed that the passed "values" is non-empty |
627 | + | ||
628 | + // This is 2-pass scan. Nicer to have 1-pass scan | ||
629 | + List<DiscreteResource> discreteValues = values.stream() | ||
630 | + .filter(x -> x instanceof DiscreteResource) | ||
631 | + .map(x -> (DiscreteResource) x) | ||
632 | + .collect(Collectors.toList()); | ||
633 | + List<ContinuousResource> continuousValues = values.stream() | ||
634 | + .filter(x -> x instanceof ContinuousResource) | ||
635 | + .map(x -> (ContinuousResource) x) | ||
636 | + .collect(Collectors.toList()); | ||
637 | + | ||
638 | + // short-circuit decision avoiding unnecessary distributed map operations | ||
639 | + if (continuousValues.isEmpty()) { | ||
640 | + return removeValues(discreteTxMap, key, discreteValues); | ||
641 | + } | ||
642 | + if (discreteValues.isEmpty()) { | ||
643 | + return removeValues(continuousTxMap, key, continuousValues); | ||
644 | + } | ||
645 | + | ||
646 | + return removeValues(discreteTxMap, key, discreteValues) && removeValues(continuousTxMap, key, continuousValues); | ||
647 | + } | ||
648 | + | ||
649 | + private <T extends Resource> boolean removeValues(TransactionalMap<DiscreteResourceId, Set<T>> map, | ||
650 | + DiscreteResourceId key, List<T> values) { | ||
651 | + Set<T> oldValues = map.putIfAbsent(key, new LinkedHashSet<>()); | ||
556 | if (oldValues == null) { | 652 | if (oldValues == null) { |
557 | log.trace("No-Op removing values. key {} did not exist", key); | 653 | log.trace("No-Op removing values. key {} did not exist", key); |
558 | return true; | 654 | return true; |
... | @@ -564,47 +660,70 @@ public class ConsistentResourceStore extends AbstractStore<ResourceEvent, Resour | ... | @@ -564,47 +660,70 @@ public class ConsistentResourceStore extends AbstractStore<ResourceEvent, Resour |
564 | return true; | 660 | return true; |
565 | } | 661 | } |
566 | 662 | ||
567 | - LinkedHashSet<Resource> newValues = new LinkedHashSet<>(oldValues); | 663 | + LinkedHashSet<T> newValues = new LinkedHashSet<>(oldValues); |
568 | newValues.removeAll(values); | 664 | newValues.removeAll(values); |
569 | return map.replace(key, oldValues, newValues); | 665 | return map.replace(key, oldValues, newValues); |
570 | - } | ||
571 | 666 | ||
667 | + } | ||
572 | /** | 668 | /** |
573 | * Returns the resource which has the same key as the specified resource ID | 669 | * Returns the resource which has the same key as the specified resource ID |
574 | * in the set as a value of the map. | 670 | * in the set as a value of the map. |
575 | * | 671 | * |
576 | - * @param childTxMap map storing parent - child relationship of resources | 672 | + * @param discreteTxMap map storing parent - child relationship of discrete resources |
673 | + * @param continuousTxMap map storing parent -child relationship of continuous resources | ||
577 | * @param id ID of resource to be checked | 674 | * @param id ID of resource to be checked |
578 | * @return the resource which is regarded as the same as the specified resource | 675 | * @return the resource which is regarded as the same as the specified resource |
579 | */ | 676 | */ |
580 | // Naive implementation, which traverses all elements in the set when continuous resource | 677 | // Naive implementation, which traverses all elements in the set when continuous resource |
581 | // computational complexity: O(1) when discrete resource. O(n) when continuous resource | 678 | // computational complexity: O(1) when discrete resource. O(n) when continuous resource |
582 | // where n is the number of elements in the associated set | 679 | // where n is the number of elements in the associated set |
583 | - private Optional<Resource> lookup(TransactionalMap<DiscreteResourceId, Set<Resource>> childTxMap, ResourceId id) { | 680 | + private Optional<Resource> lookup(TransactionalMap<DiscreteResourceId, Set<DiscreteResource>> discreteTxMap, |
681 | + TransactionalMap<DiscreteResourceId, Set<ContinuousResource>> continuousTxMap, | ||
682 | + ResourceId id) { | ||
683 | + if (id instanceof DiscreteResourceId) { | ||
684 | + return lookup(discreteTxMap, (DiscreteResourceId) id); | ||
685 | + } else if (id instanceof ContinuousResourceId) { | ||
686 | + return lookup(continuousTxMap, (ContinuousResourceId) id); | ||
687 | + } else { | ||
688 | + return Optional.empty(); | ||
689 | + } | ||
690 | + } | ||
691 | + | ||
692 | + // check the existence in the set: O(1) operation | ||
693 | + private Optional<Resource> lookup(TransactionalMap<DiscreteResourceId, Set<DiscreteResource>> discreteTxMap, | ||
694 | + DiscreteResourceId id) { | ||
584 | if (!id.parent().isPresent()) { | 695 | if (!id.parent().isPresent()) { |
585 | return Optional.of(Resource.ROOT); | 696 | return Optional.of(Resource.ROOT); |
586 | } | 697 | } |
587 | 698 | ||
588 | - Set<Resource> values = childTxMap.get(id.parent().get()); | 699 | + Set<DiscreteResource> values = discreteTxMap.get(id.parent().get()); |
589 | if (values == null) { | 700 | if (values == null) { |
590 | return Optional.empty(); | 701 | return Optional.empty(); |
591 | } | 702 | } |
592 | 703 | ||
593 | - // short-circuit if discrete resource | 704 | + DiscreteResource resource = Resources.discrete(id).resource(); |
594 | - // check the existence in the set: O(1) operation | 705 | + if (values.contains(resource)) { |
595 | - if (id instanceof DiscreteResourceId) { | 706 | + return Optional.of(resource); |
596 | - DiscreteResource discrete = Resources.discrete((DiscreteResourceId) id).resource(); | ||
597 | - if (values.contains(discrete)) { | ||
598 | - return Optional.of(discrete); | ||
599 | } else { | 707 | } else { |
600 | return Optional.empty(); | 708 | return Optional.empty(); |
601 | } | 709 | } |
602 | } | 710 | } |
603 | 711 | ||
604 | - // continuous resource case | ||
605 | // iterate over the values in the set: O(n) operation | 712 | // iterate over the values in the set: O(n) operation |
713 | + private Optional<Resource> lookup(TransactionalMap<DiscreteResourceId, Set<ContinuousResource>> continuousTxMap, | ||
714 | + ContinuousResourceId id) { | ||
715 | + if (!id.parent().isPresent()) { | ||
716 | + return Optional.of(Resource.ROOT); | ||
717 | + } | ||
718 | + | ||
719 | + Set<ContinuousResource> values = continuousTxMap.get(id.parent().get()); | ||
720 | + if (values == null) { | ||
721 | + return Optional.empty(); | ||
722 | + } | ||
723 | + | ||
606 | return values.stream() | 724 | return values.stream() |
607 | .filter(x -> x.id().equals(id)) | 725 | .filter(x -> x.id().equals(id)) |
726 | + .map(x -> (Resource) x) | ||
608 | .findFirst(); | 727 | .findFirst(); |
609 | } | 728 | } |
610 | 729 | ... | ... |
-
Please register or login to post a comment