Committed by
Yuta HIGUCHI
ONOS-3472 Fixing ConsistentMap key equality
- ConsistentMap's key equality is based on serialized byte[]. 2 Problems fixed by this patch: (1) By caching Key -> String representation, Cache will use Key's Object#equals for look up, which can possibly have different equality than byte[] equality, leading to wrong String to be used as a key in backend Database. Fixed by reversing the mapping. (2) Similar issues with keySet(), entrySet() Set based on reference equality needs to be used to avoid deduplication based on Object#equals Fixed by replacing Set implementation with MappingSet. Change-Id: I1b727abd2614a9b72b5b1d02ecca2de26493adcc
Showing
3 changed files
with
164 additions
and
16 deletions
... | @@ -20,6 +20,7 @@ import com.google.common.cache.CacheBuilder; | ... | @@ -20,6 +20,7 @@ import com.google.common.cache.CacheBuilder; |
20 | import com.google.common.cache.CacheLoader; | 20 | import com.google.common.cache.CacheLoader; |
21 | import com.google.common.cache.LoadingCache; | 21 | import com.google.common.cache.LoadingCache; |
22 | import com.google.common.collect.Maps; | 22 | import com.google.common.collect.Maps; |
23 | + | ||
23 | import org.onlab.util.HexString; | 24 | import org.onlab.util.HexString; |
24 | import org.onlab.util.SharedExecutors; | 25 | import org.onlab.util.SharedExecutors; |
25 | import org.onlab.util.Tools; | 26 | import org.onlab.util.Tools; |
... | @@ -33,6 +34,7 @@ import org.onosproject.store.service.Versioned; | ... | @@ -33,6 +34,7 @@ import org.onosproject.store.service.Versioned; |
33 | import org.slf4j.Logger; | 34 | import org.slf4j.Logger; |
34 | 35 | ||
35 | import java.util.Collection; | 36 | import java.util.Collection; |
37 | +import java.util.Collections; | ||
36 | import java.util.Map; | 38 | import java.util.Map; |
37 | import java.util.Map.Entry; | 39 | import java.util.Map.Entry; |
38 | import java.util.Objects; | 40 | import java.util.Objects; |
... | @@ -92,18 +94,25 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V | ... | @@ -92,18 +94,25 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V |
92 | private static final String ERROR_NULL_KEY = "Key cannot be null"; | 94 | private static final String ERROR_NULL_KEY = "Key cannot be null"; |
93 | private static final String ERROR_NULL_VALUE = "Null values are not allowed"; | 95 | private static final String ERROR_NULL_VALUE = "Null values are not allowed"; |
94 | 96 | ||
95 | - private final LoadingCache<K, String> keyCache = CacheBuilder.newBuilder() | 97 | + // String representation of serialized byte[] -> original key Object |
98 | + private final LoadingCache<String, K> keyCache = CacheBuilder.newBuilder() | ||
96 | .softValues() | 99 | .softValues() |
97 | - .build(new CacheLoader<K, String>() { | 100 | + .build(new CacheLoader<String, K>() { |
98 | 101 | ||
99 | @Override | 102 | @Override |
100 | - public String load(K key) { | 103 | + public K load(String key) { |
101 | - return HexString.toHexString(serializer.encode(key)); | 104 | + return serializer.decode(HexString.fromHexString(key)); |
102 | } | 105 | } |
103 | }); | 106 | }); |
104 | 107 | ||
108 | + protected String sK(K key) { | ||
109 | + String s = HexString.toHexString(serializer.encode(key)); | ||
110 | + keyCache.put(s, key); | ||
111 | + return s; | ||
112 | + } | ||
113 | + | ||
105 | protected K dK(String key) { | 114 | protected K dK(String key) { |
106 | - return serializer.decode(HexString.fromHexString(key)); | 115 | + return keyCache.getUnchecked(key); |
107 | } | 116 | } |
108 | 117 | ||
109 | public DefaultAsyncConsistentMap(String name, | 118 | public DefaultAsyncConsistentMap(String name, |
... | @@ -207,7 +216,7 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V | ... | @@ -207,7 +216,7 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V |
207 | public CompletableFuture<Boolean> containsKey(K key) { | 216 | public CompletableFuture<Boolean> containsKey(K key) { |
208 | checkNotNull(key, ERROR_NULL_KEY); | 217 | checkNotNull(key, ERROR_NULL_KEY); |
209 | final MeteringAgent.Context timer = monitor.startTimer(CONTAINS_KEY); | 218 | final MeteringAgent.Context timer = monitor.startTimer(CONTAINS_KEY); |
210 | - return database.mapContainsKey(name, keyCache.getUnchecked(key)) | 219 | + return database.mapContainsKey(name, sK(key)) |
211 | .whenComplete((r, e) -> timer.stop(e)); | 220 | .whenComplete((r, e) -> timer.stop(e)); |
212 | } | 221 | } |
213 | 222 | ||
... | @@ -223,7 +232,7 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V | ... | @@ -223,7 +232,7 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V |
223 | public CompletableFuture<Versioned<V>> get(K key) { | 232 | public CompletableFuture<Versioned<V>> get(K key) { |
224 | checkNotNull(key, ERROR_NULL_KEY); | 233 | checkNotNull(key, ERROR_NULL_KEY); |
225 | final MeteringAgent.Context timer = monitor.startTimer(GET); | 234 | final MeteringAgent.Context timer = monitor.startTimer(GET); |
226 | - return database.mapGet(name, keyCache.getUnchecked(key)) | 235 | + return database.mapGet(name, sK(key)) |
227 | .whenComplete((r, e) -> timer.stop(e)) | 236 | .whenComplete((r, e) -> timer.stop(e)) |
228 | .thenApply(v -> v != null ? v.map(serializer::decode) : null); | 237 | .thenApply(v -> v != null ? v.map(serializer::decode) : null); |
229 | } | 238 | } |
... | @@ -328,10 +337,7 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V | ... | @@ -328,10 +337,7 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V |
328 | public CompletableFuture<Set<K>> keySet() { | 337 | public CompletableFuture<Set<K>> keySet() { |
329 | final MeteringAgent.Context timer = monitor.startTimer(KEY_SET); | 338 | final MeteringAgent.Context timer = monitor.startTimer(KEY_SET); |
330 | return database.mapKeySet(name) | 339 | return database.mapKeySet(name) |
331 | - .thenApply(s -> s | 340 | + .thenApply(s -> newMappingKeySet(s)) |
332 | - .stream() | ||
333 | - .map(this::dK) | ||
334 | - .collect(Collectors.toSet())) | ||
335 | .whenComplete((r, e) -> timer.stop(e)); | 341 | .whenComplete((r, e) -> timer.stop(e)); |
336 | } | 342 | } |
337 | 343 | ||
... | @@ -351,10 +357,7 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V | ... | @@ -351,10 +357,7 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V |
351 | final MeteringAgent.Context timer = monitor.startTimer(ENTRY_SET); | 357 | final MeteringAgent.Context timer = monitor.startTimer(ENTRY_SET); |
352 | return database.mapEntrySet(name) | 358 | return database.mapEntrySet(name) |
353 | .whenComplete((r, e) -> timer.stop(e)) | 359 | .whenComplete((r, e) -> timer.stop(e)) |
354 | - .thenApply(s -> s | 360 | + .thenApply(s -> newMappingEntrySet(s)); |
355 | - .stream() | ||
356 | - .map(this::mapRawEntry) | ||
357 | - .collect(Collectors.toSet())); | ||
358 | } | 361 | } |
359 | 362 | ||
360 | @Override | 363 | @Override |
... | @@ -413,17 +416,31 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V | ... | @@ -413,17 +416,31 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V |
413 | checkIfUnmodifiable(); | 416 | checkIfUnmodifiable(); |
414 | } | 417 | } |
415 | 418 | ||
419 | + private Set<K> newMappingKeySet(Set<String> s) { | ||
420 | + return new MappingSet<>(s, Collections::unmodifiableSet, | ||
421 | + this::sK, this::dK); | ||
422 | + } | ||
423 | + | ||
424 | + private Set<Entry<K, Versioned<V>>> newMappingEntrySet(Set<Entry<String, Versioned<byte[]>>> s) { | ||
425 | + return new MappingSet<>(s, Collections::unmodifiableSet, | ||
426 | + this::reverseMapRawEntry, this::mapRawEntry); | ||
427 | + } | ||
428 | + | ||
416 | private Map.Entry<K, Versioned<V>> mapRawEntry(Map.Entry<String, Versioned<byte[]>> e) { | 429 | private Map.Entry<K, Versioned<V>> mapRawEntry(Map.Entry<String, Versioned<byte[]>> e) { |
417 | return Maps.immutableEntry(dK(e.getKey()), e.getValue().<V>map(serializer::decode)); | 430 | return Maps.immutableEntry(dK(e.getKey()), e.getValue().<V>map(serializer::decode)); |
418 | } | 431 | } |
419 | 432 | ||
433 | + private Map.Entry<String, Versioned<byte[]>> reverseMapRawEntry(Map.Entry<K, Versioned<V>> e) { | ||
434 | + return Maps.immutableEntry(sK(e.getKey()), e.getValue().map(serializer::encode)); | ||
435 | + } | ||
436 | + | ||
420 | private CompletableFuture<UpdateResult<K, V>> updateAndGet(K key, | 437 | private CompletableFuture<UpdateResult<K, V>> updateAndGet(K key, |
421 | Match<V> oldValueMatch, | 438 | Match<V> oldValueMatch, |
422 | Match<Long> oldVersionMatch, | 439 | Match<Long> oldVersionMatch, |
423 | V value) { | 440 | V value) { |
424 | beforeUpdate(key); | 441 | beforeUpdate(key); |
425 | return database.mapUpdate(name, | 442 | return database.mapUpdate(name, |
426 | - keyCache.getUnchecked(key), | 443 | + sK(key), |
427 | oldValueMatch.map(serializer::encode), | 444 | oldValueMatch.map(serializer::encode), |
428 | oldVersionMatch, | 445 | oldVersionMatch, |
429 | value == null ? null : serializer.encode(value)) | 446 | value == null ? null : serializer.encode(value)) | ... | ... |
1 | +/* | ||
2 | + * Copyright 2015 Open Networking Laboratory | ||
3 | + * | ||
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | ||
5 | + * you may not use this file except in compliance with the License. | ||
6 | + * You may obtain a copy of the License at | ||
7 | + * | ||
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | ||
9 | + * | ||
10 | + * Unless required by applicable law or agreed to in writing, software | ||
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | ||
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
13 | + * See the License for the specific language governing permissions and | ||
14 | + * limitations under the License. | ||
15 | + */ | ||
16 | +package org.onosproject.store.consistent.impl; | ||
17 | + | ||
18 | +import java.util.Arrays; | ||
19 | +import java.util.Collection; | ||
20 | +import java.util.Iterator; | ||
21 | +import java.util.Set; | ||
22 | +import java.util.function.Function; | ||
23 | +import java.util.stream.Collectors; | ||
24 | + | ||
25 | +import com.google.common.collect.Iterators; | ||
26 | + | ||
27 | +/** | ||
28 | + * Set view backed by Set with element type {@code <BACK>} but returns | ||
29 | + * element as {@code <OUT>} for convenience. | ||
30 | + * | ||
31 | + * @param <BACK> Backing {@link Set} element type. | ||
32 | + * MappingSet will follow this type's equality behavior. | ||
33 | + * @param <OUT> external facing element type. | ||
34 | + * MappingSet will ignores equality defined by this type. | ||
35 | + */ | ||
36 | +class MappingSet<BACK, OUT> implements Set<OUT> { | ||
37 | + | ||
38 | + private final Set<BACK> backedSet; | ||
39 | + private final Function<OUT, BACK> toBack; | ||
40 | + private final Function<BACK, OUT> toOut; | ||
41 | + | ||
42 | + public MappingSet(Set<BACK> backedSet, | ||
43 | + Function<Set<BACK>, Set<BACK>> supplier, | ||
44 | + Function<OUT, BACK> toBack, Function<BACK, OUT> toOut) { | ||
45 | + this.backedSet = supplier.apply(backedSet); | ||
46 | + this.toBack = toBack; | ||
47 | + this.toOut = toOut; | ||
48 | + } | ||
49 | + | ||
50 | + @Override | ||
51 | + public int size() { | ||
52 | + return backedSet.size(); | ||
53 | + } | ||
54 | + | ||
55 | + @Override | ||
56 | + public boolean isEmpty() { | ||
57 | + return backedSet.isEmpty(); | ||
58 | + } | ||
59 | + | ||
60 | + @Override | ||
61 | + public boolean contains(Object o) { | ||
62 | + return backedSet.contains(toBack.apply((OUT) o)); | ||
63 | + } | ||
64 | + | ||
65 | + @Override | ||
66 | + public Iterator<OUT> iterator() { | ||
67 | + return Iterators.transform(backedSet.iterator(), toOut::apply); | ||
68 | + } | ||
69 | + | ||
70 | + @Override | ||
71 | + public Object[] toArray() { | ||
72 | + return backedSet.stream() | ||
73 | + .map(toOut) | ||
74 | + .toArray(); | ||
75 | + } | ||
76 | + | ||
77 | + @Override | ||
78 | + public <T> T[] toArray(T[] a) { | ||
79 | + return backedSet.stream() | ||
80 | + .map(toOut) | ||
81 | + .toArray(size -> { | ||
82 | + if (size < a.length) { | ||
83 | + return (T[]) new Object[size]; | ||
84 | + } else { | ||
85 | + Arrays.fill(a, null); | ||
86 | + return a; | ||
87 | + } | ||
88 | + }); | ||
89 | + } | ||
90 | + | ||
91 | + @Override | ||
92 | + public boolean add(OUT e) { | ||
93 | + return backedSet.add(toBack.apply(e)); | ||
94 | + } | ||
95 | + | ||
96 | + @Override | ||
97 | + public boolean remove(Object o) { | ||
98 | + return backedSet.remove(toBack.apply((OUT) o)); | ||
99 | + } | ||
100 | + | ||
101 | + @Override | ||
102 | + public boolean containsAll(Collection<?> c) { | ||
103 | + return c.stream() | ||
104 | + .map(e -> toBack.apply((OUT) e)) | ||
105 | + .allMatch(backedSet::contains); | ||
106 | + } | ||
107 | + | ||
108 | + @Override | ||
109 | + public boolean addAll(Collection<? extends OUT> c) { | ||
110 | + return backedSet.addAll(c.stream().map(toBack).collect(Collectors.toList())); | ||
111 | + } | ||
112 | + | ||
113 | + @Override | ||
114 | + public boolean retainAll(Collection<?> c) { | ||
115 | + return backedSet.retainAll(c.stream() | ||
116 | + .map(x -> toBack.apply((OUT) x)) | ||
117 | + .collect(Collectors.toList())); | ||
118 | + } | ||
119 | + | ||
120 | + @Override | ||
121 | + public boolean removeAll(Collection<?> c) { | ||
122 | + return backedSet.removeAll(c.stream() | ||
123 | + .map(x -> toBack.apply((OUT) x)) | ||
124 | + .collect(Collectors.toList())); | ||
125 | + } | ||
126 | + | ||
127 | + @Override | ||
128 | + public void clear() { | ||
129 | + backedSet.clear(); | ||
130 | + } | ||
131 | +} |
This diff is collapsed. Click to expand it.
-
Please register or login to post a comment