Committed by
Gerrit Code Review
DistributedPrimitives updates:
Adds a ferderated distributed primitive creator Adds a DistributedPrimitives utility class Adds a transcoding async consistent map for transcoding between map types Change-Id: I7bc30e4a8aee9d4286175d7081bbbd0f28b9928f
Showing
3 changed files
with
424 additions
and
0 deletions
core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DistributedPrimitives.java
0 → 100644
1 | +/* | ||
2 | + * Copyright 2016 Open Networking Laboratory | ||
3 | + * | ||
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | ||
5 | + * you may not use this file except in compliance with the License. | ||
6 | + * You may obtain a copy of the License at | ||
7 | + * | ||
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | ||
9 | + * | ||
10 | + * Unless required by applicable law or agreed to in writing, software | ||
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | ||
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
13 | + * See the License for the specific language governing permissions and | ||
14 | + * limitations under the License. | ||
15 | + */ | ||
16 | +package org.onosproject.store.primitives.impl; | ||
17 | + | ||
18 | +import java.util.function.Function; | ||
19 | + | ||
20 | +import org.onosproject.store.service.AsyncConsistentMap; | ||
21 | +import org.onosproject.store.service.AsyncDistributedSet; | ||
22 | + | ||
23 | +/** | ||
24 | + * Misc utilities for working with {@code DistributedPrimitive}s. | ||
25 | + */ | ||
26 | +public final class DistributedPrimitives { | ||
27 | + | ||
28 | + private DistributedPrimitives() {} | ||
29 | + | ||
30 | + /** | ||
31 | + * Creates an instance of {@code AsyncDistributedSet} that is backed by a {@code AsyncConsistentMap}. | ||
32 | + * | ||
33 | + * @param map backing map | ||
34 | + * @return set | ||
35 | + * @param <E> set element type | ||
36 | + */ | ||
37 | + public static <E> AsyncDistributedSet<E> newSetFromMap(AsyncConsistentMap<E, Boolean> map) { | ||
38 | + return new DefaultAsyncDistributedSet<>(map, map.name(), true); | ||
39 | + } | ||
40 | + | ||
41 | + /** | ||
42 | + * Creates an instance of {@code AsyncConsistentMap} that records metrics for all its operations. | ||
43 | + * | ||
44 | + * @param map map whose operations are to be metered | ||
45 | + * @return metered map | ||
46 | + * @param <K> map key type | ||
47 | + * @param <V> map value type | ||
48 | + */ | ||
49 | + public static <K, V> AsyncConsistentMap<K, V> newMeteredMap(AsyncConsistentMap<K, V> map) { | ||
50 | + return new MeteredAsyncConsistentMap<>(map); | ||
51 | + } | ||
52 | + | ||
53 | + /** | ||
54 | + * Creates an instance of {@code AsyncConsistentMap} that caches entries on get. | ||
55 | + * | ||
56 | + * @param map backing map | ||
57 | + * @return caching map | ||
58 | + * @param <K> map key type | ||
59 | + * @param <V> map value type | ||
60 | + */ | ||
61 | + public static <K, V> AsyncConsistentMap<K, V> newCachingMap(AsyncConsistentMap<K, V> map) { | ||
62 | + return new CachingAsyncConsistentMap<>(map); | ||
63 | + } | ||
64 | + | ||
65 | + /** | ||
66 | + * Creates an instance of {@code AsyncConsistentMap} that disallows updates. | ||
67 | + * | ||
68 | + * @param map backing map | ||
69 | + * @return unmodifiable map | ||
70 | + * @param <K> map key type | ||
71 | + * @param <V> map value type | ||
72 | + */ | ||
73 | + public static <K, V> AsyncConsistentMap<K, V> newUnmodifiableMap(AsyncConsistentMap<K, V> map) { | ||
74 | + return new UnmodifiableAsyncConsistentMap<>(map); | ||
75 | + } | ||
76 | + | ||
77 | + /** | ||
78 | + * Creates an instance of {@code AsyncConsistentMap} that transforms operations inputs and applies them | ||
79 | + * to corresponding operation in a different typed map and returns the output after reverse transforming it. | ||
80 | + * | ||
81 | + * @param map backing map | ||
82 | + * @param keyEncoder transformer for key type of returned map to key type of input map | ||
83 | + * @param keyDecoder transformer for key type of input map to key type of returned map | ||
84 | + * @param valueEncoder transformer for value type of returned map to value type of input map | ||
85 | + * @param valueDecoder transformer for value type of input map to value type of returned map | ||
86 | + * @param <K1> returned map key type | ||
87 | + * @param <K2> input map key type | ||
88 | + * @param <V1> returned map value type | ||
89 | + * @param <V2> input map key type | ||
90 | + * @return new map | ||
91 | + */ | ||
92 | + public static <K1, V1, K2, V2> AsyncConsistentMap<K1, V1> newTranscodingMap(AsyncConsistentMap<K2, V2> map, | ||
93 | + Function<K1, K2> keyEncoder, | ||
94 | + Function<K2, K1> keyDecoder, | ||
95 | + Function<V1, V2> valueEncoder, | ||
96 | + Function<V2, V1> valueDecoder) { | ||
97 | + return new TranscodingAsyncConsistentMap<K1, V1, K2, V2>(map, | ||
98 | + keyEncoder, | ||
99 | + keyDecoder, | ||
100 | + valueEncoder, | ||
101 | + valueDecoder); | ||
102 | + } | ||
103 | +} |
1 | +/* | ||
2 | + * Copyright 2016 Open Networking Laboratory | ||
3 | + * | ||
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | ||
5 | + * you may not use this file except in compliance with the License. | ||
6 | + * You may obtain a copy of the License at | ||
7 | + * | ||
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | ||
9 | + * | ||
10 | + * Unless required by applicable law or agreed to in writing, software | ||
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | ||
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
13 | + * See the License for the specific language governing permissions and | ||
14 | + * limitations under the License. | ||
15 | + */ | ||
16 | +package org.onosproject.store.primitives.impl; | ||
17 | + | ||
18 | +import static com.google.common.base.Preconditions.checkNotNull; | ||
19 | + | ||
20 | +import java.util.List; | ||
21 | +import java.util.Map; | ||
22 | +import java.util.TreeMap; | ||
23 | + | ||
24 | +import org.apache.commons.lang.StringUtils; | ||
25 | +import org.onlab.util.Tools; | ||
26 | +import org.onosproject.cluster.PartitionId; | ||
27 | +import org.onosproject.store.primitives.DistributedPrimitiveCreator; | ||
28 | +import org.onosproject.store.service.AsyncAtomicCounter; | ||
29 | +import org.onosproject.store.service.AsyncAtomicValue; | ||
30 | +import org.onosproject.store.service.AsyncConsistentMap; | ||
31 | +import org.onosproject.store.service.AsyncDistributedSet; | ||
32 | +import org.onosproject.store.service.AsyncLeaderElector; | ||
33 | +import org.onosproject.store.service.DistributedQueue; | ||
34 | +import org.onosproject.store.service.Serializer; | ||
35 | + | ||
36 | +import com.google.common.collect.Lists; | ||
37 | +import com.google.common.collect.Maps; | ||
38 | +import com.google.common.hash.HashCode; | ||
39 | +import com.google.common.hash.Hashing; | ||
40 | +import com.google.common.primitives.Bytes; | ||
41 | + | ||
42 | +/** | ||
43 | + * {@code DistributedPrimitiveCreator} that federates responsibility for creating | ||
44 | + * distributed primitives to a collection of other {@link DistributedPrimitiveCreator creators}. | ||
45 | + */ | ||
46 | +public class FederatedDistributedPrimitiveCreator implements DistributedPrimitiveCreator { | ||
47 | + | ||
48 | + private final TreeMap<PartitionId, DistributedPrimitiveCreator> members; | ||
49 | + private final List<PartitionId> sortedMemberPartitionIds; | ||
50 | + | ||
51 | + public FederatedDistributedPrimitiveCreator(Map<PartitionId, DistributedPrimitiveCreator> members) { | ||
52 | + this.members = Maps.newTreeMap(); | ||
53 | + this.members.putAll(checkNotNull(members)); | ||
54 | + this.sortedMemberPartitionIds = Lists.newArrayList(members.keySet()); | ||
55 | + } | ||
56 | + | ||
57 | + @Override | ||
58 | + public <K, V> AsyncConsistentMap<K, V> newAsyncConsistentMap(String name, Serializer serializer) { | ||
59 | + checkNotNull(name); | ||
60 | + checkNotNull(serializer); | ||
61 | + Map<PartitionId, AsyncConsistentMap<K, V>> maps = | ||
62 | + Maps.transformValues(members, | ||
63 | + partition -> partition.newAsyncConsistentMap(name, serializer)); | ||
64 | + Hasher<K> hasher = key -> { | ||
65 | + long hashCode = HashCode.fromBytes(Bytes.ensureCapacity(serializer.encode(key), 8, 0)).asLong(); | ||
66 | + return sortedMemberPartitionIds.get(Hashing.consistentHash(hashCode, members.size())); | ||
67 | + }; | ||
68 | + return new PartitionedAsyncConsistentMap<>(name, maps, hasher); | ||
69 | + } | ||
70 | + | ||
71 | + @Override | ||
72 | + public <E> AsyncDistributedSet<E> newAsyncDistributedSet(String name, Serializer serializer) { | ||
73 | + return DistributedPrimitives.newSetFromMap(newAsyncConsistentMap(name, serializer)); | ||
74 | + } | ||
75 | + | ||
76 | + @Override | ||
77 | + public AsyncAtomicCounter newAsyncCounter(String name) { | ||
78 | + return getCreator(name).newAsyncCounter(name); | ||
79 | + } | ||
80 | + | ||
81 | + @Override | ||
82 | + public <V> AsyncAtomicValue<V> newAsyncAtomicValue(String name, Serializer serializer) { | ||
83 | + return getCreator(name).newAsyncAtomicValue(name, serializer); | ||
84 | + } | ||
85 | + | ||
86 | + @Override | ||
87 | + public <E> DistributedQueue<E> newDistributedQueue(String name, Serializer serializer) { | ||
88 | + return getCreator(name).newDistributedQueue(name, serializer); | ||
89 | + } | ||
90 | + | ||
91 | + @Override | ||
92 | + public AsyncLeaderElector newAsyncLeaderElector(String name) { | ||
93 | + return getCreator(name).newAsyncLeaderElector(name); | ||
94 | + } | ||
95 | + | ||
96 | + /** | ||
97 | + * Returns the {@code DistributedPrimitiveCreator} to use for hosting a primitive. | ||
98 | + * @param name primitive name | ||
99 | + * @return primitive creator | ||
100 | + */ | ||
101 | + private DistributedPrimitiveCreator getCreator(String name) { | ||
102 | + long hashCode = HashCode.fromBytes(Tools.getBytesUtf8(StringUtils.leftPad(name, 8))).asLong(); | ||
103 | + int index = Hashing.consistentHash(hashCode, members.size()); | ||
104 | + return members.get(sortedMemberPartitionIds.get(index)); | ||
105 | + } | ||
106 | +} |
1 | +/* | ||
2 | + * Copyright 2016 Open Networking Laboratory | ||
3 | + * | ||
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | ||
5 | + * you may not use this file except in compliance with the License. | ||
6 | + * You may obtain a copy of the License at | ||
7 | + * | ||
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | ||
9 | + * | ||
10 | + * Unless required by applicable law or agreed to in writing, software | ||
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | ||
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
13 | + * See the License for the specific language governing permissions and | ||
14 | + * limitations under the License. | ||
15 | + */ | ||
16 | + | ||
17 | +package org.onosproject.store.primitives.impl; | ||
18 | + | ||
19 | +import java.util.Collection; | ||
20 | +import java.util.Map; | ||
21 | +import java.util.Map.Entry; | ||
22 | +import java.util.Set; | ||
23 | +import java.util.concurrent.CompletableFuture; | ||
24 | +import java.util.function.BiFunction; | ||
25 | +import java.util.function.Function; | ||
26 | +import java.util.function.Predicate; | ||
27 | +import java.util.stream.Collectors; | ||
28 | + | ||
29 | +import org.onosproject.store.service.AsyncConsistentMap; | ||
30 | +import org.onosproject.store.service.MapEvent; | ||
31 | +import org.onosproject.store.service.MapEventListener; | ||
32 | +import org.onosproject.store.service.Versioned; | ||
33 | + | ||
34 | +import com.google.common.collect.Maps; | ||
35 | + | ||
36 | +/** | ||
37 | + * An {@code AsyncConsistentMap} that maps its operations to operations on a | ||
38 | + * differently typed {@code AsyncConsistentMap} by transcoding operation inputs and outputs. | ||
39 | + * | ||
40 | + * @param <K2> key type of other map | ||
41 | + * @param <V2> value type of other map | ||
42 | + * @param <K1> key type of this map | ||
43 | + * @param <V1> value type of this map | ||
44 | + */ | ||
45 | +public class TranscodingAsyncConsistentMap<K1, V1, K2, V2> implements AsyncConsistentMap<K1, V1> { | ||
46 | + | ||
47 | + private final AsyncConsistentMap<K2, V2> backingMap; | ||
48 | + private final Function<K1, K2> keyEncoder; | ||
49 | + private final Function<K2, K1> keyDecoder; | ||
50 | + private final Function<V2, V1> valueDecoder; | ||
51 | + private final Function<V1, V2> valueEncoder; | ||
52 | + private final Function<Versioned<V2>, Versioned<V1>> versionedValueTransform; | ||
53 | + private final Map<MapEventListener<K1, V1>, InternalBackingMapEventListener> listeners = | ||
54 | + Maps.newIdentityHashMap(); | ||
55 | + | ||
56 | + public TranscodingAsyncConsistentMap(AsyncConsistentMap<K2, V2> backingMap, | ||
57 | + Function<K1, K2> keyEncoder, | ||
58 | + Function<K2, K1> keyDecoder, | ||
59 | + Function<V1, V2> valueEncoder, | ||
60 | + Function<V2, V1> valueDecoder) { | ||
61 | + this.backingMap = backingMap; | ||
62 | + this.keyEncoder = k -> k == null ? null : keyEncoder.apply(k); | ||
63 | + this.keyDecoder = keyDecoder; | ||
64 | + this.valueEncoder = v -> v == null ? null : valueEncoder.apply(v); | ||
65 | + this.valueDecoder = valueDecoder; | ||
66 | + this.versionedValueTransform = v -> v == null ? null : v.map(valueDecoder); | ||
67 | + } | ||
68 | + | ||
69 | + @Override | ||
70 | + public String name() { | ||
71 | + return backingMap.name(); | ||
72 | + } | ||
73 | + | ||
74 | + @Override | ||
75 | + public CompletableFuture<Integer> size() { | ||
76 | + return backingMap.size(); | ||
77 | + } | ||
78 | + | ||
79 | + @Override | ||
80 | + public CompletableFuture<Boolean> containsKey(K1 key) { | ||
81 | + return backingMap.containsKey(keyEncoder.apply(key)); | ||
82 | + } | ||
83 | + | ||
84 | + @Override | ||
85 | + public CompletableFuture<Boolean> containsValue(V1 value) { | ||
86 | + return backingMap.containsValue(valueEncoder.apply(value)); | ||
87 | + } | ||
88 | + | ||
89 | + @Override | ||
90 | + public CompletableFuture<Versioned<V1>> get(K1 key) { | ||
91 | + return backingMap.get(keyEncoder.apply(key)).thenApply(versionedValueTransform); | ||
92 | + } | ||
93 | + | ||
94 | + @Override | ||
95 | + public CompletableFuture<Versioned<V1>> computeIf(K1 key, | ||
96 | + Predicate<? super V1> condition, | ||
97 | + BiFunction<? super K1, ? super V1, ? extends V1> remappingFunction) { | ||
98 | + return backingMap.computeIf(keyEncoder.apply(key), | ||
99 | + v -> condition.test(valueDecoder.apply(v)), | ||
100 | + (k, v) -> valueEncoder.apply(remappingFunction.apply(keyDecoder.apply(k), | ||
101 | + valueDecoder.apply(v)))) | ||
102 | + .thenApply(versionedValueTransform); | ||
103 | + } | ||
104 | + | ||
105 | + @Override | ||
106 | + public CompletableFuture<Versioned<V1>> put(K1 key, V1 value) { | ||
107 | + return backingMap.put(keyEncoder.apply(key), valueEncoder.apply(value)) | ||
108 | + .thenApply(versionedValueTransform); | ||
109 | + } | ||
110 | + | ||
111 | + @Override | ||
112 | + public CompletableFuture<Versioned<V1>> putAndGet(K1 key, V1 value) { | ||
113 | + return backingMap.putAndGet(keyEncoder.apply(key), valueEncoder.apply(value)) | ||
114 | + .thenApply(versionedValueTransform); | ||
115 | + } | ||
116 | + | ||
117 | + @Override | ||
118 | + public CompletableFuture<Versioned<V1>> remove(K1 key) { | ||
119 | + return backingMap.remove(keyEncoder.apply(key)).thenApply(versionedValueTransform); | ||
120 | + } | ||
121 | + | ||
122 | + @Override | ||
123 | + public CompletableFuture<Void> clear() { | ||
124 | + return backingMap.clear(); | ||
125 | + } | ||
126 | + | ||
127 | + @Override | ||
128 | + public CompletableFuture<Set<K1>> keySet() { | ||
129 | + return backingMap.keySet() | ||
130 | + .thenApply(s -> s.stream().map(keyDecoder).collect(Collectors.toSet())); | ||
131 | + } | ||
132 | + | ||
133 | + @Override | ||
134 | + public CompletableFuture<Collection<Versioned<V1>>> values() { | ||
135 | + return backingMap.values() | ||
136 | + .thenApply(c -> c.stream().map(versionedValueTransform).collect(Collectors.toList())); | ||
137 | + } | ||
138 | + | ||
139 | + @Override | ||
140 | + public CompletableFuture<Set<Entry<K1, Versioned<V1>>>> entrySet() { | ||
141 | + return backingMap.entrySet() | ||
142 | + .thenApply(s -> s.stream() | ||
143 | + .map(e -> Maps.immutableEntry(keyDecoder.apply(e.getKey()), | ||
144 | + versionedValueTransform.apply(e.getValue()))) | ||
145 | + .collect(Collectors.toSet())); | ||
146 | + } | ||
147 | + | ||
148 | + @Override | ||
149 | + public CompletableFuture<Versioned<V1>> putIfAbsent(K1 key, V1 value) { | ||
150 | + return backingMap.putIfAbsent(keyEncoder.apply(key), valueEncoder.apply(value)) | ||
151 | + .thenApply(versionedValueTransform); | ||
152 | + } | ||
153 | + | ||
154 | + @Override | ||
155 | + public CompletableFuture<Boolean> remove(K1 key, V1 value) { | ||
156 | + return backingMap.remove(keyEncoder.apply(key), valueEncoder.apply(value)); | ||
157 | + } | ||
158 | + | ||
159 | + @Override | ||
160 | + public CompletableFuture<Boolean> remove(K1 key, long version) { | ||
161 | + return backingMap.remove(keyEncoder.apply(key), version); | ||
162 | + } | ||
163 | + | ||
164 | + @Override | ||
165 | + public CompletableFuture<Versioned<V1>> replace(K1 key, V1 value) { | ||
166 | + return backingMap.replace(keyEncoder.apply(key), valueEncoder.apply(value)) | ||
167 | + .thenApply(versionedValueTransform); | ||
168 | + } | ||
169 | + | ||
170 | + @Override | ||
171 | + public CompletableFuture<Boolean> replace(K1 key, V1 oldValue, V1 newValue) { | ||
172 | + return backingMap.replace(keyEncoder.apply(key), valueEncoder.apply(oldValue), valueEncoder.apply(newValue)); | ||
173 | + } | ||
174 | + | ||
175 | + @Override | ||
176 | + public CompletableFuture<Boolean> replace(K1 key, long oldVersion, V1 newValue) { | ||
177 | + return backingMap.replace(keyEncoder.apply(key), oldVersion, valueEncoder.apply(newValue)); | ||
178 | + } | ||
179 | + | ||
180 | + @Override | ||
181 | + public CompletableFuture<Void> addListener(MapEventListener<K1, V1> listener) { | ||
182 | + synchronized (listeners) { | ||
183 | + InternalBackingMapEventListener backingMapListener = | ||
184 | + listeners.computeIfAbsent(listener, k -> new InternalBackingMapEventListener(listener)); | ||
185 | + return backingMap.addListener(backingMapListener); | ||
186 | + } | ||
187 | + } | ||
188 | + | ||
189 | + @Override | ||
190 | + public CompletableFuture<Void> removeListener(MapEventListener<K1, V1> listener) { | ||
191 | + InternalBackingMapEventListener backingMapListener = listeners.remove(listener); | ||
192 | + if (backingMapListener != null) { | ||
193 | + return backingMap.removeListener(backingMapListener); | ||
194 | + } else { | ||
195 | + return CompletableFuture.completedFuture(null); | ||
196 | + } | ||
197 | + } | ||
198 | + | ||
199 | + private class InternalBackingMapEventListener implements MapEventListener<K2, V2> { | ||
200 | + | ||
201 | + private final MapEventListener<K1, V1> listener; | ||
202 | + | ||
203 | + InternalBackingMapEventListener(MapEventListener<K1, V1> listener) { | ||
204 | + this.listener = listener; | ||
205 | + } | ||
206 | + | ||
207 | + @Override | ||
208 | + public void event(MapEvent<K2, V2> event) { | ||
209 | + listener.event(new MapEvent<K1, V1>(event.name(), | ||
210 | + keyDecoder.apply(event.key()), | ||
211 | + event.newValue().map(valueDecoder), | ||
212 | + event.oldValue().map(valueDecoder))); | ||
213 | + } | ||
214 | + } | ||
215 | +} |
-
Please register or login to post a comment