Madan Jampani
Committed by Gerrit Code Review

Decorators for AsyncConsistentMap

Change-Id: Ie5f325ecb825951456bd950055ba88bb93af01b6
1 +/*
2 + * Copyright 2016 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +
17 +package org.onosproject.store.primitives.impl;
18 +
19 +import java.util.concurrent.CompletableFuture;
20 +import java.util.function.BiFunction;
21 +import java.util.function.Predicate;
22 +
23 +import org.onosproject.store.service.AsyncConsistentMap;
24 +import org.onosproject.store.service.Versioned;
25 +
26 +import com.google.common.cache.CacheBuilder;
27 +import com.google.common.cache.CacheLoader;
28 +import com.google.common.cache.LoadingCache;
29 +
30 +/**
31 + * {@code AsyncConsistentMap} that caches entries on read.
32 + * <p>
33 + * The cache entries are automatically invalidated when updates are detected either locally or
34 + * remotely.
35 + * <p> This implementation only attempts to serve cached entries for {@link AsyncConsistentMap#get get}
36 + * calls. All other calls skip the cache and directly go the backing map.
37 + *
38 + * @param <K> key type
39 + * @param <V> value type
40 + */
41 +public class CachingAsyncConsistentMap<K, V> extends DelegatingAsyncConsistentMap<K, V> {
42 +
43 + private final LoadingCache<K, CompletableFuture<Versioned<V>>> cache =
44 + CacheBuilder.newBuilder()
45 + .maximumSize(10000) // TODO: make configurable
46 + .build(new CacheLoader<K, CompletableFuture<Versioned<V>>>() {
47 + @Override
48 + public CompletableFuture<Versioned<V>> load(K key)
49 + throws Exception {
50 + return CachingAsyncConsistentMap.super.get(key);
51 + }
52 + });
53 +
54 + public CachingAsyncConsistentMap(AsyncConsistentMap<K, V> backingMap) {
55 + super(backingMap);
56 + super.addListener(event -> cache.invalidate(event.key()));
57 + }
58 +
59 + @Override
60 + public CompletableFuture<Versioned<V>> get(K key) {
61 + return cache.getUnchecked(key);
62 + }
63 +
64 + @Override
65 + public CompletableFuture<Versioned<V>> computeIf(K key,
66 + Predicate<? super V> condition,
67 + BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
68 + return super.computeIf(key, condition, remappingFunction)
69 + .whenComplete((r, e) -> cache.invalidate(key));
70 + }
71 +
72 + @Override
73 + public CompletableFuture<Versioned<V>> put(K key, V value) {
74 + return super.put(key, value)
75 + .whenComplete((r, e) -> cache.invalidate(key));
76 + }
77 +
78 + @Override
79 + public CompletableFuture<Versioned<V>> putAndGet(K key, V value) {
80 + return super.put(key, value)
81 + .whenComplete((r, e) -> cache.invalidate(key));
82 + }
83 +
84 + @Override
85 + public CompletableFuture<Versioned<V>> remove(K key) {
86 + return super.remove(key)
87 + .whenComplete((r, e) -> cache.invalidate(key));
88 + }
89 +
90 + @Override
91 + public CompletableFuture<Void> clear() {
92 + return super.clear()
93 + .whenComplete((r, e) -> cache.invalidateAll());
94 + }
95 +
96 + @Override
97 + public CompletableFuture<Boolean> remove(K key, V value) {
98 + return super.remove(key, value)
99 + .whenComplete((r, e) -> {
100 + if (r) {
101 + cache.invalidate(key);
102 + }
103 + });
104 + }
105 +
106 + @Override
107 + public CompletableFuture<Boolean> remove(K key, long version) {
108 + return super.remove(key, version)
109 + .whenComplete((r, e) -> {
110 + if (r) {
111 + cache.invalidate(key);
112 + }
113 + });
114 + }
115 +
116 + @Override
117 + public CompletableFuture<Versioned<V>> replace(K key, V value) {
118 + return super.replace(key, value)
119 + .whenComplete((r, e) -> cache.invalidate(key));
120 + }
121 +
122 + @Override
123 + public CompletableFuture<Boolean> replace(K key, V oldValue, V newValue) {
124 + return super.replace(key, oldValue, newValue)
125 + .whenComplete((r, e) -> {
126 + if (r) {
127 + cache.invalidate(key);
128 + }
129 + });
130 + }
131 +
132 + @Override
133 + public CompletableFuture<Boolean> replace(K key, long oldVersion, V newValue) {
134 + return super.replace(key, oldVersion, newValue)
135 + .whenComplete((r, e) -> {
136 + if (r) {
137 + cache.invalidate(key);
138 + }
139 + });
140 + }
141 +}
1 +/*
2 + * Copyright 2016 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +
17 +package org.onosproject.store.primitives.impl;
18 +
19 +import static com.google.common.base.Preconditions.checkNotNull;
20 +
21 +import java.util.Collection;
22 +import java.util.Map.Entry;
23 +import java.util.Objects;
24 +import java.util.Set;
25 +import java.util.concurrent.CompletableFuture;
26 +import java.util.function.BiFunction;
27 +import java.util.function.Predicate;
28 +
29 +import org.onosproject.core.ApplicationId;
30 +import org.onosproject.store.service.AsyncConsistentMap;
31 +import org.onosproject.store.service.MapEventListener;
32 +import org.onosproject.store.service.Versioned;
33 +
34 +import com.google.common.base.MoreObjects;
35 +
36 +/**
37 + * {@code AsyncConsistentMap} that merely delegates control to
38 + * another AsyncConsistentMap.
39 + *
40 + * @param <K> key type
41 + * @param <V> value type
42 + */
43 +public class DelegatingAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V> {
44 +
45 + private final AsyncConsistentMap<K, V> delegateMap;
46 +
47 + DelegatingAsyncConsistentMap(AsyncConsistentMap<K, V> delegateMap) {
48 + this.delegateMap = checkNotNull(delegateMap, "delegate map cannot be null");
49 + }
50 +
51 + @Override
52 + public String name() {
53 + return delegateMap.name();
54 + }
55 +
56 + @Override
57 + public ApplicationId applicationId() {
58 + return delegateMap.applicationId();
59 + }
60 +
61 + @Override
62 + public CompletableFuture<Integer> size() {
63 + return delegateMap.size();
64 + }
65 +
66 + @Override
67 + public CompletableFuture<Boolean> containsKey(K key) {
68 + return delegateMap.containsKey(key);
69 + }
70 +
71 + @Override
72 + public CompletableFuture<Boolean> containsValue(V value) {
73 + return delegateMap.containsValue(value);
74 + }
75 +
76 + @Override
77 + public CompletableFuture<Versioned<V>> get(K key) {
78 + return delegateMap.get(key);
79 + }
80 +
81 + @Override
82 + public CompletableFuture<Versioned<V>> computeIf(K key,
83 + Predicate<? super V> condition,
84 + BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
85 + return delegateMap.computeIf(key, condition, remappingFunction);
86 + }
87 +
88 + @Override
89 + public CompletableFuture<Versioned<V>> put(K key, V value) {
90 + return delegateMap.put(key, value);
91 + }
92 +
93 + @Override
94 + public CompletableFuture<Versioned<V>> putAndGet(K key, V value) {
95 + return delegateMap.putAndGet(key, value);
96 + }
97 +
98 + @Override
99 + public CompletableFuture<Versioned<V>> remove(K key) {
100 + return delegateMap.remove(key);
101 + }
102 +
103 + @Override
104 + public CompletableFuture<Void> clear() {
105 + return delegateMap.clear();
106 + }
107 +
108 + @Override
109 + public CompletableFuture<Set<K>> keySet() {
110 + return delegateMap.keySet();
111 + }
112 +
113 + @Override
114 + public CompletableFuture<Collection<Versioned<V>>> values() {
115 + return delegateMap.values();
116 + }
117 +
118 + @Override
119 + public CompletableFuture<Set<Entry<K, Versioned<V>>>> entrySet() {
120 + return delegateMap.entrySet();
121 + }
122 +
123 + @Override
124 + public CompletableFuture<Versioned<V>> putIfAbsent(K key, V value) {
125 + return delegateMap.putIfAbsent(key, value);
126 + }
127 +
128 + @Override
129 + public CompletableFuture<Boolean> remove(K key, V value) {
130 + return delegateMap.remove(key, value);
131 + }
132 +
133 + @Override
134 + public CompletableFuture<Boolean> remove(K key, long version) {
135 + return delegateMap.remove(key, version);
136 + }
137 +
138 + @Override
139 + public CompletableFuture<Versioned<V>> replace(K key, V value) {
140 + return delegateMap.replace(key, value);
141 + }
142 +
143 + @Override
144 + public CompletableFuture<Boolean> replace(K key, V oldValue, V newValue) {
145 + return delegateMap.replace(key, oldValue, newValue);
146 + }
147 +
148 + @Override
149 + public CompletableFuture<Boolean> replace(K key, long oldVersion, V newValue) {
150 + return delegateMap.replace(key, oldVersion, newValue);
151 + }
152 +
153 + @Override
154 + public CompletableFuture<Void> addListener(MapEventListener<K, V> listener) {
155 + return delegateMap.addListener(listener);
156 + }
157 +
158 + @Override
159 + public CompletableFuture<Void> removeListener(MapEventListener<K, V> listener) {
160 + return delegateMap.removeListener(listener);
161 + }
162 +
163 + @Override
164 + public String toString() {
165 + return MoreObjects.toStringHelper(getClass())
166 + .add("delegateMap", delegateMap)
167 + .toString();
168 + }
169 +
170 + @Override
171 + public int hashCode() {
172 + return Objects.hash(delegateMap);
173 + }
174 +
175 + @Override
176 + public boolean equals(Object other) {
177 + if (other instanceof DelegatingAsyncConsistentMap) {
178 + DelegatingAsyncConsistentMap<K, V> that = (DelegatingAsyncConsistentMap) other;
179 + return this.delegateMap.equals(that.delegateMap);
180 + }
181 + return false;
182 + }
183 +}
1 +/*
2 + * Copyright 2016 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +package org.onosproject.store.primitives.impl;
17 +
18 +import org.onosproject.cluster.PartitionId;
19 +
20 +/**
21 + * Interface for mapping from an object to {@link PartitionId}.
22 + *
23 + * @param <K> object type.
24 + */
25 +public interface Hasher<K> {
26 + /**
27 + * Returns the {@link PartitionId} to which the specified object maps.
28 + * @param object object
29 + * @return partition identifier
30 + */
31 + PartitionId hash(K object);
32 +}
1 +/*
2 + * Copyright 2016 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +
17 +package org.onosproject.store.primitives.impl;
18 +
19 +import java.util.Collection;
20 +import java.util.Map;
21 +import java.util.Map.Entry;
22 +import java.util.Set;
23 +import java.util.concurrent.CompletableFuture;
24 +import java.util.function.BiFunction;
25 +import java.util.function.Function;
26 +import java.util.function.Predicate;
27 +
28 +import org.onosproject.store.service.AsyncConsistentMap;
29 +import org.onosproject.store.service.MapEvent;
30 +import org.onosproject.store.service.MapEventListener;
31 +import org.onosproject.store.service.Versioned;
32 +
33 +import com.google.common.base.Throwables;
34 +import com.google.common.collect.Maps;
35 +
36 +/**
37 + * {@link AsyncConsistentMap} that meters all its operations.
38 + *
39 + * @param <K> key type
40 + * @param <V> value type
41 + */
42 +public class MeteredAsyncConsistentMap<K, V> extends DelegatingAsyncConsistentMap<K, V> {
43 +
44 + private static final String PRIMITIVE_NAME = "consistentMap";
45 + private static final String SIZE = "size";
46 + private static final String IS_EMPTY = "isEmpty";
47 + private static final String CONTAINS_KEY = "containsKey";
48 + private static final String CONTAINS_VALUE = "containsValue";
49 + private static final String GET = "get";
50 + private static final String COMPUTE_IF = "computeIf";
51 + private static final String PUT = "put";
52 + private static final String PUT_AND_GET = "putAndGet";
53 + private static final String PUT_IF_ABSENT = "putIfAbsent";
54 + private static final String REMOVE = "remove";
55 + private static final String CLEAR = "clear";
56 + private static final String KEY_SET = "keySet";
57 + private static final String VALUES = "values";
58 + private static final String ENTRY_SET = "entrySet";
59 + private static final String REPLACE = "replace";
60 + private static final String COMPUTE_IF_ABSENT = "computeIfAbsent";
61 + private static final String ADD_LISTENER = "addListener";
62 + private static final String REMOVE_LISTENER = "removeListener";
63 + private static final String NOTIFY_LISTENER = "notifyListener";
64 +
65 + private final Map<MapEventListener<K, V>, InternalMeteredMapEventListener> listeners =
66 + Maps.newIdentityHashMap();
67 + private final MeteringAgent monitor;
68 +
69 + public MeteredAsyncConsistentMap(AsyncConsistentMap<K, V> backingMap) {
70 + super(backingMap);
71 + this.monitor = new MeteringAgent(PRIMITIVE_NAME, backingMap.name(), true);
72 + }
73 +
74 + @Override
75 + public CompletableFuture<Integer> size() {
76 + final MeteringAgent.Context timer = monitor.startTimer(SIZE);
77 + return super.size()
78 + .whenComplete((r, e) -> timer.stop(e));
79 + }
80 +
81 + @Override
82 + public CompletableFuture<Boolean> isEmpty() {
83 + final MeteringAgent.Context timer = monitor.startTimer(IS_EMPTY);
84 + return super.isEmpty()
85 + .whenComplete((r, e) -> timer.stop(e));
86 + }
87 +
88 + @Override
89 + public CompletableFuture<Boolean> containsKey(K key) {
90 + final MeteringAgent.Context timer = monitor.startTimer(CONTAINS_KEY);
91 + return super.containsKey(key)
92 + .whenComplete((r, e) -> timer.stop(e));
93 + }
94 +
95 + @Override
96 + public CompletableFuture<Boolean> containsValue(V value) {
97 + final MeteringAgent.Context timer = monitor.startTimer(CONTAINS_VALUE);
98 + return super.containsValue(value)
99 + .whenComplete((r, e) -> timer.stop(e));
100 + }
101 +
102 + @Override
103 + public CompletableFuture<Versioned<V>> get(K key) {
104 + final MeteringAgent.Context timer = monitor.startTimer(GET);
105 + return super.get(key)
106 + .whenComplete((r, e) -> timer.stop(e));
107 + }
108 +
109 + @Override
110 + public CompletableFuture<Versioned<V>> computeIfAbsent(K key,
111 + Function<? super K, ? extends V> mappingFunction) {
112 + final MeteringAgent.Context timer = monitor.startTimer(COMPUTE_IF_ABSENT);
113 + return super.computeIfAbsent(key, mappingFunction)
114 + .whenComplete((r, e) -> timer.stop(e));
115 + }
116 +
117 + @Override
118 + public CompletableFuture<Versioned<V>> computeIf(K key,
119 + Predicate<? super V> condition,
120 + BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
121 + final MeteringAgent.Context timer = monitor.startTimer(COMPUTE_IF);
122 + return super.computeIf(key, condition, remappingFunction)
123 + .whenComplete((r, e) -> timer.stop(e));
124 + }
125 +
126 + @Override
127 + public CompletableFuture<Versioned<V>> put(K key, V value) {
128 + final MeteringAgent.Context timer = monitor.startTimer(PUT);
129 + return super.put(key, value)
130 + .whenComplete((r, e) -> timer.stop(e));
131 + }
132 +
133 + @Override
134 + public CompletableFuture<Versioned<V>> putAndGet(K key, V value) {
135 + final MeteringAgent.Context timer = monitor.startTimer(PUT_AND_GET);
136 + return super.putAndGet(key, value)
137 + .whenComplete((r, e) -> timer.stop(e));
138 + }
139 +
140 + @Override
141 + public CompletableFuture<Versioned<V>> remove(K key) {
142 + final MeteringAgent.Context timer = monitor.startTimer(REMOVE);
143 + return super.remove(key)
144 + .whenComplete((r, e) -> timer.stop(e));
145 + }
146 +
147 + @Override
148 + public CompletableFuture<Void> clear() {
149 + final MeteringAgent.Context timer = monitor.startTimer(CLEAR);
150 + return super.clear()
151 + .whenComplete((r, e) -> timer.stop(e));
152 + }
153 +
154 + @Override
155 + public CompletableFuture<Set<K>> keySet() {
156 + final MeteringAgent.Context timer = monitor.startTimer(KEY_SET);
157 + return super.keySet()
158 + .whenComplete((r, e) -> timer.stop(e));
159 + }
160 +
161 + @Override
162 + public CompletableFuture<Collection<Versioned<V>>> values() {
163 + final MeteringAgent.Context timer = monitor.startTimer(VALUES);
164 + return super.values()
165 + .whenComplete((r, e) -> timer.stop(e));
166 + }
167 +
168 + @Override
169 + public CompletableFuture<Set<Entry<K, Versioned<V>>>> entrySet() {
170 + final MeteringAgent.Context timer = monitor.startTimer(ENTRY_SET);
171 + return super.entrySet()
172 + .whenComplete((r, e) -> timer.stop(e));
173 + }
174 +
175 + @Override
176 + public CompletableFuture<Versioned<V>> putIfAbsent(K key, V value) {
177 + final MeteringAgent.Context timer = monitor.startTimer(PUT_IF_ABSENT);
178 + return super.putIfAbsent(key, value)
179 + .whenComplete((r, e) -> timer.stop(e));
180 + }
181 +
182 + @Override
183 + public CompletableFuture<Boolean> remove(K key, V value) {
184 + final MeteringAgent.Context timer = monitor.startTimer(REMOVE);
185 + return super.remove(key, value)
186 + .whenComplete((r, e) -> timer.stop(e));
187 +
188 + }
189 +
190 + @Override
191 + public CompletableFuture<Boolean> remove(K key, long version) {
192 + final MeteringAgent.Context timer = monitor.startTimer(REMOVE);
193 + return super.remove(key, version)
194 + .whenComplete((r, e) -> timer.stop(e));
195 + }
196 +
197 + @Override
198 + public CompletableFuture<Versioned<V>> replace(K key, V value) {
199 + final MeteringAgent.Context timer = monitor.startTimer(REPLACE);
200 + return super.replace(key, value)
201 + .whenComplete((r, e) -> timer.stop(e));
202 + }
203 +
204 + @Override
205 + public CompletableFuture<Boolean> replace(K key, V oldValue, V newValue) {
206 + final MeteringAgent.Context timer = monitor.startTimer(REPLACE);
207 + return super.replace(key, oldValue, newValue)
208 + .whenComplete((r, e) -> timer.stop(e));
209 + }
210 +
211 + @Override
212 + public CompletableFuture<Boolean> replace(K key, long oldVersion, V newValue) {
213 + final MeteringAgent.Context timer = monitor.startTimer(REPLACE);
214 + return super.replace(key, oldVersion, newValue)
215 + .whenComplete((r, e) -> timer.stop(e));
216 + }
217 +
218 + @Override
219 + public CompletableFuture<Void> addListener(MapEventListener<K, V> listener) {
220 + final MeteringAgent.Context timer = monitor.startTimer(ADD_LISTENER);
221 + synchronized (listeners) {
222 + InternalMeteredMapEventListener meteredListener =
223 + listeners.computeIfAbsent(listener, k -> new InternalMeteredMapEventListener(listener));
224 + return super.addListener(meteredListener)
225 + .whenComplete((r, e) -> timer.stop(e));
226 + }
227 + }
228 +
229 + @Override
230 + public CompletableFuture<Void> removeListener(MapEventListener<K, V> listener) {
231 + final MeteringAgent.Context timer = monitor.startTimer(REMOVE_LISTENER);
232 + InternalMeteredMapEventListener meteredListener = listeners.remove(listener);
233 + if (meteredListener != null) {
234 + return super.removeListener(listener)
235 + .whenComplete((r, e) -> timer.stop(e));
236 + } else {
237 + timer.stop(null);
238 + return CompletableFuture.completedFuture(null);
239 + }
240 + }
241 +
242 + private class InternalMeteredMapEventListener implements MapEventListener<K, V> {
243 +
244 + private final MapEventListener<K, V> listener;
245 +
246 + InternalMeteredMapEventListener(MapEventListener<K, V> listener) {
247 + this.listener = listener;
248 + }
249 +
250 + @Override
251 + public void event(MapEvent<K, V> event) {
252 + final MeteringAgent.Context timer = monitor.startTimer(NOTIFY_LISTENER);
253 + try {
254 + listener.event(event);
255 + timer.stop(null);
256 + } catch (Exception e) {
257 + timer.stop(e);
258 + Throwables.propagate(e);
259 + }
260 + }
261 + }
262 +}
...\ No newline at end of file ...\ No newline at end of file
1 +/*
2 + * Copyright 2016 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +package org.onosproject.store.primitives.impl;
17 +
18 +import static com.google.common.base.Preconditions.checkNotNull;
19 +
20 +import java.util.Collection;
21 +import java.util.List;
22 +import java.util.Map;
23 +import java.util.Map.Entry;
24 +import java.util.Set;
25 +import java.util.TreeMap;
26 +import java.util.concurrent.CompletableFuture;
27 +import java.util.concurrent.atomic.AtomicBoolean;
28 +import java.util.concurrent.atomic.AtomicInteger;
29 +import java.util.function.BiFunction;
30 +import java.util.function.Predicate;
31 +
32 +import org.onosproject.cluster.PartitionId;
33 +import org.onosproject.store.service.AsyncConsistentMap;
34 +import org.onosproject.store.service.MapEventListener;
35 +import org.onosproject.store.service.Versioned;
36 +
37 +import com.google.common.collect.Lists;
38 +import com.google.common.collect.Maps;
39 +import com.google.common.collect.Sets;
40 +
41 +/**
42 + * {@link AsyncConsistentMap} that has its entries partitioned horizontally across
43 + * several {@link AsyncConsistentMap maps}.
44 + *
45 + * @param <K> key type
46 + * @param <V> value type
47 + */
48 +public class PartitionedAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V> {
49 +
50 + private final String name;
51 + private final TreeMap<PartitionId, AsyncConsistentMap<K, V>> partitions = Maps.newTreeMap();
52 + private final Hasher<K> keyHasher;
53 +
54 + public PartitionedAsyncConsistentMap(String name,
55 + Map<PartitionId, AsyncConsistentMap<K, V>> partitions,
56 + Hasher<K> keyHasher) {
57 + this.name = name;
58 + this.partitions.putAll(checkNotNull(partitions));
59 + this.keyHasher = checkNotNull(keyHasher);
60 + }
61 +
62 + @Override
63 + public String name() {
64 + return name;
65 + }
66 +
67 + @Override
68 + public CompletableFuture<Integer> size() {
69 + AtomicInteger totalSize = new AtomicInteger(0);
70 + return CompletableFuture.allOf(getMaps()
71 + .stream()
72 + .map(map -> map.size().thenAccept(totalSize::addAndGet))
73 + .toArray(CompletableFuture[]::new))
74 + .thenApply(v -> totalSize.get());
75 + }
76 +
77 + @Override
78 + public CompletableFuture<Boolean> isEmpty() {
79 + return size().thenApply(size -> size == 0);
80 + }
81 +
82 + @Override
83 + public CompletableFuture<Boolean> containsKey(K key) {
84 + return getMap(key).containsKey(key);
85 + }
86 +
87 + @Override
88 + public CompletableFuture<Boolean> containsValue(V value) {
89 + AtomicBoolean contains = new AtomicBoolean(false);
90 + return CompletableFuture.allOf(getMaps().stream()
91 + .map(map -> map.containsValue(value)
92 + .thenAccept(v -> contains.set(contains.get() || v)))
93 + .toArray(CompletableFuture[]::new))
94 + .thenApply(v -> contains.get());
95 + }
96 + @Override
97 + public CompletableFuture<Versioned<V>> get(K key) {
98 + return getMap(key).get(key);
99 + }
100 +
101 + @Override
102 + public CompletableFuture<Versioned<V>> computeIf(K key,
103 + Predicate<? super V> condition,
104 + BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
105 + return getMap(key).computeIf(key, condition, remappingFunction);
106 + }
107 +
108 + @Override
109 + public CompletableFuture<Versioned<V>> put(K key, V value) {
110 + return getMap(key).put(key, value);
111 + }
112 +
113 + @Override
114 + public CompletableFuture<Versioned<V>> putAndGet(K key, V value) {
115 + return getMap(key).putAndGet(key, value);
116 + }
117 +
118 + @Override
119 + public CompletableFuture<Versioned<V>> remove(K key) {
120 + return getMap(key).remove(key);
121 + }
122 +
123 + @Override
124 + public CompletableFuture<Void> clear() {
125 + return CompletableFuture.allOf(getMaps().stream()
126 + .map(map -> map.clear())
127 + .toArray(CompletableFuture[]::new));
128 + }
129 +
130 + @Override
131 + public CompletableFuture<Set<K>> keySet() {
132 + Set<K> allKeys = Sets.newConcurrentHashSet();
133 + return CompletableFuture.allOf(getMaps().stream()
134 + .map(map -> map.keySet().thenAccept(allKeys::addAll))
135 + .toArray(CompletableFuture[]::new))
136 + .thenApply(v -> allKeys);
137 + }
138 +
139 + @Override
140 + public CompletableFuture<Collection<Versioned<V>>> values() {
141 + List<Versioned<V>> allValues = Lists.newCopyOnWriteArrayList();
142 + return CompletableFuture.allOf(getMaps().stream()
143 + .map(map -> map.values().thenAccept(allValues::addAll))
144 + .toArray(CompletableFuture[]::new))
145 + .thenApply(v -> allValues);
146 + }
147 +
148 + @Override
149 + public CompletableFuture<Set<Entry<K, Versioned<V>>>> entrySet() {
150 + Set<Entry<K, Versioned<V>>> allEntries = Sets.newConcurrentHashSet();
151 + return CompletableFuture.allOf(getMaps().stream()
152 + .map(map -> map.entrySet().thenAccept(allEntries::addAll))
153 + .toArray(CompletableFuture[]::new))
154 + .thenApply(v -> allEntries);
155 + }
156 +
157 + @Override
158 + public CompletableFuture<Versioned<V>> putIfAbsent(K key, V value) {
159 + return getMap(key).putIfAbsent(key, value);
160 + }
161 +
162 + @Override
163 + public CompletableFuture<Boolean> remove(K key, V value) {
164 + return getMap(key).remove(key, value);
165 + }
166 +
167 + @Override
168 + public CompletableFuture<Boolean> remove(K key, long version) {
169 + return getMap(key).remove(key, version);
170 + }
171 +
172 + @Override
173 + public CompletableFuture<Versioned<V>> replace(K key, V value) {
174 + return getMap(key).replace(key, value);
175 + }
176 +
177 + @Override
178 + public CompletableFuture<Boolean> replace(K key, V oldValue, V newValue) {
179 + return getMap(key).replace(key, oldValue, newValue);
180 + }
181 +
182 + @Override
183 + public CompletableFuture<Boolean> replace(K key, long oldVersion, V newValue) {
184 + return getMap(key).replace(key, oldVersion, newValue);
185 + }
186 +
187 + @Override
188 + public CompletableFuture<Void> addListener(MapEventListener<K, V> listener) {
189 + return CompletableFuture.allOf(getMaps().stream()
190 + .map(map -> map.addListener(listener))
191 + .toArray(CompletableFuture[]::new));
192 + }
193 +
194 + @Override
195 + public CompletableFuture<Void> removeListener(MapEventListener<K, V> listener) {
196 + return CompletableFuture.allOf(getMaps().stream()
197 + .map(map -> map.removeListener(listener))
198 + .toArray(CompletableFuture[]::new));
199 + }
200 +
201 + /**
202 + * Returns the map (partition) to which the specified key maps.
203 + * @param key key
204 + * @return AsyncConsistentMap to which key maps
205 + */
206 + private AsyncConsistentMap<K, V> getMap(K key) {
207 + return partitions.get(keyHasher.hash(key));
208 + }
209 +
210 + /**
211 + * Returns all the constituent maps.
212 + * @return collection of maps.
213 + */
214 + private Collection<AsyncConsistentMap<K, V>> getMaps() {
215 + return partitions.values();
216 + }
217 +}
1 +/*
2 + * Copyright 2016 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +
17 +package org.onosproject.store.primitives.impl;
18 +
19 +import java.util.concurrent.CompletableFuture;
20 +import java.util.function.BiFunction;
21 +import java.util.function.Predicate;
22 +
23 +import org.onlab.util.Tools;
24 +import org.onosproject.store.service.AsyncConsistentMap;
25 +import org.onosproject.store.service.Versioned;
26 +
27 +/**
28 + * An unmodifiable {@link AsyncConsistentMap}.
29 + * <p>
30 + * Any attempt to update the map through this instance will cause the
31 + * operation to be completed with an {@link UnsupportedOperationException}.
32 + *
33 + * @param <K> key type
34 + * @param <V> value type
35 + */
36 +public class UnmodifiableAsyncConsistentMap<K, V> extends DelegatingAsyncConsistentMap<K, V> {
37 +
38 + public UnmodifiableAsyncConsistentMap(AsyncConsistentMap<K, V> backingMap) {
39 + super(backingMap);
40 + }
41 +
42 + @Override
43 + public CompletableFuture<Versioned<V>> computeIf(K key,
44 + Predicate<? super V> condition,
45 + BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
46 + return Tools.exceptionalFuture(new UnsupportedOperationException("map updates are not allowed"));
47 + }
48 +
49 + @Override
50 + public CompletableFuture<Versioned<V>> put(K key, V value) {
51 + return Tools.exceptionalFuture(new UnsupportedOperationException("map updates are not allowed"));
52 + }
53 +
54 + @Override
55 + public CompletableFuture<Versioned<V>> putAndGet(K key, V value) {
56 + return Tools.exceptionalFuture(new UnsupportedOperationException("map updates are not allowed"));
57 + }
58 +
59 + @Override
60 + public CompletableFuture<Versioned<V>> remove(K key) {
61 + return Tools.exceptionalFuture(new UnsupportedOperationException("map updates are not allowed"));
62 + }
63 +
64 + @Override
65 + public CompletableFuture<Void> clear() {
66 + return Tools.exceptionalFuture(new UnsupportedOperationException("map updates are not allowed"));
67 + }
68 +
69 + @Override
70 + public CompletableFuture<Versioned<V>> putIfAbsent(K key, V value) {
71 + return Tools.exceptionalFuture(new UnsupportedOperationException("map updates are not allowed"));
72 + }
73 +
74 + @Override
75 + public CompletableFuture<Boolean> remove(K key, V value) {
76 + return Tools.exceptionalFuture(new UnsupportedOperationException("map updates are not allowed"));
77 + }
78 +
79 + @Override
80 + public CompletableFuture<Boolean> remove(K key, long version) {
81 + return Tools.exceptionalFuture(new UnsupportedOperationException("map updates are not allowed"));
82 + }
83 +
84 + @Override
85 + public CompletableFuture<Versioned<V>> replace(K key, V value) {
86 + return Tools.exceptionalFuture(new UnsupportedOperationException("map updates are not allowed"));
87 + }
88 +
89 + @Override
90 + public CompletableFuture<Boolean> replace(K key, V oldValue, V newValue) {
91 + return Tools.exceptionalFuture(new UnsupportedOperationException("map updates are not allowed"));
92 + }
93 +
94 + @Override
95 + public CompletableFuture<Boolean> replace(K key, long oldVersion, V newValue) {
96 + return Tools.exceptionalFuture(new UnsupportedOperationException("map updates are not allowed"));
97 + }
98 +}