Yuta HIGUCHI

DistributedLinkResourceStore

Change-Id: Ia45c221946693906c12d64f20f25e30786a04224
...@@ -17,6 +17,7 @@ package org.onlab.onos.net.resource; ...@@ -17,6 +17,7 @@ package org.onlab.onos.net.resource;
17 17
18 import java.util.Objects; 18 import java.util.Objects;
19 19
20 +// FIXME: Document what is the unit? Mbps?
20 /** 21 /**
21 * Representation of bandwidth resource. 22 * Representation of bandwidth resource.
22 */ 23 */
......
...@@ -49,6 +49,10 @@ public final class BatchWriteRequest { ...@@ -49,6 +49,10 @@ public final class BatchWriteRequest {
49 .toString(); 49 .toString();
50 } 50 }
51 51
52 + public static Builder newBuilder() {
53 + return new Builder();
54 + }
55 +
52 /** 56 /**
53 * Builder for BatchWriteRequest. 57 * Builder for BatchWriteRequest.
54 */ 58 */
......
...@@ -64,13 +64,13 @@ public class WriteRequest { ...@@ -64,13 +64,13 @@ public class WriteRequest {
64 * 64 *
65 * @param tableName name of the table 65 * @param tableName name of the table
66 * @param key key in the table 66 * @param key key in the table
67 - * @param newValue value to write, must not be null
68 * @param oldValue previous value expected, must not be null 67 * @param oldValue previous value expected, must not be null
68 + * @param newValue value to write, must not be null
69 * @return WriteRequest 69 * @return WriteRequest
70 */ 70 */
71 public static WriteRequest putIfValueMatches(String tableName, String key, 71 public static WriteRequest putIfValueMatches(String tableName, String key,
72 - byte[] newValue, 72 + byte[] oldValue,
73 - byte[] oldValue) { 73 + byte[] newValue) {
74 return new WriteRequest(PUT_IF_VALUE, tableName, key, 74 return new WriteRequest(PUT_IF_VALUE, tableName, key,
75 checkNotNull(newValue), ANY_VERSION, 75 checkNotNull(newValue), ANY_VERSION,
76 checkNotNull(oldValue)); 76 checkNotNull(oldValue));
......
...@@ -18,9 +18,13 @@ package org.onlab.onos.store.resource.impl; ...@@ -18,9 +18,13 @@ package org.onlab.onos.store.resource.impl;
18 import org.apache.felix.scr.annotations.Activate; 18 import org.apache.felix.scr.annotations.Activate;
19 import org.apache.felix.scr.annotations.Component; 19 import org.apache.felix.scr.annotations.Component;
20 import org.apache.felix.scr.annotations.Deactivate; 20 import org.apache.felix.scr.annotations.Deactivate;
21 +import org.apache.felix.scr.annotations.Reference;
22 +import org.apache.felix.scr.annotations.ReferenceCardinality;
21 import org.apache.felix.scr.annotations.Service; 23 import org.apache.felix.scr.annotations.Service;
22 import org.onlab.onos.net.Link; 24 import org.onlab.onos.net.Link;
25 +import org.onlab.onos.net.LinkKey;
23 import org.onlab.onos.net.intent.IntentId; 26 import org.onlab.onos.net.intent.IntentId;
27 +import org.onlab.onos.net.link.LinkService;
24 import org.onlab.onos.net.resource.Bandwidth; 28 import org.onlab.onos.net.resource.Bandwidth;
25 import org.onlab.onos.net.resource.BandwidthResourceAllocation; 29 import org.onlab.onos.net.resource.BandwidthResourceAllocation;
26 import org.onlab.onos.net.resource.Lambda; 30 import org.onlab.onos.net.resource.Lambda;
...@@ -29,34 +33,88 @@ import org.onlab.onos.net.resource.LinkResourceAllocations; ...@@ -29,34 +33,88 @@ import org.onlab.onos.net.resource.LinkResourceAllocations;
29 import org.onlab.onos.net.resource.LinkResourceStore; 33 import org.onlab.onos.net.resource.LinkResourceStore;
30 import org.onlab.onos.net.resource.ResourceAllocation; 34 import org.onlab.onos.net.resource.ResourceAllocation;
31 import org.onlab.onos.net.resource.ResourceType; 35 import org.onlab.onos.net.resource.ResourceType;
36 +import org.onlab.onos.store.serializers.KryoSerializer;
37 +import org.onlab.onos.store.serializers.StoreSerializer;
38 +import org.onlab.onos.store.service.BatchWriteRequest;
39 +import org.onlab.onos.store.service.BatchWriteRequest.Builder;
40 +import org.onlab.onos.store.service.BatchWriteResult;
41 +import org.onlab.onos.store.service.DatabaseAdminService;
42 +import org.onlab.onos.store.service.DatabaseService;
43 +import org.onlab.onos.store.service.VersionedValue;
44 +import org.onlab.onos.store.service.WriteRequest;
45 +import org.onlab.onos.store.service.WriteResult;
32 import org.slf4j.Logger; 46 import org.slf4j.Logger;
33 47
34 -import java.util.Collections; 48 +import com.google.common.base.Function;
49 +import com.google.common.collect.FluentIterable;
50 +import com.google.common.collect.ImmutableSet;
51 +import com.google.common.collect.Sets;
52 +
53 +import java.util.ArrayList;
54 +import java.util.Collection;
35 import java.util.HashMap; 55 import java.util.HashMap;
36 import java.util.HashSet; 56 import java.util.HashSet;
57 +import java.util.List;
37 import java.util.Map; 58 import java.util.Map;
38 import java.util.Set; 59 import java.util.Set;
39 60
61 +import static com.google.common.base.Preconditions.checkArgument;
40 import static com.google.common.base.Preconditions.checkNotNull; 62 import static com.google.common.base.Preconditions.checkNotNull;
41 import static com.google.common.base.Preconditions.checkState; 63 import static com.google.common.base.Preconditions.checkState;
64 +import static com.google.common.base.Predicates.notNull;
65 +import static org.onlab.util.HexString.toHexString;
42 import static org.slf4j.LoggerFactory.getLogger; 66 import static org.slf4j.LoggerFactory.getLogger;
43 67
44 /** 68 /**
45 - * Manages link resources using trivial in-memory structures implementation. 69 + * Manages link resources using database service.
46 */ 70 */
47 @Component(immediate = true) 71 @Component(immediate = true)
48 @Service 72 @Service
49 public class DistributedLinkResourceStore implements LinkResourceStore { 73 public class DistributedLinkResourceStore implements LinkResourceStore {
74 +
50 private final Logger log = getLogger(getClass()); 75 private final Logger log = getLogger(getClass());
51 - private Map<IntentId, LinkResourceAllocations> linkResourceAllocationsMap; 76 +
52 - private Map<Link, Set<LinkResourceAllocations>> allocatedResources; 77 + // FIXME: what is the Bandwidth unit?
53 - private Map<Link, Set<ResourceAllocation>> freeResources; 78 + private static final Bandwidth DEFAULT_BANDWIDTH = Bandwidth.valueOf(1_000);
79 +
80 + // table to store current allocations
81 + /** LinkKey -> List<LinkResourceAllocations>. */
82 + private static final String LINK_RESOURCE_ALLOCATIONS = "LinkResourceAllocations";
83 +
84 + /** IntentId -> LinkResourceAllocations. */
85 + private static final String INTENT_ALLOCATIONS = "IntentAllocations";
86 +
87 + private static final Bandwidth EMPTY_BW = Bandwidth.valueOf(0);
88 +
89 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
90 + protected DatabaseAdminService databaseAdminService;
91 +
92 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
93 + protected DatabaseService databaseService;
94 +
95 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
96 + protected LinkService linkService;
97 +
98 + // Link annotation key name to use as bandwidth
99 + private String bandwidthAnnotation = "bandwidth";
100 + // Link annotation key name to use as max lamda
101 + private String wavesAnnotation = "optical.waves";
102 +
103 + private StoreSerializer serializer;
104 +
54 105
55 @Activate 106 @Activate
56 public void activate() { 107 public void activate() {
57 - linkResourceAllocationsMap = new HashMap<>(); 108 +
58 - allocatedResources = new HashMap<>(); 109 + serializer = new KryoSerializer();
59 - freeResources = new HashMap<>(); 110 +
111 + Set<String> tables = databaseAdminService.listTables();
112 + if (!tables.contains(LINK_RESOURCE_ALLOCATIONS)) {
113 + databaseAdminService.createTable(LINK_RESOURCE_ALLOCATIONS);
114 + }
115 + if (!tables.contains(INTENT_ALLOCATIONS)) {
116 + databaseAdminService.createTable(INTENT_ALLOCATIONS);
117 + }
60 118
61 log.info("Started"); 119 log.info("Started");
62 } 120 }
...@@ -66,167 +124,431 @@ public class DistributedLinkResourceStore implements LinkResourceStore { ...@@ -66,167 +124,431 @@ public class DistributedLinkResourceStore implements LinkResourceStore {
66 log.info("Stopped"); 124 log.info("Stopped");
67 } 125 }
68 126
69 - /** 127 + private Set<? extends ResourceAllocation> getResourceCapacity(ResourceType type, Link link) {
70 - * Returns free resources for a given link obtaining from topology 128 + // TODO: plugin/provider mechanism to add resource type in the future?
71 - * information. 129 + if (type == ResourceType.BANDWIDTH) {
72 - * 130 + return ImmutableSet.of(getBandwidthResourceCapacity(link));
73 - * @param link the target link 131 + }
74 - * @return free resources 132 + if (type == ResourceType.LAMBDA) {
75 - */ 133 + return getLambdaResourceCapacity(link);
76 - private synchronized Set<ResourceAllocation> readOriginalFreeResources(Link link) { 134 + }
77 - // TODO read capacity and lambda resources from topology 135 + return null;
78 - Set<ResourceAllocation> allocations = new HashSet<>(); 136 + }
79 - for (int i = 1; i <= 100; i++) { 137 +
80 - allocations.add(new LambdaResourceAllocation(Lambda.valueOf(i))); 138 + private Set<LambdaResourceAllocation> getLambdaResourceCapacity(Link link) {
81 - } 139 + // FIXME enumerate all the possible link/port lambdas
82 - allocations.add(new BandwidthResourceAllocation(Bandwidth.valueOf(1000000))); 140 + Set<LambdaResourceAllocation> allocations = new HashSet<>();
141 + try {
142 + final int waves = Integer.parseInt(link.annotations().value(wavesAnnotation));
143 + for (int i = 1; i <= waves; i++) {
144 + allocations.add(new LambdaResourceAllocation(Lambda.valueOf(i)));
145 + }
146 + } catch (NumberFormatException e) {
147 + log.debug("No {} annotation on link %s", wavesAnnotation, link);
148 + }
83 return allocations; 149 return allocations;
84 } 150 }
85 151
86 - /** 152 + private BandwidthResourceAllocation getBandwidthResourceCapacity(Link link) {
87 - * Finds and returns {@link org.onlab.onos.net.resource.BandwidthResourceAllocation} object from a given 153 +
88 - * set. 154 + // if Link annotation exist, use them
89 - * 155 + // if all fails, use DEFAULT_BANDWIDTH
90 - * @param freeRes a set of ResourceAllocation object. 156 +
91 - * @return {@link org.onlab.onos.net.resource.BandwidthResourceAllocation} object if found, otherwise 157 + Bandwidth bandwidth = null;
92 - * {@link org.onlab.onos.net.resource.BandwidthResourceAllocation} object with 0 bandwidth 158 + String strBw = link.annotations().value(bandwidthAnnotation);
93 - * 159 + if (strBw != null) {
94 - */ 160 + try {
95 - private synchronized BandwidthResourceAllocation getBandwidth(Set<ResourceAllocation> freeRes) { 161 + bandwidth = Bandwidth.valueOf(Double.parseDouble(strBw));
96 - for (ResourceAllocation res : freeRes) { 162 + } catch (NumberFormatException e) {
97 - if (res.type() == ResourceType.BANDWIDTH) { 163 + // do nothings
98 - return (BandwidthResourceAllocation) res; 164 + bandwidth = null;
99 } 165 }
100 } 166 }
101 - return new BandwidthResourceAllocation(Bandwidth.valueOf(0)); 167 +
168 + if (bandwidth == null) {
169 + // fall back, use fixed default
170 + bandwidth = DEFAULT_BANDWIDTH;
171 + }
172 + return new BandwidthResourceAllocation(bandwidth);
102 } 173 }
103 174
104 - /** 175 + private Map<ResourceType, Set<? extends ResourceAllocation>> getResourceCapacity(Link link) {
105 - * Subtracts given resources from free resources for given link. 176 + Map<ResourceType, Set<? extends ResourceAllocation>> caps = new HashMap<>();
106 - * 177 + for (ResourceType type : ResourceType.values()) {
107 - * @param link the target link 178 + Set<? extends ResourceAllocation> cap = getResourceCapacity(type, link);
108 - * @param allocations the resources to be subtracted 179 + if (cap != null) {
109 - */ 180 + caps.put(type, cap);
110 - private synchronized void subtractFreeResources(Link link, LinkResourceAllocations allocations) {
111 - // TODO Use lock or version for updating freeResources.
112 - checkNotNull(link);
113 - Set<ResourceAllocation> freeRes = new HashSet<>(getFreeResources(link));
114 - Set<ResourceAllocation> subRes = allocations.getResourceAllocation(link);
115 - for (ResourceAllocation res : subRes) {
116 - switch (res.type()) {
117 - case BANDWIDTH:
118 - BandwidthResourceAllocation ba = getBandwidth(freeRes);
119 - double requestedBandwidth =
120 - ((BandwidthResourceAllocation) res).bandwidth().toDouble();
121 - double newBandwidth = ba.bandwidth().toDouble() - requestedBandwidth;
122 - checkState(newBandwidth >= 0.0);
123 - freeRes.remove(ba);
124 - freeRes.add(new BandwidthResourceAllocation(
125 - Bandwidth.valueOf(newBandwidth)));
126 - break;
127 - case LAMBDA:
128 - checkState(freeRes.remove(res));
129 - break;
130 - default:
131 - break;
132 } 181 }
133 } 182 }
134 - freeResources.put(link, freeRes); 183 + return caps;
184 + }
135 185
186 + @Override
187 + public Set<ResourceAllocation> getFreeResources(Link link) {
188 + Map<ResourceType, Set<? extends ResourceAllocation>> freeResources = getFreeResourcesEx(link);
189 + Set<ResourceAllocation> allFree = new HashSet<>();
190 + for (Set<? extends ResourceAllocation> r:freeResources.values()) {
191 + allFree.addAll(r);
192 + }
193 + return allFree;
136 } 194 }
137 195
138 - /** 196 + private Map<ResourceType, Set<? extends ResourceAllocation>> getFreeResourcesEx(Link link) {
139 - * Adds given resources to free resources for given link. 197 + // returns capacity - allocated
140 - * 198 +
141 - * @param link the target link 199 + checkNotNull(link);
142 - * @param allocations the resources to be added 200 + Map<ResourceType, Set<? extends ResourceAllocation>> free = new HashMap<>();
143 - */ 201 + final Map<ResourceType, Set<? extends ResourceAllocation>> caps = getResourceCapacity(link);
144 - private synchronized void addFreeResources(Link link, LinkResourceAllocations allocations) { 202 + final Iterable<LinkResourceAllocations> allocations = getAllocations(link);
145 - // TODO Use lock or version for updating freeResources. 203 +
146 - Set<ResourceAllocation> freeRes = new HashSet<>(getFreeResources(link)); 204 + for (ResourceType type : ResourceType.values()) {
147 - Set<ResourceAllocation> addRes = allocations.getResourceAllocation(link); 205 + // there should be class/category of resources
148 - for (ResourceAllocation res : addRes) { 206 + switch (type) {
149 - switch (res.type()) {
150 case BANDWIDTH: 207 case BANDWIDTH:
151 - BandwidthResourceAllocation ba = getBandwidth(freeRes); 208 + {
152 - double requestedBandwidth = 209 + Set<? extends ResourceAllocation> bw = caps.get(ResourceType.BANDWIDTH);
153 - ((BandwidthResourceAllocation) res).bandwidth().toDouble(); 210 + if (bw == null || bw.isEmpty()) {
154 - double newBandwidth = ba.bandwidth().toDouble() + requestedBandwidth; 211 + bw = Sets.newHashSet(new BandwidthResourceAllocation(EMPTY_BW));
155 - freeRes.remove(ba); 212 + }
156 - freeRes.add(new BandwidthResourceAllocation( 213 +
157 - Bandwidth.valueOf(newBandwidth))); 214 + BandwidthResourceAllocation cap = (BandwidthResourceAllocation) bw.iterator().next();
215 + double freeBw = cap.bandwidth().toDouble();
216 +
217 + // enumerate current allocations, subtracting resources
218 + for (LinkResourceAllocations alloc : allocations) {
219 + Set<ResourceAllocation> types = alloc.getResourceAllocation(link);
220 + for (ResourceAllocation a : types) {
221 + if (a instanceof BandwidthResourceAllocation) {
222 + BandwidthResourceAllocation bwA = (BandwidthResourceAllocation) a;
223 + freeBw -= bwA.bandwidth().toDouble();
224 + }
225 + }
226 + }
227 +
228 + free.put(type, Sets.newHashSet(new BandwidthResourceAllocation(Bandwidth.valueOf(freeBw))));
158 break; 229 break;
230 + }
231 +
159 case LAMBDA: 232 case LAMBDA:
160 - checkState(freeRes.add(res)); 233 + {
234 + Set<? extends ResourceAllocation> lmd = caps.get(type);
235 + if (lmd == null || lmd.isEmpty()) {
236 + // nothing left
237 + break;
238 + }
239 + Set<LambdaResourceAllocation> freeL = new HashSet<>();
240 + for (ResourceAllocation r : lmd) {
241 + if (r instanceof LambdaResourceAllocation) {
242 + freeL.add((LambdaResourceAllocation) r);
243 + }
244 + }
245 +
246 + // enumerate current allocations, removing resources
247 + for (LinkResourceAllocations alloc : allocations) {
248 + Set<ResourceAllocation> types = alloc.getResourceAllocation(link);
249 + for (ResourceAllocation a : types) {
250 + if (a instanceof LambdaResourceAllocation) {
251 + freeL.remove(a);
252 + }
253 + }
254 + }
255 +
256 + free.put(type, freeL);
161 break; 257 break;
258 + }
259 +
162 default: 260 default:
163 break; 261 break;
164 } 262 }
165 } 263 }
166 - freeResources.put(link, freeRes); 264 + return free;
167 } 265 }
168 266
169 - @Override 267 + private LinkResourceAllocations getIntentAllocations(IntentId id) {
170 - public synchronized Set<ResourceAllocation> getFreeResources(Link link) { 268 + VersionedValue vv
171 - checkNotNull(link); 269 + = databaseService.get(INTENT_ALLOCATIONS, toIntentDbKey(checkNotNull(id)));
172 - Set<ResourceAllocation> freeRes = freeResources.get(link); 270 + if (vv == null || vv.value() == null) {
173 - if (freeRes == null) { 271 + return null;
174 - freeRes = readOriginalFreeResources(link);
175 } 272 }
273 + return decodeIntentAllocations(vv.value());
274 + }
176 275
177 - return freeRes; 276 + private Builder putIntentAllocations(Builder ctx,
277 + IntentId id,
278 + LinkResourceAllocations alloc) {
279 + return ctx.put(INTENT_ALLOCATIONS,
280 + toIntentDbKey(id),
281 + encodeIntentAllocations(alloc));
178 } 282 }
179 283
284 +
180 @Override 285 @Override
181 - public synchronized void allocateResources(LinkResourceAllocations allocations) { 286 + public void allocateResources(LinkResourceAllocations allocations) {
182 checkNotNull(allocations); 287 checkNotNull(allocations);
183 - linkResourceAllocationsMap.put(allocations.intendId(), allocations); 288 +
289 + Builder tx = BatchWriteRequest.newBuilder();
290 +
291 + // TODO: Should IntentId -> Allocation be updated conditionally?
292 + putIntentAllocations(tx, allocations.intendId(), allocations);
293 +
184 for (Link link : allocations.links()) { 294 for (Link link : allocations.links()) {
185 - subtractFreeResources(link, allocations); 295 + allocateLinkResource(tx, link, allocations);
186 - Set<LinkResourceAllocations> linkAllocs = allocatedResources.get(link); 296 + }
187 - if (linkAllocs == null) { 297 +
188 - linkAllocs = new HashSet<>(); 298 + BatchWriteRequest batch = tx.build();
299 +// log.info("Intent: {}", databaseService.getAll(INTENT_ALLOCATIONS));
300 +// log.info("Link: {}", databaseService.getAll(LINK_RESOURCE_ALLOCATIONS));
301 +
302 + BatchWriteResult result = databaseService.batchWrite(batch);
303 + if (!result.isSuccessful()) {
304 + log.error("Allocation Failed.");
305 + if (log.isDebugEnabled()) {
306 + logFailureDetail(batch, result);
307 + }
308 + // FIXME throw appropriate exception, with what failed.
309 + checkState(result.isSuccessful(), "Allocation failed");
310 + }
311 + }
312 +
313 + private void logFailureDetail(BatchWriteRequest batch,
314 + BatchWriteResult result) {
315 + for (int i = 0; i < batch.batchSize(); ++i) {
316 + final WriteRequest req = batch.getAsList().get(i);
317 + final WriteResult fail = result.getAsList().get(i);
318 + switch (fail.status()) {
319 + case ABORTED:
320 + log.debug("ABORTED: {}@{}", req.key(), req.tableName());
321 + break;
322 + case PRECONDITION_VIOLATION:
323 + switch (req.type()) {
324 + case PUT_IF_ABSENT:
325 + log.debug("{}: {}@{} : {}", req.type(),
326 + req.key(), req.tableName(), fail.previousValue());
327 + break;
328 + case PUT_IF_VALUE:
329 + case REMOVE_IF_VALUE:
330 + log.debug("{}: {}@{} : was {}, expected {}", req.type(),
331 + req.key(), req.tableName(),
332 + fail.previousValue(),
333 + toHexString(req.oldValue()));
334 + break;
335 + case PUT_IF_VERSION:
336 + case REMOVE_IF_VERSION:
337 + log.debug("{}: {}@{} : was {}, expected {}", req.type(),
338 + req.key(), req.tableName(),
339 + fail.previousValue().version(),
340 + req.previousVersion());
341 + break;
342 + default:
343 + log.error("Should never reach here.");
344 + break;
345 + }
346 + break;
347 + default:
348 + log.error("Should never reach here.");
349 + break;
189 } 350 }
190 - linkAllocs.add(allocations);
191 - allocatedResources.put(link, linkAllocs);
192 } 351 }
193 } 352 }
194 353
354 + private Builder allocateLinkResource(Builder builder, Link link,
355 + LinkResourceAllocations allocations) {
356 +
357 + // requested resources
358 + Set<ResourceAllocation> reqs = allocations.getResourceAllocation(link);
359 +
360 + Map<ResourceType, Set<? extends ResourceAllocation>> available = getFreeResourcesEx(link);
361 + for (ResourceAllocation req : reqs) {
362 + Set<? extends ResourceAllocation> avail = available.get(req.type());
363 + if (req instanceof BandwidthResourceAllocation) {
364 + // check if allocation should be accepted
365 + if (avail.isEmpty()) {
366 + checkState(!avail.isEmpty(),
367 + "There's no Bandwidth resource on %s?",
368 + link);
369 + }
370 + BandwidthResourceAllocation bw = (BandwidthResourceAllocation) avail.iterator().next();
371 + double bwLeft = bw.bandwidth().toDouble();
372 + bwLeft -= ((BandwidthResourceAllocation) req).bandwidth().toDouble();
373 + if (bwLeft < 0) {
374 + // FIXME throw appropriate Exception
375 + checkState(bwLeft >= 0,
376 + "There's no Bandwidth left on %s. %s",
377 + link, bwLeft);
378 + }
379 + } else if (req instanceof LambdaResourceAllocation) {
380 +
381 + // check if allocation should be accepted
382 + if (!avail.contains(req)) {
383 + // requested lambda was not available
384 + // FIXME throw appropriate exception
385 + checkState(avail.contains(req),
386 + "Allocating %s on %s failed",
387 + req, link);
388 + }
389 + }
390 + }
391 + // all requests allocatable => add allocation
392 + final List<LinkResourceAllocations> before = getAllocations(link);
393 + List<LinkResourceAllocations> after = new ArrayList<>(before.size());
394 + after.addAll(before);
395 + after.add(allocations);
396 + replaceLinkAllocations(builder, LinkKey.linkKey(link), before, after);
397 + return builder;
398 + }
399 +
400 + private Builder replaceLinkAllocations(Builder builder, LinkKey linkKey,
401 + List<LinkResourceAllocations> before,
402 + List<LinkResourceAllocations> after) {
403 +
404 + byte[] oldValue = encodeLinkAllocations(before);
405 + byte[] newValue = encodeLinkAllocations(after);
406 + builder.putIfValueMatches(LINK_RESOURCE_ALLOCATIONS, toLinkDbKey(linkKey), oldValue, newValue);
407 + return builder;
408 + }
409 +
195 @Override 410 @Override
196 - public synchronized void releaseResources(LinkResourceAllocations allocations) { 411 + public void releaseResources(LinkResourceAllocations allocations) {
197 checkNotNull(allocations); 412 checkNotNull(allocations);
198 - linkResourceAllocationsMap.remove(allocations.intendId()); 413 +
199 - for (Link link : allocations.links()) { 414 + final IntentId intendId = allocations.intendId();
200 - addFreeResources(link, allocations); 415 + final String dbIntentId = toIntentDbKey(intendId);
201 - Set<LinkResourceAllocations> linkAllocs = allocatedResources.get(link); 416 + final Collection<Link> links = allocations.links();
202 - if (linkAllocs == null) { 417 +
203 - log.error("Missing resource allocation."); 418 + // TODO: does release must happen in a batch?
204 - } else { 419 + boolean success;
205 - linkAllocs.remove(allocations); 420 + do {
421 + Builder tx = BatchWriteRequest.newBuilder();
422 +
423 + // TODO: Should IntentId -> Allocation be updated conditionally?
424 + tx.remove(INTENT_ALLOCATIONS, dbIntentId);
425 +
426 + for (Link link : links) {
427 + final LinkKey linkId = LinkKey.linkKey(link);
428 + final String dbLinkId = toLinkDbKey(linkId);
429 + VersionedValue vv = databaseService.get(LINK_RESOURCE_ALLOCATIONS, dbLinkId);
430 + if (vv == null || vv.value() == null) {
431 + // something is wrong, but it is already freed
432 + log.warn("There was no resource left to release on {}", linkId);
433 + continue;
434 + }
435 + List<LinkResourceAllocations> before = decodeLinkAllocations(vv.value());
436 + List<LinkResourceAllocations> after = new ArrayList<>(before);
437 + after.remove(allocations);
438 + byte[] oldValue = encodeLinkAllocations(before);
439 + byte[] newValue = encodeLinkAllocations(after);
440 + tx.putIfValueMatches(LINK_RESOURCE_ALLOCATIONS, dbLinkId, oldValue, newValue);
206 } 441 }
207 - allocatedResources.put(link, linkAllocs); 442 +
208 - } 443 + BatchWriteResult batchWrite = databaseService.batchWrite(tx.build());
444 + success = batchWrite.isSuccessful();
445 + } while (!success);
209 } 446 }
210 447
211 @Override 448 @Override
212 - public synchronized LinkResourceAllocations getAllocations(IntentId intentId) { 449 + public LinkResourceAllocations getAllocations(IntentId intentId) {
213 checkNotNull(intentId); 450 checkNotNull(intentId);
214 - return linkResourceAllocationsMap.get(intentId); 451 + VersionedValue vv = databaseService.get(INTENT_ALLOCATIONS, toIntentDbKey(intentId));
452 + if (vv == null) {
453 + // FIXME: should we return null or LinkResourceAllocations with nothing allocated?
454 + return null;
455 + }
456 + LinkResourceAllocations allocations = decodeIntentAllocations(vv.value());
457 + return allocations;
458 + }
459 +
460 + private String toLinkDbKey(LinkKey linkid) {
461 + // introduce cache if necessary
462 + return linkid.toString();
463 + // TODO: Above is irreversible, if we need reverse conversion
464 + // we may need something like below, due to String only limitation
465 +// byte[] bytes = serializer.encode(linkid);
466 +// StringBuilder builder = new StringBuilder(bytes.length * 4);
467 +// boolean isFirst = true;
468 +// for (byte b : bytes) {
469 +// if (!isFirst) {
470 +// builder.append(',');
471 +// }
472 +// builder.append(b);
473 +// isFirst = false;
474 +// }
475 +// return builder.toString();
476 + }
477 +
478 +// private LinkKey toLinkKey(String linkKey) {
479 +// String[] bytes = linkKey.split(",");
480 +// ByteBuffer buf = ByteBuffer.allocate(bytes.length);
481 +// for (String bs : bytes) {
482 +// buf.put(Byte.parseByte(bs));
483 +// }
484 +// buf.flip();
485 +// return serializer.decode(buf);
486 +// }
487 +
488 + private String toIntentDbKey(IntentId intentid) {
489 + return intentid.toString();
490 + }
491 +
492 + private IntentId toIntentId(String intentid) {
493 + checkArgument(intentid.startsWith("0x"));
494 + return IntentId.valueOf(Long.parseLong(intentid.substring(2)));
495 + }
496 +
497 + private LinkResourceAllocations decodeIntentAllocations(byte[] bytes) {
498 + return serializer.decode(bytes);
499 + }
500 +
501 + private byte[] encodeIntentAllocations(LinkResourceAllocations alloc) {
502 + return serializer.encode(checkNotNull(alloc));
503 + }
504 +
505 + private List<LinkResourceAllocations> decodeLinkAllocations(byte[] bytes) {
506 + return serializer.decode(bytes);
507 + }
508 +
509 + private byte[] encodeLinkAllocations(List<LinkResourceAllocations> alloc) {
510 + return serializer.encode(checkNotNull(alloc));
215 } 511 }
216 512
217 @Override 513 @Override
218 - public synchronized Iterable<LinkResourceAllocations> getAllocations(Link link) { 514 + public List<LinkResourceAllocations> getAllocations(Link link) {
219 checkNotNull(link); 515 checkNotNull(link);
220 - Set<LinkResourceAllocations> result = allocatedResources.get(link); 516 + final LinkKey key = LinkKey.linkKey(link);
221 - if (result == null) { 517 + final String dbKey = toLinkDbKey(key);
222 - result = Collections.emptySet(); 518 + VersionedValue vv = databaseService.get(LINK_RESOURCE_ALLOCATIONS, dbKey);
519 + if (vv == null) {
520 + // write empty so that all other update can be replace operation
521 + byte[] emptyList = encodeLinkAllocations(new ArrayList<>());
522 + boolean written = databaseService.putIfAbsent(LINK_RESOURCE_ALLOCATIONS, dbKey, emptyList);
523 + log.trace("Empty allocation write success? {}", written);
524 + vv = databaseService.get(LINK_RESOURCE_ALLOCATIONS, dbKey);
525 + if (vv == null) {
526 + log.error("Failed to re-read allocation for {}", dbKey);
527 + // note: cannot be Collections.emptyList();
528 + return new ArrayList<>();
529 + }
223 } 530 }
224 - return Collections.unmodifiableSet(result); 531 + List<LinkResourceAllocations> allocations = decodeLinkAllocations(vv.value());
532 + return allocations;
225 } 533 }
226 534
227 @Override 535 @Override
228 - public synchronized Iterable<LinkResourceAllocations> getAllocations() { 536 + public Iterable<LinkResourceAllocations> getAllocations() {
229 - return Collections.unmodifiableCollection(linkResourceAllocationsMap.values()); 537 + //IntentId -> LinkResourceAllocations
538 + Map<String, VersionedValue> all = databaseService.getAll(INTENT_ALLOCATIONS);
539 +
540 + return FluentIterable.from(all.values())
541 + .transform(new Function<VersionedValue, LinkResourceAllocations>() {
542 +
543 + @Override
544 + public LinkResourceAllocations apply(VersionedValue input) {
545 + if (input == null || input.value() == null) {
546 + return null;
547 + }
548 + return decodeIntentAllocations(input.value());
549 + }
550 + })
551 + .filter(notNull());
230 } 552 }
231 553
232 } 554 }
......