Sho SHIMIZU

Separate ResourceStore into stores for discrete and continuous type

This is a preliminary work for ONOS-4281.

Change-Id: Ifed9c761eb16f6a249a9d069948edc7421301617
1 +/*
2 + * Copyright 2016-present Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +package org.onosproject.store.resource.impl;
17 +
18 +import com.google.common.collect.ImmutableList;
19 +import com.google.common.collect.ImmutableSet;
20 +import com.google.common.collect.Maps;
21 +import org.onlab.util.GuavaCollectors;
22 +import org.onlab.util.Tools;
23 +import org.onosproject.net.resource.ContinuousResource;
24 +import org.onosproject.net.resource.ContinuousResourceId;
25 +import org.onosproject.net.resource.DiscreteResourceId;
26 +import org.onosproject.net.resource.Resource;
27 +import org.onosproject.net.resource.ResourceAllocation;
28 +import org.onosproject.net.resource.ResourceConsumer;
29 +import org.onosproject.store.service.ConsistentMap;
30 +import org.onosproject.store.service.ConsistentMapException;
31 +import org.onosproject.store.service.StorageService;
32 +import org.onosproject.store.service.TransactionContext;
33 +import org.onosproject.store.service.Versioned;
34 +
35 +import java.util.LinkedHashSet;
36 +import java.util.List;
37 +import java.util.Set;
38 +import java.util.stream.Stream;
39 +
40 +import static org.onosproject.store.resource.impl.ConsistentResourceStore.ContinuousResourceAllocation;
41 +import static org.onosproject.store.resource.impl.ConsistentResourceStore.MAX_RETRIES;
42 +import static org.onosproject.store.resource.impl.ConsistentResourceStore.RETRY_DELAY;
43 +import static org.onosproject.store.resource.impl.ConsistentResourceStore.SERIALIZER;
44 +import static org.onosproject.store.resource.impl.ResourceStoreUtil.hasEnoughResource;
45 +
46 +class ConsistentContinuousResourceStore {
47 + private ConsistentMap<ContinuousResourceId, ContinuousResourceAllocation> consumers;
48 + private ConsistentMap<DiscreteResourceId, Set<ContinuousResource>> childMap;
49 +
50 + ConsistentContinuousResourceStore(StorageService service) {
51 + this.consumers = service.<ContinuousResourceId, ContinuousResourceAllocation>consistentMapBuilder()
52 + .withName(MapNames.CONTINUOUS_CONSUMER_MAP)
53 + .withSerializer(SERIALIZER)
54 + .build();
55 + this.childMap = service.<DiscreteResourceId, Set<ContinuousResource>>consistentMapBuilder()
56 + .withName(MapNames.CONTINUOUS_CHILD_MAP)
57 + .withSerializer(SERIALIZER)
58 + .build();
59 +
60 + Tools.retryable(() -> childMap.put(Resource.ROOT.id(), new LinkedHashSet<>()),
61 + ConsistentMapException.class, MAX_RETRIES, RETRY_DELAY);
62 + }
63 +
64 + TransactionalContinuousResourceStore transactional(TransactionContext tx) {
65 + return new TransactionalContinuousResourceStore(tx);
66 + }
67 +
68 + // computational complexity: O(n) where n is the number of the existing allocations for the resource
69 + List<ResourceAllocation> getResourceAllocations(ContinuousResourceId resource) {
70 + Versioned<ContinuousResourceAllocation> allocations = consumers.get(resource);
71 + if (allocations == null) {
72 + return ImmutableList.of();
73 + }
74 +
75 + return allocations.value().allocations().stream()
76 + .filter(x -> x.resource().id().equals(resource))
77 + .collect(GuavaCollectors.toImmutableList());
78 + }
79 +
80 + Set<ContinuousResource> getChildResources(DiscreteResourceId parent) {
81 + Versioned<Set<ContinuousResource>> children = childMap.get(parent);
82 +
83 + if (children == null) {
84 + return ImmutableSet.of();
85 + }
86 +
87 + return children.value();
88 + }
89 +
90 + public boolean isAvailable(ContinuousResource resource) {
91 + // check if it's registered or not.
92 + Versioned<Set<ContinuousResource>> children = childMap.get(resource.parent().get().id());
93 + if (children == null) {
94 + return false;
95 + }
96 +
97 + ContinuousResource registered = children.value().stream()
98 + .filter(c -> c.id().equals(resource.id()))
99 + .findFirst()
100 + .get();
101 + if (registered.value() < resource.value()) {
102 + // Capacity < requested, can never satisfy
103 + return false;
104 + }
105 +
106 + // check if there's enough left
107 + Versioned<ContinuousResourceAllocation> allocation = consumers.get(resource.id());
108 + if (allocation == null) {
109 + // no allocation (=no consumer) full registered resources available
110 + return true;
111 + }
112 +
113 + return hasEnoughResource(allocation.value().original(), resource, allocation.value());
114 + }
115 +
116 + <T> Stream<ContinuousResource> getAllocatedResources(DiscreteResourceId parent, Class<T> cls) {
117 + Set<ContinuousResource> children = getChildResources(parent);
118 + if (children.isEmpty()) {
119 + return Stream.of();
120 + }
121 +
122 + return children.stream()
123 + .filter(x -> x.id().equals(parent.child(cls)))
124 + // we don't use cascading simple predicates like follows to reduce accesses to consistent map
125 + // .filter(x -> continuousConsumers.containsKey(x.id()))
126 + // .filter(x -> continuousConsumers.get(x.id()) != null)
127 + // .filter(x -> !continuousConsumers.get(x.id()).value().allocations().isEmpty());
128 + .filter(resource -> {
129 + Versioned<ContinuousResourceAllocation> allocation = consumers.get(resource.id());
130 + if (allocation == null) {
131 + return false;
132 + }
133 + return !allocation.value().allocations().isEmpty();
134 + });
135 + }
136 +
137 + Stream<ContinuousResource> getResources(ResourceConsumer consumer) {
138 + return consumers.values().stream()
139 + .flatMap(x -> x.value().allocations().stream()
140 + .map(y -> Maps.immutableEntry(x.value().original(), y)))
141 + .filter(x -> x.getValue().consumer().equals(consumer))
142 + .map(x -> x.getKey());
143 + }
144 +}
1 +/*
2 + * Copyright 2016-present Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +package org.onosproject.store.resource.impl;
17 +
18 +import com.google.common.collect.ImmutableList;
19 +import com.google.common.collect.ImmutableSet;
20 +import org.onlab.util.Tools;
21 +import org.onosproject.net.resource.DiscreteResource;
22 +import org.onosproject.net.resource.DiscreteResourceId;
23 +import org.onosproject.net.resource.Resource;
24 +import org.onosproject.net.resource.ResourceAllocation;
25 +import org.onosproject.net.resource.ResourceConsumer;
26 +import org.onosproject.net.resource.Resources;
27 +import org.onosproject.store.service.ConsistentMap;
28 +import org.onosproject.store.service.ConsistentMapException;
29 +import org.onosproject.store.service.StorageService;
30 +import org.onosproject.store.service.TransactionContext;
31 +import org.onosproject.store.service.Versioned;
32 +
33 +import java.util.LinkedHashSet;
34 +import java.util.List;
35 +import java.util.Map;
36 +import java.util.Set;
37 +import java.util.stream.Stream;
38 +
39 +import static org.onosproject.store.resource.impl.ConsistentResourceStore.MAX_RETRIES;
40 +import static org.onosproject.store.resource.impl.ConsistentResourceStore.RETRY_DELAY;
41 +import static org.onosproject.store.resource.impl.ConsistentResourceStore.SERIALIZER;
42 +
43 +class ConsistentDiscreteResourceStore {
44 + private ConsistentMap<DiscreteResourceId, ResourceConsumer> consumers;
45 + private ConsistentMap<DiscreteResourceId, Set<DiscreteResource>> childMap;
46 +
47 + ConsistentDiscreteResourceStore(StorageService service) {
48 + this.consumers = service.<DiscreteResourceId, ResourceConsumer>consistentMapBuilder()
49 + .withName(MapNames.DISCRETE_CONSUMER_MAP)
50 + .withSerializer(SERIALIZER)
51 + .build();
52 + this.childMap = service.<DiscreteResourceId, Set<DiscreteResource>>consistentMapBuilder()
53 + .withName(MapNames.DISCRETE_CHILD_MAP)
54 + .withSerializer(SERIALIZER)
55 + .build();
56 +
57 + Tools.retryable(() -> childMap.put(Resource.ROOT.id(), new LinkedHashSet<>()),
58 + ConsistentMapException.class, MAX_RETRIES, RETRY_DELAY);
59 + }
60 +
61 + TransactionalDiscreteResourceStore transactional(TransactionContext tx) {
62 + return new TransactionalDiscreteResourceStore(tx);
63 + }
64 +
65 + // computational complexity: O(1)
66 + List<ResourceAllocation> getResourceAllocations(DiscreteResourceId resource) {
67 + Versioned<ResourceConsumer> consumer = consumers.get(resource);
68 + if (consumer == null) {
69 + return ImmutableList.of();
70 + }
71 +
72 + return ImmutableList.of(new ResourceAllocation(Resources.discrete(resource).resource(), consumer.value()));
73 + }
74 +
75 + Set<DiscreteResource> getChildResources(DiscreteResourceId parent) {
76 + Versioned<Set<DiscreteResource>> children = childMap.get(parent);
77 +
78 + if (children == null) {
79 + return ImmutableSet.of();
80 + }
81 +
82 + return children.value();
83 + }
84 +
85 + boolean isAvailable(DiscreteResource resource) {
86 + return getResourceAllocations(resource.id()).isEmpty();
87 + }
88 +
89 + <T> Stream<DiscreteResource> getAllocatedResources(DiscreteResourceId parent, Class<T> cls) {
90 + Set<DiscreteResource> children = getChildResources(parent);
91 + if (children.isEmpty()) {
92 + return Stream.of();
93 + }
94 +
95 + return children.stream()
96 + .filter(x -> x.isTypeOf(cls))
97 + .filter(x -> consumers.containsKey(x.id()));
98 + }
99 +
100 + Stream<DiscreteResource> getResources(ResourceConsumer consumer) {
101 + return consumers.entrySet().stream()
102 + .filter(x -> x.getValue().value().equals(consumer))
103 + .map(Map.Entry::getKey)
104 + .map(x -> Resources.discrete(x).resource());
105 + }
106 +}
...@@ -18,45 +18,35 @@ package org.onosproject.store.resource.impl; ...@@ -18,45 +18,35 @@ package org.onosproject.store.resource.impl;
18 import com.google.common.annotations.Beta; 18 import com.google.common.annotations.Beta;
19 import com.google.common.collect.ImmutableList; 19 import com.google.common.collect.ImmutableList;
20 import com.google.common.collect.ImmutableSet; 20 import com.google.common.collect.ImmutableSet;
21 -import com.google.common.collect.Maps;
22 -import com.google.common.collect.Sets;
23 -
24 import org.apache.felix.scr.annotations.Activate; 21 import org.apache.felix.scr.annotations.Activate;
25 import org.apache.felix.scr.annotations.Component; 22 import org.apache.felix.scr.annotations.Component;
26 import org.apache.felix.scr.annotations.Reference; 23 import org.apache.felix.scr.annotations.Reference;
27 import org.apache.felix.scr.annotations.ReferenceCardinality; 24 import org.apache.felix.scr.annotations.ReferenceCardinality;
28 import org.apache.felix.scr.annotations.Service; 25 import org.apache.felix.scr.annotations.Service;
29 -import org.onlab.util.GuavaCollectors;
30 -import org.onlab.util.Tools;
31 import org.onosproject.net.resource.ContinuousResource; 26 import org.onosproject.net.resource.ContinuousResource;
32 import org.onosproject.net.resource.ContinuousResourceId; 27 import org.onosproject.net.resource.ContinuousResourceId;
33 import org.onosproject.net.resource.DiscreteResource; 28 import org.onosproject.net.resource.DiscreteResource;
34 import org.onosproject.net.resource.DiscreteResourceId; 29 import org.onosproject.net.resource.DiscreteResourceId;
30 +import org.onosproject.net.resource.Resource;
35 import org.onosproject.net.resource.ResourceAllocation; 31 import org.onosproject.net.resource.ResourceAllocation;
36 import org.onosproject.net.resource.ResourceConsumer; 32 import org.onosproject.net.resource.ResourceConsumer;
37 import org.onosproject.net.resource.ResourceEvent; 33 import org.onosproject.net.resource.ResourceEvent;
38 import org.onosproject.net.resource.ResourceId; 34 import org.onosproject.net.resource.ResourceId;
39 -import org.onosproject.net.resource.Resource;
40 import org.onosproject.net.resource.ResourceStore; 35 import org.onosproject.net.resource.ResourceStore;
41 import org.onosproject.net.resource.ResourceStoreDelegate; 36 import org.onosproject.net.resource.ResourceStoreDelegate;
42 import org.onosproject.net.resource.Resources; 37 import org.onosproject.net.resource.Resources;
43 import org.onosproject.store.AbstractStore; 38 import org.onosproject.store.AbstractStore;
44 import org.onosproject.store.serializers.KryoNamespaces; 39 import org.onosproject.store.serializers.KryoNamespaces;
45 import org.onosproject.store.service.CommitStatus; 40 import org.onosproject.store.service.CommitStatus;
46 -import org.onosproject.store.service.ConsistentMap;
47 -import org.onosproject.store.service.ConsistentMapException;
48 import org.onosproject.store.service.Serializer; 41 import org.onosproject.store.service.Serializer;
49 import org.onosproject.store.service.StorageService; 42 import org.onosproject.store.service.StorageService;
50 import org.onosproject.store.service.TransactionContext; 43 import org.onosproject.store.service.TransactionContext;
51 -import org.onosproject.store.service.TransactionalMap;
52 -import org.onosproject.store.service.Versioned;
53 import org.slf4j.Logger; 44 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory; 45 import org.slf4j.LoggerFactory;
55 46
56 import java.util.Arrays; 47 import java.util.Arrays;
57 import java.util.Collection; 48 import java.util.Collection;
58 import java.util.LinkedHashMap; 49 import java.util.LinkedHashMap;
59 -import java.util.LinkedHashSet;
60 import java.util.List; 50 import java.util.List;
61 import java.util.Map; 51 import java.util.Map;
62 import java.util.Optional; 52 import java.util.Optional;
...@@ -66,7 +56,8 @@ import java.util.stream.Stream; ...@@ -66,7 +56,8 @@ import java.util.stream.Stream;
66 56
67 import static com.google.common.base.Preconditions.checkArgument; 57 import static com.google.common.base.Preconditions.checkArgument;
68 import static com.google.common.base.Preconditions.checkNotNull; 58 import static com.google.common.base.Preconditions.checkNotNull;
69 -import static org.onosproject.net.resource.ResourceEvent.Type.*; 59 +import static org.onosproject.net.resource.ResourceEvent.Type.RESOURCE_ADDED;
60 +import static org.onosproject.net.resource.ResourceEvent.Type.RESOURCE_REMOVED;
70 61
71 /** 62 /**
72 * Implementation of ResourceStore using TransactionalMap. 63 * Implementation of ResourceStore using TransactionalMap.
...@@ -78,49 +69,25 @@ public class ConsistentResourceStore extends AbstractStore<ResourceEvent, Resour ...@@ -78,49 +69,25 @@ public class ConsistentResourceStore extends AbstractStore<ResourceEvent, Resour
78 implements ResourceStore { 69 implements ResourceStore {
79 private static final Logger log = LoggerFactory.getLogger(ConsistentResourceStore.class); 70 private static final Logger log = LoggerFactory.getLogger(ConsistentResourceStore.class);
80 71
81 - private static final String DISCRETE_CONSUMER_MAP = "onos-discrete-consumers"; 72 + static final Serializer SERIALIZER = Serializer.using(
82 - private static final String DISCRETE_CHILD_MAP = "onos-resource-discrete-children";
83 - private static final String CONTINUOUS_CONSUMER_MAP = "onos-continuous-consumers";
84 - private static final String CONTINUOUS_CHILD_MAP = "onos-resource-continuous-children";
85 - private static final Serializer SERIALIZER = Serializer.using(
86 Arrays.asList(KryoNamespaces.API), 73 Arrays.asList(KryoNamespaces.API),
87 ContinuousResourceAllocation.class); 74 ContinuousResourceAllocation.class);
88 75
89 // TODO: We should provide centralized values for this 76 // TODO: We should provide centralized values for this
90 - private static final int MAX_RETRIES = 5; 77 + static final int MAX_RETRIES = 5;
91 - private static final int RETRY_DELAY = 1_000; // millis 78 + static final int RETRY_DELAY = 1_000; // millis
92 79
93 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) 80 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
94 protected StorageService service; 81 protected StorageService service;
95 82
96 - private ConsistentMap<DiscreteResourceId, ResourceConsumer> discreteConsumers; 83 + private ConsistentDiscreteResourceStore discreteStore;
97 - private ConsistentMap<DiscreteResourceId, Set<DiscreteResource>> discreteChildMap; 84 + private ConsistentContinuousResourceStore continuousStore;
98 - private ConsistentMap<ContinuousResourceId, ContinuousResourceAllocation> continuousConsumers;
99 - private ConsistentMap<DiscreteResourceId, Set<ContinuousResource>> continuousChildMap;
100 85
101 @Activate 86 @Activate
102 public void activate() { 87 public void activate() {
103 - discreteConsumers = service.<DiscreteResourceId, ResourceConsumer>consistentMapBuilder() 88 + discreteStore = new ConsistentDiscreteResourceStore(service);
104 - .withName(DISCRETE_CONSUMER_MAP) 89 + continuousStore = new ConsistentContinuousResourceStore(service);
105 - .withSerializer(SERIALIZER)
106 - .build();
107 - discreteChildMap = service.<DiscreteResourceId, Set<DiscreteResource>>consistentMapBuilder()
108 - .withName(DISCRETE_CHILD_MAP)
109 - .withSerializer(SERIALIZER)
110 - .build();
111 - continuousConsumers = service.<ContinuousResourceId, ContinuousResourceAllocation>consistentMapBuilder()
112 - .withName(CONTINUOUS_CONSUMER_MAP)
113 - .withSerializer(SERIALIZER)
114 - .build();
115 - continuousChildMap = service.<DiscreteResourceId, Set<ContinuousResource>>consistentMapBuilder()
116 - .withName(CONTINUOUS_CHILD_MAP)
117 - .withSerializer(SERIALIZER)
118 - .build();
119 90
120 - Tools.retryable(() -> discreteChildMap.put(Resource.ROOT.id(), new LinkedHashSet<>()),
121 - ConsistentMapException.class, MAX_RETRIES, RETRY_DELAY);
122 - Tools.retryable(() -> continuousChildMap.put(Resource.ROOT.id(), new LinkedHashSet<>()),
123 - ConsistentMapException.class, MAX_RETRIES, RETRY_DELAY);
124 log.info("Started"); 91 log.info("Started");
125 } 92 }
126 93
...@@ -132,32 +99,10 @@ public class ConsistentResourceStore extends AbstractStore<ResourceEvent, Resour ...@@ -132,32 +99,10 @@ public class ConsistentResourceStore extends AbstractStore<ResourceEvent, Resour
132 checkArgument(id instanceof DiscreteResourceId || id instanceof ContinuousResourceId); 99 checkArgument(id instanceof DiscreteResourceId || id instanceof ContinuousResourceId);
133 100
134 if (id instanceof DiscreteResourceId) { 101 if (id instanceof DiscreteResourceId) {
135 - return getResourceAllocations((DiscreteResourceId) id); 102 + return discreteStore.getResourceAllocations((DiscreteResourceId) id);
136 } else { 103 } else {
137 - return getResourceAllocations((ContinuousResourceId) id); 104 + return continuousStore.getResourceAllocations((ContinuousResourceId) id);
138 - }
139 - }
140 -
141 - // computational complexity: O(1)
142 - private List<ResourceAllocation> getResourceAllocations(DiscreteResourceId resource) {
143 - Versioned<ResourceConsumer> consumer = discreteConsumers.get(resource);
144 - if (consumer == null) {
145 - return ImmutableList.of();
146 } 105 }
147 -
148 - return ImmutableList.of(new ResourceAllocation(Resources.discrete(resource).resource(), consumer.value()));
149 - }
150 -
151 - // computational complexity: O(n) where n is the number of the existing allocations for the resource
152 - private List<ResourceAllocation> getResourceAllocations(ContinuousResourceId resource) {
153 - Versioned<ContinuousResourceAllocation> allocations = continuousConsumers.get(resource);
154 - if (allocations == null) {
155 - return ImmutableList.of();
156 - }
157 -
158 - return allocations.value().allocations().stream()
159 - .filter(x -> x.resource().id().equals(resource))
160 - .collect(GuavaCollectors.toImmutableList());
161 } 106 }
162 107
163 @Override 108 @Override
...@@ -170,22 +115,19 @@ public class ConsistentResourceStore extends AbstractStore<ResourceEvent, Resour ...@@ -170,22 +115,19 @@ public class ConsistentResourceStore extends AbstractStore<ResourceEvent, Resour
170 TransactionContext tx = service.transactionContextBuilder().build(); 115 TransactionContext tx = service.transactionContextBuilder().build();
171 tx.begin(); 116 tx.begin();
172 117
173 - TransactionalMap<DiscreteResourceId, Set<DiscreteResource>> discreteChildTxMap =
174 - tx.getTransactionalMap(DISCRETE_CHILD_MAP, SERIALIZER);
175 - TransactionalMap<DiscreteResourceId, Set<ContinuousResource>> continuousChildTxMap =
176 - tx.getTransactionalMap(CONTINUOUS_CHILD_MAP, SERIALIZER);
177 -
178 // the order is preserved by LinkedHashMap 118 // the order is preserved by LinkedHashMap
179 Map<DiscreteResource, List<Resource>> resourceMap = resources.stream() 119 Map<DiscreteResource, List<Resource>> resourceMap = resources.stream()
180 .filter(x -> x.parent().isPresent()) 120 .filter(x -> x.parent().isPresent())
181 .collect(Collectors.groupingBy(x -> x.parent().get(), LinkedHashMap::new, Collectors.toList())); 121 .collect(Collectors.groupingBy(x -> x.parent().get(), LinkedHashMap::new, Collectors.toList()));
182 122
183 - for (Map.Entry<DiscreteResource, List<Resource>> entry: resourceMap.entrySet()) { 123 + TransactionalDiscreteResourceStore discreteTxStore = discreteStore.transactional(tx);
184 - if (!lookup(discreteChildTxMap, continuousChildTxMap, entry.getKey().id()).isPresent()) { 124 + TransactionalContinuousResourceStore continuousTxStore = continuousStore.transactional(tx);
125 + for (Map.Entry<DiscreteResource, List<Resource>> entry : resourceMap.entrySet()) {
126 + if (!lookup(discreteTxStore, continuousTxStore, entry.getKey().id()).isPresent()) {
185 return abortTransaction(tx); 127 return abortTransaction(tx);
186 } 128 }
187 129
188 - if (!appendValues(discreteChildTxMap, continuousChildTxMap, entry.getKey().id(), entry.getValue())) { 130 + if (!appendValues(discreteTxStore, continuousTxStore, entry.getKey().id(), entry.getValue())) {
189 return abortTransaction(tx); 131 return abortTransaction(tx);
190 } 132 }
191 } 133 }
...@@ -211,15 +153,8 @@ public class ConsistentResourceStore extends AbstractStore<ResourceEvent, Resour ...@@ -211,15 +153,8 @@ public class ConsistentResourceStore extends AbstractStore<ResourceEvent, Resour
211 TransactionContext tx = service.transactionContextBuilder().build(); 153 TransactionContext tx = service.transactionContextBuilder().build();
212 tx.begin(); 154 tx.begin();
213 155
214 - TransactionalMap<DiscreteResourceId, ResourceConsumer> discreteConsumerTxMap = 156 + TransactionalDiscreteResourceStore discreteTxStore = discreteStore.transactional(tx);
215 - tx.getTransactionalMap(DISCRETE_CONSUMER_MAP, SERIALIZER); 157 + TransactionalContinuousResourceStore continuousTxStore = continuousStore.transactional(tx);
216 - TransactionalMap<DiscreteResourceId, Set<DiscreteResource>> discreteChildTxMap =
217 - tx.getTransactionalMap(DISCRETE_CHILD_MAP, SERIALIZER);
218 - TransactionalMap<ContinuousResourceId, ContinuousResourceAllocation> continuousConsumerTxMap =
219 - tx.getTransactionalMap(CONTINUOUS_CONSUMER_MAP, SERIALIZER);
220 - TransactionalMap<DiscreteResourceId, Set<ContinuousResource>> continuousChildTxMap =
221 - tx.getTransactionalMap(CONTINUOUS_CHILD_MAP, SERIALIZER);
222 -
223 // Look up resources by resource IDs 158 // Look up resources by resource IDs
224 List<Resource> resources = ids.stream() 159 List<Resource> resources = ids.stream()
225 .filter(x -> x.parent().isPresent()) 160 .filter(x -> x.parent().isPresent())
...@@ -228,7 +163,7 @@ public class ConsistentResourceStore extends AbstractStore<ResourceEvent, Resour ...@@ -228,7 +163,7 @@ public class ConsistentResourceStore extends AbstractStore<ResourceEvent, Resour
228 if (x instanceof DiscreteResourceId) { 163 if (x instanceof DiscreteResourceId) {
229 return Optional.of(Resources.discrete((DiscreteResourceId) x).resource()); 164 return Optional.of(Resources.discrete((DiscreteResourceId) x).resource());
230 } else { 165 } else {
231 - return lookup(continuousChildTxMap, (ContinuousResourceId) x); 166 + return continuousTxStore.lookup((ContinuousResourceId) x);
232 } 167 }
233 }) 168 })
234 .filter(Optional::isPresent) 169 .filter(Optional::isPresent)
...@@ -240,14 +175,12 @@ public class ConsistentResourceStore extends AbstractStore<ResourceEvent, Resour ...@@ -240,14 +175,12 @@ public class ConsistentResourceStore extends AbstractStore<ResourceEvent, Resour
240 175
241 // even if one of the resources is allocated to a consumer, 176 // even if one of the resources is allocated to a consumer,
242 // all unregistrations are regarded as failure 177 // all unregistrations are regarded as failure
243 - for (Map.Entry<DiscreteResourceId, List<Resource>> entry: resourceMap.entrySet()) { 178 + for (Map.Entry<DiscreteResourceId, List<Resource>> entry : resourceMap.entrySet()) {
244 boolean allocated = entry.getValue().stream().anyMatch(x -> { 179 boolean allocated = entry.getValue().stream().anyMatch(x -> {
245 if (x instanceof DiscreteResource) { 180 if (x instanceof DiscreteResource) {
246 - return discreteConsumerTxMap.get(((DiscreteResource) x).id()) != null; 181 + return discreteTxStore.isAllocated(((DiscreteResource) x).id());
247 } else if (x instanceof ContinuousResource) { 182 } else if (x instanceof ContinuousResource) {
248 - ContinuousResourceAllocation allocations = 183 + return continuousTxStore.isAllocated(((ContinuousResource) x).id());
249 - continuousConsumerTxMap.get(((ContinuousResource) x).id());
250 - return allocations != null && !allocations.allocations().isEmpty();
251 } else { 184 } else {
252 return false; 185 return false;
253 } 186 }
...@@ -257,11 +190,11 @@ public class ConsistentResourceStore extends AbstractStore<ResourceEvent, Resour ...@@ -257,11 +190,11 @@ public class ConsistentResourceStore extends AbstractStore<ResourceEvent, Resour
257 return abortTransaction(tx); 190 return abortTransaction(tx);
258 } 191 }
259 192
260 - if (!removeValues(discreteChildTxMap, continuousChildTxMap, entry.getKey(), entry.getValue())) { 193 + if (!removeValues(discreteTxStore, continuousTxStore, entry.getKey(), entry.getValue())) {
261 log.warn("Failed to unregister {}: Failed to remove {} values.", 194 log.warn("Failed to unregister {}: Failed to remove {} values.",
262 - entry.getKey(), entry.getValue().size()); 195 + entry.getKey(), entry.getValue().size());
263 log.debug("Failed to unregister {}: Failed to remove values: {}", 196 log.debug("Failed to unregister {}: Failed to remove values: {}",
264 - entry.getKey(), entry.getValue()); 197 + entry.getKey(), entry.getValue());
265 return abortTransaction(tx); 198 return abortTransaction(tx);
266 } 199 }
267 } 200 }
...@@ -287,38 +220,15 @@ public class ConsistentResourceStore extends AbstractStore<ResourceEvent, Resour ...@@ -287,38 +220,15 @@ public class ConsistentResourceStore extends AbstractStore<ResourceEvent, Resour
287 TransactionContext tx = service.transactionContextBuilder().build(); 220 TransactionContext tx = service.transactionContextBuilder().build();
288 tx.begin(); 221 tx.begin();
289 222
290 - TransactionalMap<DiscreteResourceId, ResourceConsumer> discreteConsumerTxMap = 223 + TransactionalDiscreteResourceStore discreteTxStore = discreteStore.transactional(tx);
291 - tx.getTransactionalMap(DISCRETE_CONSUMER_MAP, SERIALIZER); 224 + TransactionalContinuousResourceStore continuousTxStore = continuousStore.transactional(tx);
292 - TransactionalMap<DiscreteResourceId, Set<DiscreteResource>> discreteChildTxMap = 225 + for (Resource resource : resources) {
293 - tx.getTransactionalMap(DISCRETE_CHILD_MAP, SERIALIZER);
294 - TransactionalMap<ContinuousResourceId, ContinuousResourceAllocation> continuousConsumerTxMap =
295 - tx.getTransactionalMap(CONTINUOUS_CONSUMER_MAP, SERIALIZER);
296 - TransactionalMap<DiscreteResourceId, Set<ContinuousResource>> continuousChildTxMap =
297 - tx.getTransactionalMap(CONTINUOUS_CHILD_MAP, SERIALIZER);
298 -
299 - for (Resource resource: resources) {
300 - // if the resource is not registered, then abort
301 - Optional<Resource> lookedUp = lookup(discreteChildTxMap, continuousChildTxMap, resource.id());
302 - if (!lookedUp.isPresent()) {
303 - return abortTransaction(tx);
304 - }
305 -
306 if (resource instanceof DiscreteResource) { 226 if (resource instanceof DiscreteResource) {
307 - ResourceConsumer oldValue = discreteConsumerTxMap.put(((DiscreteResource) resource).id(), consumer); 227 + if (!discreteTxStore.allocate(consumer, (DiscreteResource) resource)) {
308 - if (oldValue != null) {
309 return abortTransaction(tx); 228 return abortTransaction(tx);
310 } 229 }
311 } else if (resource instanceof ContinuousResource) { 230 } else if (resource instanceof ContinuousResource) {
312 - // Down cast: this must be safe as ContinuousResource is associated with ContinuousResourceId 231 + if (!continuousTxStore.allocate(consumer, (ContinuousResource) resource)) {
313 - ContinuousResource continuous = (ContinuousResource) lookedUp.get();
314 - ContinuousResourceAllocation allocations = continuousConsumerTxMap.get(continuous.id());
315 - if (!hasEnoughResource(continuous, (ContinuousResource) resource, allocations)) {
316 - return abortTransaction(tx);
317 - }
318 -
319 - boolean success = appendValue(continuousConsumerTxMap,
320 - continuous, new ResourceAllocation(resource, consumer));
321 - if (!success) {
322 return abortTransaction(tx); 232 return abortTransaction(tx);
323 } 233 }
324 } 234 }
...@@ -334,31 +244,18 @@ public class ConsistentResourceStore extends AbstractStore<ResourceEvent, Resour ...@@ -334,31 +244,18 @@ public class ConsistentResourceStore extends AbstractStore<ResourceEvent, Resour
334 TransactionContext tx = service.transactionContextBuilder().build(); 244 TransactionContext tx = service.transactionContextBuilder().build();
335 tx.begin(); 245 tx.begin();
336 246
337 - TransactionalMap<DiscreteResourceId, ResourceConsumer> discreteConsumerTxMap = 247 + TransactionalDiscreteResourceStore discreteTxStore = discreteStore.transactional(tx);
338 - tx.getTransactionalMap(DISCRETE_CONSUMER_MAP, SERIALIZER); 248 + TransactionalContinuousResourceStore continuousTxStore = continuousStore.transactional(tx);
339 - TransactionalMap<ContinuousResourceId, ContinuousResourceAllocation> continuousConsumerTxMap =
340 - tx.getTransactionalMap(CONTINUOUS_CONSUMER_MAP, SERIALIZER);
341 -
342 for (ResourceAllocation allocation : allocations) { 249 for (ResourceAllocation allocation : allocations) {
343 Resource resource = allocation.resource(); 250 Resource resource = allocation.resource();
344 ResourceConsumer consumer = allocation.consumer(); 251 ResourceConsumer consumer = allocation.consumer();
345 252
346 if (resource instanceof DiscreteResource) { 253 if (resource instanceof DiscreteResource) {
347 - // if this single release fails (because the resource is allocated to another consumer, 254 + if (!discreteTxStore.release((DiscreteResource) resource, consumer)) {
348 - // the whole release fails
349 - if (!discreteConsumerTxMap.remove(((DiscreteResource) resource).id(), consumer)) {
350 return abortTransaction(tx); 255 return abortTransaction(tx);
351 } 256 }
352 } else if (resource instanceof ContinuousResource) { 257 } else if (resource instanceof ContinuousResource) {
353 - ContinuousResource continuous = (ContinuousResource) resource; 258 + if (!continuousTxStore.release((ContinuousResource) resource, consumer)) {
354 - ContinuousResourceAllocation continuousAllocation = continuousConsumerTxMap.get(continuous.id());
355 - ImmutableList<ResourceAllocation> newAllocations = continuousAllocation.allocations().stream()
356 - .filter(x -> !(x.consumer().equals(consumer) &&
357 - ((ContinuousResource) x.resource()).value() == continuous.value()))
358 - .collect(GuavaCollectors.toImmutableList());
359 -
360 - if (!continuousConsumerTxMap.replace(continuous.id(), continuousAllocation,
361 - new ContinuousResourceAllocation(continuousAllocation.original(), newAllocations))) {
362 return abortTransaction(tx); 259 return abortTransaction(tx);
363 } 260 }
364 } 261 }
...@@ -377,37 +274,10 @@ public class ConsistentResourceStore extends AbstractStore<ResourceEvent, Resour ...@@ -377,37 +274,10 @@ public class ConsistentResourceStore extends AbstractStore<ResourceEvent, Resour
377 274
378 if (resource instanceof DiscreteResource) { 275 if (resource instanceof DiscreteResource) {
379 // check if already consumed 276 // check if already consumed
380 - return getResourceAllocations(resource.id()).isEmpty(); 277 + return discreteStore.isAvailable((DiscreteResource) resource);
381 } else { 278 } else {
382 - return isAvailable((ContinuousResource) resource); 279 + return continuousStore.isAvailable((ContinuousResource) resource);
383 - }
384 - }
385 -
386 - // computational complexity: O(n) where n is the number of existing allocations for the resource
387 - private boolean isAvailable(ContinuousResource resource) {
388 - // check if it's registered or not.
389 - Versioned<Set<ContinuousResource>> children = continuousChildMap.get(resource.parent().get().id());
390 - if (children == null) {
391 - return false;
392 - }
393 -
394 - ContinuousResource registered = children.value().stream()
395 - .filter(c -> c.id().equals(resource.id()))
396 - .findFirst()
397 - .get();
398 - if (registered.value() < resource.value()) {
399 - // Capacity < requested, can never satisfy
400 - return false;
401 - }
402 -
403 - // check if there's enough left
404 - Versioned<ContinuousResourceAllocation> allocation = continuousConsumers.get(resource.id());
405 - if (allocation == null) {
406 - // no allocation (=no consumer) full registered resources available
407 - return true;
408 } 280 }
409 -
410 - return hasEnoughResource(allocation.value().original(), resource, allocation.value());
411 } 281 }
412 282
413 // computational complexity: O(n + m) where n is the number of entries in discreteConsumers 283 // computational complexity: O(n + m) where n is the number of entries in discreteConsumers
...@@ -418,18 +288,10 @@ public class ConsistentResourceStore extends AbstractStore<ResourceEvent, Resour ...@@ -418,18 +288,10 @@ public class ConsistentResourceStore extends AbstractStore<ResourceEvent, Resour
418 288
419 // NOTE: getting all entries may become performance bottleneck 289 // NOTE: getting all entries may become performance bottleneck
420 // TODO: revisit for better backend data structure 290 // TODO: revisit for better backend data structure
421 - Stream<DiscreteResource> discreteStream = discreteConsumers.entrySet().stream() 291 + Stream<DiscreteResource> discrete = discreteStore.getResources(consumer);
422 - .filter(x -> x.getValue().value().equals(consumer)) 292 + Stream<ContinuousResource> continuous = continuousStore.getResources(consumer);
423 - .map(Map.Entry::getKey) 293 +
424 - .map(x -> Resources.discrete(x).resource()); 294 + return Stream.concat(discrete, continuous).collect(Collectors.toList());
425 -
426 - Stream<ContinuousResource> continuousStream = continuousConsumers.values().stream()
427 - .flatMap(x -> x.value().allocations().stream()
428 - .map(y -> Maps.immutableEntry(x.value().original(), y)))
429 - .filter(x -> x.getValue().consumer().equals(consumer))
430 - .map(x -> x.getKey());
431 -
432 - return Stream.concat(discreteStream, continuousStream).collect(Collectors.toList());
433 } 295 }
434 296
435 // computational complexity: O(1) 297 // computational complexity: O(1)
...@@ -437,21 +299,10 @@ public class ConsistentResourceStore extends AbstractStore<ResourceEvent, Resour ...@@ -437,21 +299,10 @@ public class ConsistentResourceStore extends AbstractStore<ResourceEvent, Resour
437 public Set<Resource> getChildResources(DiscreteResourceId parent) { 299 public Set<Resource> getChildResources(DiscreteResourceId parent) {
438 checkNotNull(parent); 300 checkNotNull(parent);
439 301
440 - Versioned<Set<DiscreteResource>> discreteChildren = discreteChildMap.get(parent); 302 + return ImmutableSet.<Resource>builder()
441 - Versioned<Set<ContinuousResource>> continuousChildren = continuousChildMap.get(parent); 303 + .addAll(discreteStore.getChildResources(parent))
442 - 304 + .addAll(continuousStore.getChildResources(parent))
443 - if (discreteChildren == null && continuousChildren == null) { 305 + .build();
444 - return ImmutableSet.of();
445 - } else if (discreteChildren == null) {
446 - return ImmutableSet.copyOf(continuousChildren.value());
447 - } else if (continuousChildren == null) {
448 - return ImmutableSet.copyOf(discreteChildren.value());
449 - } else {
450 - return ImmutableSet.<Resource>builder()
451 - .addAll(discreteChildren.value())
452 - .addAll(continuousChildren.value())
453 - .build();
454 - }
455 } 306 }
456 307
457 // computational complexity: O(n) where n is the number of the children of the parent 308 // computational complexity: O(n) where n is the number of the children of the parent
...@@ -465,27 +316,8 @@ public class ConsistentResourceStore extends AbstractStore<ResourceEvent, Resour ...@@ -465,27 +316,8 @@ public class ConsistentResourceStore extends AbstractStore<ResourceEvent, Resour
465 return children; 316 return children;
466 } 317 }
467 318
468 - Stream<DiscreteResource> discrete = children.stream() 319 + Stream<DiscreteResource> discrete = discreteStore.getAllocatedResources(parent, cls);
469 - .filter(x -> x.isTypeOf(cls)) 320 + Stream<ContinuousResource> continuous = continuousStore.getAllocatedResources(parent, cls);
470 - .filter(x -> x instanceof DiscreteResource)
471 - .map(x -> ((DiscreteResource) x))
472 - .filter(x -> discreteConsumers.containsKey(x.id()));
473 -
474 - Stream<ContinuousResource> continuous = children.stream()
475 - .filter(x -> x.id().equals(parent.child(cls)))
476 - .filter(x -> x instanceof ContinuousResource)
477 - .map(x -> (ContinuousResource) x)
478 - // we don't use cascading simple predicates like follows to reduce accesses to consistent map
479 - // .filter(x -> continuousConsumers.containsKey(x.id()))
480 - // .filter(x -> continuousConsumers.get(x.id()) != null)
481 - // .filter(x -> !continuousConsumers.get(x.id()).value().allocations().isEmpty());
482 - .filter(resource -> {
483 - Versioned<ContinuousResourceAllocation> allocation = continuousConsumers.get(resource.id());
484 - if (allocation == null) {
485 - return false;
486 - }
487 - return !allocation.value().allocations().isEmpty();
488 - });
489 321
490 return Stream.concat(discrete, continuous).collect(Collectors.toList()); 322 return Stream.concat(discrete, continuous).collect(Collectors.toList());
491 } 323 }
...@@ -501,41 +333,17 @@ public class ConsistentResourceStore extends AbstractStore<ResourceEvent, Resour ...@@ -501,41 +333,17 @@ public class ConsistentResourceStore extends AbstractStore<ResourceEvent, Resour
501 return false; 333 return false;
502 } 334 }
503 335
504 - // Appends the specified ResourceAllocation to the existing values stored in the map
505 - // computational complexity: O(n) where n is the number of the elements in the associated allocation
506 - private boolean appendValue(TransactionalMap<ContinuousResourceId, ContinuousResourceAllocation> map,
507 - ContinuousResource original, ResourceAllocation value) {
508 - ContinuousResourceAllocation oldValue = map.putIfAbsent(original.id(),
509 - new ContinuousResourceAllocation(original, ImmutableList.of(value)));
510 - if (oldValue == null) {
511 - return true;
512 - }
513 -
514 - if (oldValue.allocations().contains(value)) {
515 - // don't write to map because all values are already stored
516 - return true;
517 - }
518 -
519 - ContinuousResourceAllocation newValue = new ContinuousResourceAllocation(original,
520 - ImmutableList.<ResourceAllocation>builder()
521 - .addAll(oldValue.allocations())
522 - .add(value)
523 - .build());
524 - return map.replace(original.id(), oldValue, newValue);
525 - }
526 /** 336 /**
527 * Appends the values to the existing values associated with the specified key. 337 * Appends the values to the existing values associated with the specified key.
528 * If the map already has all the given values, appending will not happen. 338 * If the map already has all the given values, appending will not happen.
529 * 339 *
530 - * @param discreteTxMap map holding multiple discrete resources for a key 340 + * @param key key specifying values
531 - * @param continuousTxMap map holding multiple continuous resources for a key
532 - * @param key key specifying values
533 * @param values values to be appended 341 * @param values values to be appended
534 * @return true if the operation succeeds, false otherwise. 342 * @return true if the operation succeeds, false otherwise.
535 */ 343 */
536 // computational complexity: O(n) where n is the number of the specified value 344 // computational complexity: O(n) where n is the number of the specified value
537 - private boolean appendValues(TransactionalMap<DiscreteResourceId, Set<DiscreteResource>> discreteTxMap, 345 + private boolean appendValues(TransactionalDiscreteResourceStore discreteTxStore,
538 - TransactionalMap<DiscreteResourceId, Set<ContinuousResource>> continuousTxMap, 346 + TransactionalContinuousResourceStore continuousTxStore,
539 DiscreteResourceId key, List<Resource> values) { 347 DiscreteResourceId key, List<Resource> values) {
540 // it's assumed that the passed "values" is non-empty 348 // it's assumed that the passed "values" is non-empty
541 349
...@@ -551,77 +359,28 @@ public class ConsistentResourceStore extends AbstractStore<ResourceEvent, Resour ...@@ -551,77 +359,28 @@ public class ConsistentResourceStore extends AbstractStore<ResourceEvent, Resour
551 359
552 // short-circuit decision avoiding unnecessary distributed map operations 360 // short-circuit decision avoiding unnecessary distributed map operations
553 if (continuousValues.isEmpty()) { 361 if (continuousValues.isEmpty()) {
554 - return appendValues(discreteTxMap, key, discreteValues, null); 362 + return discreteTxStore.appendValues(key, discreteValues);
555 } 363 }
556 if (discreteValues.isEmpty()) { 364 if (discreteValues.isEmpty()) {
557 - return appendValues(continuousTxMap, key, continuousValues, null); 365 + return continuousTxStore.appendValues(key, continuousValues);
558 - }
559 -
560 - return appendValues(discreteTxMap, key, discreteValues, null)
561 - && appendValues(continuousTxMap, key, continuousValues, null);
562 - }
563 -
564 - private boolean appendValues(TransactionalMap<DiscreteResourceId, Set<DiscreteResource>> map,
565 - DiscreteResourceId key, List<DiscreteResource> values, DiscreteResource dummy) {
566 - Set<DiscreteResource> requested = new LinkedHashSet<>(values);
567 - Set<DiscreteResource> oldValues = map.putIfAbsent(key, requested);
568 - if (oldValues == null) {
569 - return true;
570 - }
571 -
572 - Set<DiscreteResource> addedValues = Sets.difference(requested, oldValues);
573 - // no new value, then no-op
574 - if (addedValues.isEmpty()) {
575 - // don't write to map because all values are already stored
576 - return true;
577 } 366 }
578 367
579 - Set<DiscreteResource> newValues = new LinkedHashSet<>(oldValues); 368 + return discreteTxStore.appendValues(key, discreteValues)
580 - newValues.addAll(addedValues); 369 + && continuousTxStore.appendValues(key, continuousValues);
581 - return map.replace(key, oldValues, newValues);
582 - }
583 -
584 - private boolean appendValues(TransactionalMap<DiscreteResourceId, Set<ContinuousResource>> map,
585 - DiscreteResourceId key, List<ContinuousResource> values, ContinuousResource dummy) {
586 - Set<ContinuousResource> requested = new LinkedHashSet<>(values);
587 - Set<ContinuousResource> oldValues = map.putIfAbsent(key, requested);
588 - if (oldValues == null) {
589 - return true;
590 - }
591 -
592 - Set<ContinuousResource> addedValues = Sets.difference(requested, oldValues);
593 - // no new value, then no-op
594 - if (addedValues.isEmpty()) {
595 - // don't write to map because all values are already stored
596 - return true;
597 - }
598 -
599 - Set<ContinuousResourceId> addedIds = addedValues.stream()
600 - .map(ContinuousResource::id)
601 - .collect(Collectors.toSet());
602 - // if the value is not found but the same ID is found
603 - // (this happens only when being continuous resource)
604 - if (oldValues.stream().anyMatch(x -> addedIds.contains(x.id()))) {
605 - // no-op, but indicating failure (reject the request)
606 - return false;
607 - }
608 - Set<ContinuousResource> newValues = new LinkedHashSet<>(oldValues);
609 - newValues.addAll(addedValues);
610 - return map.replace(key, oldValues, newValues);
611 } 370 }
612 371
613 /** 372 /**
614 * Removes the values from the existing values associated with the specified key. 373 * Removes the values from the existing values associated with the specified key.
615 * If the map doesn't contain the given values, removal will not happen. 374 * If the map doesn't contain the given values, removal will not happen.
616 * 375 *
617 - * @param discreteTxMap map holding multiple discrete resources for a key 376 + * @param discreteTxStore map holding multiple discrete resources for a key
618 - * @param continuousTxMap map holding multiple continuous resources for a key 377 + * @param continuousTxStore map holding multiple continuous resources for a key
619 - * @param key key specifying values 378 + * @param key key specifying values
620 - * @param values values to be removed 379 + * @param values values to be removed
621 * @return true if the operation succeeds, false otherwise 380 * @return true if the operation succeeds, false otherwise
622 */ 381 */
623 - private boolean removeValues(TransactionalMap<DiscreteResourceId, Set<DiscreteResource>> discreteTxMap, 382 + private boolean removeValues(TransactionalDiscreteResourceStore discreteTxStore,
624 - TransactionalMap<DiscreteResourceId, Set<ContinuousResource>> continuousTxMap, 383 + TransactionalContinuousResourceStore continuousTxStore,
625 DiscreteResourceId key, List<Resource> values) { 384 DiscreteResourceId key, List<Resource> values) {
626 // it's assumed that the passed "values" is non-empty 385 // it's assumed that the passed "values" is non-empty
627 386
...@@ -637,138 +396,54 @@ public class ConsistentResourceStore extends AbstractStore<ResourceEvent, Resour ...@@ -637,138 +396,54 @@ public class ConsistentResourceStore extends AbstractStore<ResourceEvent, Resour
637 396
638 // short-circuit decision avoiding unnecessary distributed map operations 397 // short-circuit decision avoiding unnecessary distributed map operations
639 if (continuousValues.isEmpty()) { 398 if (continuousValues.isEmpty()) {
640 - return removeValues(discreteTxMap, key, discreteValues); 399 + return discreteTxStore.removeValues(key, discreteValues);
641 } 400 }
642 if (discreteValues.isEmpty()) { 401 if (discreteValues.isEmpty()) {
643 - return removeValues(continuousTxMap, key, continuousValues); 402 + return continuousTxStore.removeValues(key, continuousValues);
644 } 403 }
645 404
646 - return removeValues(discreteTxMap, key, discreteValues) && removeValues(continuousTxMap, key, continuousValues); 405 + return discreteTxStore.removeValues(key, discreteValues)
406 + && continuousTxStore.removeValues(key, continuousValues);
647 } 407 }
648 408
649 - private <T extends Resource> boolean removeValues(TransactionalMap<DiscreteResourceId, Set<T>> map,
650 - DiscreteResourceId key, List<T> values) {
651 - Set<T> oldValues = map.putIfAbsent(key, new LinkedHashSet<>());
652 - if (oldValues == null) {
653 - log.trace("No-Op removing values. key {} did not exist", key);
654 - return true;
655 - }
656 -
657 - if (values.stream().allMatch(x -> !oldValues.contains(x))) {
658 - // don't write map because none of the values are stored
659 - log.trace("No-Op removing values. key {} did not contain {}", key, values);
660 - return true;
661 - }
662 -
663 - LinkedHashSet<T> newValues = new LinkedHashSet<>(oldValues);
664 - newValues.removeAll(values);
665 - return map.replace(key, oldValues, newValues);
666 -
667 - }
668 /** 409 /**
669 * Returns the resource which has the same key as the specified resource ID 410 * Returns the resource which has the same key as the specified resource ID
670 * in the set as a value of the map. 411 * in the set as a value of the map.
671 * 412 *
672 - * @param discreteTxMap map storing parent - child relationship of discrete resources
673 - * @param continuousTxMap map storing parent -child relationship of continuous resources
674 * @param id ID of resource to be checked 413 * @param id ID of resource to be checked
675 * @return the resource which is regarded as the same as the specified resource 414 * @return the resource which is regarded as the same as the specified resource
676 */ 415 */
677 // Naive implementation, which traverses all elements in the set when continuous resource 416 // Naive implementation, which traverses all elements in the set when continuous resource
678 // computational complexity: O(1) when discrete resource. O(n) when continuous resource 417 // computational complexity: O(1) when discrete resource. O(n) when continuous resource
679 // where n is the number of elements in the associated set 418 // where n is the number of elements in the associated set
680 - private Optional<Resource> lookup(TransactionalMap<DiscreteResourceId, Set<DiscreteResource>> discreteTxMap, 419 + private Optional<Resource> lookup(TransactionalDiscreteResourceStore discreteTxStore,
681 - TransactionalMap<DiscreteResourceId, Set<ContinuousResource>> continuousTxMap, 420 + TransactionalContinuousResourceStore continuousTxStore,
682 ResourceId id) { 421 ResourceId id) {
683 if (id instanceof DiscreteResourceId) { 422 if (id instanceof DiscreteResourceId) {
684 - return lookup(discreteTxMap, (DiscreteResourceId) id); 423 + return discreteTxStore.lookup((DiscreteResourceId) id);
685 } else if (id instanceof ContinuousResourceId) { 424 } else if (id instanceof ContinuousResourceId) {
686 - return lookup(continuousTxMap, (ContinuousResourceId) id); 425 + return continuousTxStore.lookup((ContinuousResourceId) id);
687 - } else {
688 - return Optional.empty();
689 - }
690 - }
691 -
692 - // check the existence in the set: O(1) operation
693 - private Optional<Resource> lookup(TransactionalMap<DiscreteResourceId, Set<DiscreteResource>> discreteTxMap,
694 - DiscreteResourceId id) {
695 - if (!id.parent().isPresent()) {
696 - return Optional.of(Resource.ROOT);
697 - }
698 -
699 - Set<DiscreteResource> values = discreteTxMap.get(id.parent().get());
700 - if (values == null) {
701 - return Optional.empty();
702 - }
703 -
704 - DiscreteResource resource = Resources.discrete(id).resource();
705 - if (values.contains(resource)) {
706 - return Optional.of(resource);
707 } else { 426 } else {
708 return Optional.empty(); 427 return Optional.empty();
709 } 428 }
710 } 429 }
711 430
712 - // iterate over the values in the set: O(n) operation
713 - private Optional<Resource> lookup(TransactionalMap<DiscreteResourceId, Set<ContinuousResource>> continuousTxMap,
714 - ContinuousResourceId id) {
715 - if (!id.parent().isPresent()) {
716 - return Optional.of(Resource.ROOT);
717 - }
718 -
719 - Set<ContinuousResource> values = continuousTxMap.get(id.parent().get());
720 - if (values == null) {
721 - return Optional.empty();
722 - }
723 -
724 - return values.stream()
725 - .filter(x -> x.id().equals(id))
726 - .map(x -> (Resource) x)
727 - .findFirst();
728 - }
729 -
730 - /**
731 - * Checks if there is enough resource volume to allocated the requested resource
732 - * against the specified resource.
733 - *
734 - * @param original original resource
735 - * @param request requested resource
736 - * @param allocation current allocation of the resource
737 - * @return true if there is enough resource volume. Otherwise, false.
738 - */
739 - // computational complexity: O(n) where n is the number of allocations
740 - private boolean hasEnoughResource(ContinuousResource original,
741 - ContinuousResource request,
742 - ContinuousResourceAllocation allocation) {
743 - if (allocation == null) {
744 - return request.value() <= original.value();
745 - }
746 -
747 - double allocated = allocation.allocations().stream()
748 - .filter(x -> x.resource() instanceof ContinuousResource)
749 - .map(x -> (ContinuousResource) x.resource())
750 - .mapToDouble(ContinuousResource::value)
751 - .sum();
752 - double left = original.value() - allocated;
753 - return request.value() <= left;
754 - }
755 -
756 // internal use only 431 // internal use only
757 - private static final class ContinuousResourceAllocation { 432 + static final class ContinuousResourceAllocation {
758 private final ContinuousResource original; 433 private final ContinuousResource original;
759 private final ImmutableList<ResourceAllocation> allocations; 434 private final ImmutableList<ResourceAllocation> allocations;
760 435
761 - private ContinuousResourceAllocation(ContinuousResource original, 436 + ContinuousResourceAllocation(ContinuousResource original,
762 - ImmutableList<ResourceAllocation> allocations) { 437 + ImmutableList<ResourceAllocation> allocations) {
763 this.original = original; 438 this.original = original;
764 this.allocations = allocations; 439 this.allocations = allocations;
765 } 440 }
766 441
767 - private ContinuousResource original() { 442 + ContinuousResource original() {
768 return original; 443 return original;
769 } 444 }
770 445
771 - private ImmutableList<ResourceAllocation> allocations() { 446 + ImmutableList<ResourceAllocation> allocations() {
772 return allocations; 447 return allocations;
773 } 448 }
774 } 449 }
......
1 +/*
2 + * Copyright 2016-present Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +package org.onosproject.store.resource.impl;
17 +
18 +final class MapNames {
19 + static final String DISCRETE_CONSUMER_MAP = "onos-discrete-consumers";
20 + static final String DISCRETE_CHILD_MAP = "onos-resource-discrete-children";
21 + static final String CONTINUOUS_CONSUMER_MAP = "onos-continuous-consumers";
22 + static final String CONTINUOUS_CHILD_MAP = "onos-resource-continuous-children";
23 +
24 + // prohibit contruction
25 + private MapNames() {}
26 +}
1 +/*
2 + * Copyright 2016-present Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +package org.onosproject.store.resource.impl;
17 +
18 +import org.onosproject.net.resource.ContinuousResource;
19 +
20 +final class ResourceStoreUtil {
21 + // prohibit construction
22 + private ResourceStoreUtil() {}
23 +
24 + /**
25 + * Checks if there is enough resource volume to allocated the requested resource
26 + * against the specified resource.
27 + *
28 + * @param original original resource
29 + * @param request requested resource
30 + * @param allocation current allocation of the resource
31 + * @return true if there is enough resource volume. Otherwise, false.
32 + */
33 + // computational complexity: O(n) where n is the number of allocations
34 + static boolean hasEnoughResource(ContinuousResource original,
35 + ContinuousResource request,
36 + ConsistentResourceStore.ContinuousResourceAllocation allocation) {
37 + if (allocation == null) {
38 + return request.value() <= original.value();
39 + }
40 +
41 + double allocated = allocation.allocations().stream()
42 + .filter(x -> x.resource() instanceof ContinuousResource)
43 + .map(x -> (ContinuousResource) x.resource())
44 + .mapToDouble(ContinuousResource::value)
45 + .sum();
46 + double left = original.value() - allocated;
47 + return request.value() <= left;
48 + }
49 +}
1 +/*
2 + * Copyright 2016-present Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +package org.onosproject.store.resource.impl;
17 +
18 +import com.google.common.collect.ImmutableList;
19 +import com.google.common.collect.Sets;
20 +import org.onlab.util.GuavaCollectors;
21 +import org.onosproject.net.resource.ContinuousResource;
22 +import org.onosproject.net.resource.ContinuousResourceId;
23 +import org.onosproject.net.resource.DiscreteResourceId;
24 +import org.onosproject.net.resource.Resource;
25 +import org.onosproject.net.resource.ResourceAllocation;
26 +import org.onosproject.net.resource.ResourceConsumer;
27 +import org.onosproject.store.resource.impl.ConsistentResourceStore.ContinuousResourceAllocation;
28 +import org.onosproject.store.service.TransactionContext;
29 +import org.onosproject.store.service.TransactionalMap;
30 +import org.slf4j.Logger;
31 +import org.slf4j.LoggerFactory;
32 +
33 +import java.util.LinkedHashSet;
34 +import java.util.List;
35 +import java.util.Optional;
36 +import java.util.Set;
37 +import java.util.stream.Collectors;
38 +
39 +import static org.onosproject.store.resource.impl.ConsistentResourceStore.SERIALIZER;
40 +import static org.onosproject.store.resource.impl.ResourceStoreUtil.hasEnoughResource;
41 +
42 +class TransactionalContinuousResourceStore {
43 + private final Logger log = LoggerFactory.getLogger(getClass());
44 + private final TransactionalMap<DiscreteResourceId, Set<ContinuousResource>> childMap;
45 + private final TransactionalMap<ContinuousResourceId, ContinuousResourceAllocation> consumers;
46 +
47 + TransactionalContinuousResourceStore(TransactionContext tx) {
48 + this.childMap = tx.getTransactionalMap(MapNames.CONTINUOUS_CHILD_MAP, SERIALIZER);
49 + this.consumers = tx.getTransactionalMap(MapNames.CONTINUOUS_CONSUMER_MAP, SERIALIZER);
50 + }
51 +
52 + // iterate over the values in the set: O(n) operation
53 + Optional<Resource> lookup(ContinuousResourceId id) {
54 + if (!id.parent().isPresent()) {
55 + return Optional.of(Resource.ROOT);
56 + }
57 +
58 + Set<ContinuousResource> values = childMap.get(id.parent().get());
59 + if (values == null) {
60 + return Optional.empty();
61 + }
62 +
63 + return values.stream()
64 + .filter(x -> x.id().equals(id))
65 + .map(x -> (Resource) x)
66 + .findFirst();
67 + }
68 +
69 + boolean appendValues(DiscreteResourceId key, List<ContinuousResource> values) {
70 + Set<ContinuousResource> requested = new LinkedHashSet<>(values);
71 + Set<ContinuousResource> oldValues = childMap.putIfAbsent(key, requested);
72 + if (oldValues == null) {
73 + return true;
74 + }
75 +
76 + Set<ContinuousResource> addedValues = Sets.difference(requested, oldValues);
77 + // no new value, then no-op
78 + if (addedValues.isEmpty()) {
79 + // don't write to map because all values are already stored
80 + return true;
81 + }
82 +
83 + Set<ContinuousResourceId> addedIds = addedValues.stream()
84 + .map(ContinuousResource::id)
85 + .collect(Collectors.toSet());
86 + // if the value is not found but the same ID is found
87 + // (this happens only when being continuous resource)
88 + if (oldValues.stream().anyMatch(x -> addedIds.contains(x.id()))) {
89 + // no-op, but indicating failure (reject the request)
90 + return false;
91 + }
92 + Set<ContinuousResource> newValues = new LinkedHashSet<>(oldValues);
93 + newValues.addAll(addedValues);
94 + return childMap.replace(key, oldValues, newValues);
95 + }
96 +
97 + boolean removeValues(DiscreteResourceId key, List<ContinuousResource> values) {
98 + Set<ContinuousResource> oldValues = childMap.putIfAbsent(key, new LinkedHashSet<>());
99 + if (oldValues == null) {
100 + log.trace("No-Op removing values. key {} did not exist", key);
101 + return true;
102 + }
103 +
104 + if (values.stream().allMatch(x -> !oldValues.contains(x))) {
105 + // don't write map because none of the values are stored
106 + log.trace("No-Op removing values. key {} did not contain {}", key, values);
107 + return true;
108 + }
109 +
110 + LinkedHashSet<ContinuousResource> newValues = new LinkedHashSet<>(oldValues);
111 + newValues.removeAll(values);
112 + return childMap.replace(key, oldValues, newValues);
113 + }
114 +
115 + boolean isAllocated(ContinuousResourceId id) {
116 + ContinuousResourceAllocation allocations = consumers.get(id);
117 + return allocations != null && !allocations.allocations().isEmpty();
118 + }
119 +
120 + boolean allocate(ResourceConsumer consumer, ContinuousResource request) {
121 + // if the resource is not registered, then abort
122 + Optional<Resource> lookedUp = lookup(request.id());
123 + if (!lookedUp.isPresent()) {
124 + return false;
125 + }
126 + // Down cast: this must be safe as ContinuousResource is associated with ContinuousResourceId
127 + ContinuousResource original = (ContinuousResource) lookedUp.get();
128 + ContinuousResourceAllocation allocations = consumers.get(request.id());
129 + if (!hasEnoughResource(original, request, allocations)) {
130 + return false;
131 + }
132 +
133 + boolean success = appendValue(original, new ResourceAllocation(request, consumer));
134 + if (!success) {
135 + return false;
136 + }
137 +
138 + return true;
139 + }
140 +
141 + // Appends the specified ResourceAllocation to the existing values stored in the map
142 + // computational complexity: O(n) where n is the number of the elements in the associated allocation
143 + private boolean appendValue(ContinuousResource original, ResourceAllocation value) {
144 + ContinuousResourceAllocation oldValue = consumers.putIfAbsent(original.id(),
145 + new ContinuousResourceAllocation(original, ImmutableList.of(value)));
146 + if (oldValue == null) {
147 + return true;
148 + }
149 +
150 + if (oldValue.allocations().contains(value)) {
151 + // don't write to map because all values are already stored
152 + return true;
153 + }
154 +
155 + ContinuousResourceAllocation newValue = new ContinuousResourceAllocation(original,
156 + ImmutableList.<ResourceAllocation>builder()
157 + .addAll(oldValue.allocations())
158 + .add(value)
159 + .build());
160 + return consumers.replace(original.id(), oldValue, newValue);
161 + }
162 +
163 + boolean release(ContinuousResource resource, ResourceConsumer consumer) {
164 + ContinuousResourceAllocation oldAllocation = consumers.get(resource.id());
165 + ImmutableList<ResourceAllocation> newAllocations = oldAllocation.allocations().stream()
166 + .filter(x -> !(x.consumer().equals(consumer) &&
167 + ((ContinuousResource) x.resource()).value() == resource.value()))
168 + .collect(GuavaCollectors.toImmutableList());
169 +
170 + if (!consumers.replace(resource.id(), oldAllocation,
171 + new ContinuousResourceAllocation(oldAllocation.original(), newAllocations))) {
172 + return false;
173 + }
174 +
175 + return true;
176 + }
177 +}
1 +/*
2 + * Copyright 2016-present Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +package org.onosproject.store.resource.impl;
17 +
18 +import com.google.common.collect.Sets;
19 +import org.onosproject.net.resource.DiscreteResource;
20 +import org.onosproject.net.resource.DiscreteResourceId;
21 +import org.onosproject.net.resource.Resource;
22 +import org.onosproject.net.resource.ResourceConsumer;
23 +import org.onosproject.net.resource.Resources;
24 +import org.onosproject.store.service.TransactionContext;
25 +import org.onosproject.store.service.TransactionalMap;
26 +import org.slf4j.Logger;
27 +import org.slf4j.LoggerFactory;
28 +
29 +import java.util.LinkedHashSet;
30 +import java.util.List;
31 +import java.util.Optional;
32 +import java.util.Set;
33 +
34 +import static org.onosproject.store.resource.impl.ConsistentResourceStore.SERIALIZER;
35 +
36 +class TransactionalDiscreteResourceStore {
37 + private final Logger log = LoggerFactory.getLogger(getClass());
38 + private final TransactionalMap<DiscreteResourceId, Set<DiscreteResource>> childMap;
39 + private final TransactionalMap<DiscreteResourceId, ResourceConsumer> consumers;
40 +
41 + TransactionalDiscreteResourceStore(TransactionContext tx) {
42 + this.childMap = tx.getTransactionalMap(MapNames.DISCRETE_CHILD_MAP, SERIALIZER);
43 + this.consumers = tx.getTransactionalMap(MapNames.DISCRETE_CONSUMER_MAP, SERIALIZER);
44 + }
45 +
46 + // check the existence in the set: O(1) operation
47 + Optional<Resource> lookup(DiscreteResourceId id) {
48 + if (!id.parent().isPresent()) {
49 + return Optional.of(Resource.ROOT);
50 + }
51 +
52 + Set<DiscreteResource> values = childMap.get(id.parent().get());
53 + if (values == null) {
54 + return Optional.empty();
55 + }
56 +
57 + DiscreteResource resource = Resources.discrete(id).resource();
58 + if (values.contains(resource)) {
59 + return Optional.of(resource);
60 + } else {
61 + return Optional.empty();
62 + }
63 + }
64 +
65 + boolean appendValues(DiscreteResourceId key, List<DiscreteResource> values) {
66 + Set<DiscreteResource> requested = new LinkedHashSet<>(values);
67 + Set<DiscreteResource> oldValues = childMap.putIfAbsent(key, requested);
68 + if (oldValues == null) {
69 + return true;
70 + }
71 +
72 + Set<DiscreteResource> addedValues = Sets.difference(requested, oldValues);
73 + // no new value, then no-op
74 + if (addedValues.isEmpty()) {
75 + // don't write to map because all values are already stored
76 + return true;
77 + }
78 +
79 + Set<DiscreteResource> newValues = new LinkedHashSet<>(oldValues);
80 + newValues.addAll(addedValues);
81 + return childMap.replace(key, oldValues, newValues);
82 + }
83 +
84 + boolean removeValues(DiscreteResourceId key, List<DiscreteResource> values) {
85 + Set<DiscreteResource> oldValues = childMap.putIfAbsent(key, new LinkedHashSet<>());
86 + if (oldValues == null) {
87 + log.trace("No-Op removing values. key {} did not exist", key);
88 + return true;
89 + }
90 +
91 + if (values.stream().allMatch(x -> !oldValues.contains(x))) {
92 + // don't write map because none of the values are stored
93 + log.trace("No-Op removing values. key {} did not contain {}", key, values);
94 + return true;
95 + }
96 +
97 + LinkedHashSet<DiscreteResource> newValues = new LinkedHashSet<>(oldValues);
98 + newValues.removeAll(values);
99 + return childMap.replace(key, oldValues, newValues);
100 + }
101 +
102 + boolean isAllocated(DiscreteResourceId id) {
103 + return consumers.get(id) != null;
104 + }
105 +
106 + boolean allocate(ResourceConsumer consumer, DiscreteResource resource) {
107 + // if the resource is not registered, then abort
108 + Optional<Resource> lookedUp = lookup(resource.id());
109 + if (!lookedUp.isPresent()) {
110 + return false;
111 + }
112 +
113 + ResourceConsumer oldValue = consumers.put(resource.id(), consumer);
114 + return oldValue == null;
115 + }
116 +
117 + boolean release(DiscreteResource resource, ResourceConsumer consumer) {
118 + // if this single release fails (because the resource is allocated to another consumer)
119 + // the whole release fails
120 + if (!consumers.remove(resource.id(), consumer)) {
121 + return false;
122 + }
123 +
124 + return true;
125 + }
126 +}