Aaron Kruglikov
Committed by Gerrit Code Review

Adding additional TreeMap resources

Change-Id: I103a8c5e6fb1c5e7a6ae0942e0b746367da18736
...@@ -243,8 +243,9 @@ public interface AsyncConsistentMap<K, V> extends DistributedPrimitive { ...@@ -243,8 +243,9 @@ public interface AsyncConsistentMap<K, V> extends DistributedPrimitive {
243 CompletableFuture<Set<Entry<K, Versioned<V>>>> entrySet(); 243 CompletableFuture<Set<Entry<K, Versioned<V>>>> entrySet();
244 244
245 /** 245 /**
246 - * If the specified key is not already associated with a value 246 + * If the specified key is not already associated with a value associates
247 - * associates it with the given value and returns null, else returns the current value. 247 + * it with the given value and returns null, else behaves as a get
248 + * returning the existing mapping without making any changes.
248 * 249 *
249 * @param key key with which the specified value is to be associated 250 * @param key key with which the specified value is to be associated
250 * @param value value to be associated with the specified key 251 * @param value value to be associated with the specified key
......
...@@ -19,6 +19,7 @@ package org.onosproject.store.service; ...@@ -19,6 +19,7 @@ package org.onosproject.store.service;
19 import org.onosproject.store.primitives.DefaultConsistentTreeMap; 19 import org.onosproject.store.primitives.DefaultConsistentTreeMap;
20 20
21 import java.util.Map; 21 import java.util.Map;
22 +import java.util.NavigableMap;
22 import java.util.NavigableSet; 23 import java.util.NavigableSet;
23 import java.util.concurrent.CompletableFuture; 24 import java.util.concurrent.CompletableFuture;
24 25
...@@ -42,26 +43,28 @@ public interface AsyncConsistentTreeMap<K, V> extends AsyncConsistentMap<K, V> { ...@@ -42,26 +43,28 @@ public interface AsyncConsistentTreeMap<K, V> extends AsyncConsistentMap<K, V> {
42 CompletableFuture<K> lastKey(); 43 CompletableFuture<K> lastKey();
43 44
44 /** 45 /**
45 - * Returns the entry associated with the least key greater than or equal to the key. 46 + * Returns the entry associated with the least key greater than or equal to
47 + * the key.
46 * 48 *
47 * @param key the key 49 * @param key the key
48 - * @return the entry or null 50 + * @return the entry or null if no suitable key exists
49 */ 51 */
50 CompletableFuture<Map.Entry<K, Versioned<V>>> ceilingEntry(K key); 52 CompletableFuture<Map.Entry<K, Versioned<V>>> ceilingEntry(K key);
51 53
52 /** 54 /**
53 - * Returns the entry associated with the greatest key less than or equal to key. 55 + * Returns the entry associated with the greatest key less than or equal
56 + * to key.
54 * 57 *
55 * @param key the key 58 * @param key the key
56 - * @return the entry or null 59 + * @return the entry or null if no suitable key exists
57 */ 60 */
58 CompletableFuture<Map.Entry<K, Versioned<V>>> floorEntry(K key); 61 CompletableFuture<Map.Entry<K, Versioned<V>>> floorEntry(K key);
59 62
60 /** 63 /**
61 - * Returns the entry associated with the lest key greater than key. 64 + * Returns the entry associated with the least key greater than key.
62 * 65 *
63 * @param key the key 66 * @param key the key
64 - * @return the entry or null 67 + * @return the entry or null if no suitable key exists
65 */ 68 */
66 CompletableFuture<Map.Entry<K, Versioned<V>>> higherEntry(K key); 69 CompletableFuture<Map.Entry<K, Versioned<V>>> higherEntry(K key);
67 70
...@@ -69,35 +72,35 @@ public interface AsyncConsistentTreeMap<K, V> extends AsyncConsistentMap<K, V> { ...@@ -69,35 +72,35 @@ public interface AsyncConsistentTreeMap<K, V> extends AsyncConsistentMap<K, V> {
69 * Returns the entry associated with the largest key less than key. 72 * Returns the entry associated with the largest key less than key.
70 * 73 *
71 * @param key the key 74 * @param key the key
72 - * @return the entry or null 75 + * @return the entry or null if no suitable key exists
73 */ 76 */
74 CompletableFuture<Map.Entry<K, Versioned<V>>> lowerEntry(K key); 77 CompletableFuture<Map.Entry<K, Versioned<V>>> lowerEntry(K key);
75 78
76 /** 79 /**
77 * Return the entry associated with the lowest key in the map. 80 * Return the entry associated with the lowest key in the map.
78 * 81 *
79 - * @return the entry or null 82 + * @return the entry or null if none exist
80 */ 83 */
81 CompletableFuture<Map.Entry<K, Versioned<V>>> firstEntry(); 84 CompletableFuture<Map.Entry<K, Versioned<V>>> firstEntry();
82 85
83 /** 86 /**
84 - * Return the entry assocaited with the highest key in the map. 87 + * Return the entry associated with the highest key in the map.
85 * 88 *
86 - * @return the entry or null 89 + * @return the entry or null if none exist
87 */ 90 */
88 CompletableFuture<Map.Entry<K, Versioned<V>>> lastEntry(); 91 CompletableFuture<Map.Entry<K, Versioned<V>>> lastEntry();
89 92
90 /** 93 /**
91 * Return and remove the entry associated with the lowest key. 94 * Return and remove the entry associated with the lowest key.
92 * 95 *
93 - * @return the entry or null 96 + * @return the entry or null if none exist
94 */ 97 */
95 CompletableFuture<Map.Entry<K, Versioned<V>>> pollFirstEntry(); 98 CompletableFuture<Map.Entry<K, Versioned<V>>> pollFirstEntry();
96 99
97 /** 100 /**
98 * Return and remove the entry associated with the highest key. 101 * Return and remove the entry associated with the highest key.
99 * 102 *
100 - * @return the entry or null 103 + * @return the entry or null if none exist
101 */ 104 */
102 CompletableFuture<Map.Entry<K, Versioned<V>>> pollLastEntry(); 105 CompletableFuture<Map.Entry<K, Versioned<V>>> pollLastEntry();
103 106
...@@ -105,15 +108,15 @@ public interface AsyncConsistentTreeMap<K, V> extends AsyncConsistentMap<K, V> { ...@@ -105,15 +108,15 @@ public interface AsyncConsistentTreeMap<K, V> extends AsyncConsistentMap<K, V> {
105 * Return the entry associated with the greatest key less than key. 108 * Return the entry associated with the greatest key less than key.
106 * 109 *
107 * @param key the key 110 * @param key the key
108 - * @return the entry or null 111 + * @return the entry or null if no suitable key exists
109 */ 112 */
110 CompletableFuture<K> lowerKey(K key); 113 CompletableFuture<K> lowerKey(K key);
111 114
112 /** 115 /**
113 - * Return the entry associated with the highest key less than or equal to key. 116 + * Return the highest key less than or equal to key.
114 * 117 *
115 * @param key the key 118 * @param key the key
116 - * @return the entry or null 119 + * @return the entry or null if no suitable key exists
117 */ 120 */
118 CompletableFuture<K> floorKey(K key); 121 CompletableFuture<K> floorKey(K key);
119 122
...@@ -121,7 +124,7 @@ public interface AsyncConsistentTreeMap<K, V> extends AsyncConsistentMap<K, V> { ...@@ -121,7 +124,7 @@ public interface AsyncConsistentTreeMap<K, V> extends AsyncConsistentMap<K, V> {
121 * Return the lowest key greater than or equal to key. 124 * Return the lowest key greater than or equal to key.
122 * 125 *
123 * @param key the key 126 * @param key the key
124 - * @return the key or null 127 + * @return the entry or null if no suitable key exists
125 */ 128 */
126 CompletableFuture<K> ceilingKey(K key); 129 CompletableFuture<K> ceilingKey(K key);
127 130
...@@ -129,17 +132,35 @@ public interface AsyncConsistentTreeMap<K, V> extends AsyncConsistentMap<K, V> { ...@@ -129,17 +132,35 @@ public interface AsyncConsistentTreeMap<K, V> extends AsyncConsistentMap<K, V> {
129 * Return the lowest key greater than key. 132 * Return the lowest key greater than key.
130 * 133 *
131 * @param key the key 134 * @param key the key
132 - * @return the key or null 135 + * @return the entry or null if no suitable key exists
133 */ 136 */
134 CompletableFuture<K> higherKey(K key); 137 CompletableFuture<K> higherKey(K key);
135 138
136 /** 139 /**
137 * Returns a navigable set of the keys in this map. 140 * Returns a navigable set of the keys in this map.
138 * 141 *
139 - * @return a navigable key set 142 + * @return a navigable key set (this may be empty)
140 */ 143 */
141 CompletableFuture<NavigableSet<K>> navigableKeySet(); 144 CompletableFuture<NavigableSet<K>> navigableKeySet();
142 145
146 + /**
147 + * Returns a navigable map containing the entries from the original map
148 + * which are larger than (or if specified equal to) {@code lowerKey} AND
149 + * less than (or if specified equal to) {@code upperKey}.
150 + *
151 + * @param upperKey the upper bound for the keys in this map
152 + * @param lowerKey the lower bound for the keys in this map
153 + * @param inclusiveUpper whether keys equal to the upperKey should be
154 + * included
155 + * @param inclusiveLower whether keys equal to the lowerKey should be
156 + * included
157 + * @return a navigable map containing entries in the specified range (this
158 + * may be empty)
159 + */
160 + CompletableFuture<NavigableMap<K, V>> subMap(K upperKey, K lowerKey,
161 + boolean inclusiveUpper,
162 + boolean inclusiveLower);
163 +
143 default ConsistentTreeMap<K, V> asTreeMap() { 164 default ConsistentTreeMap<K, V> asTreeMap() {
144 return asTreeMap(DistributedPrimitive.DEFAULT_OPERTATION_TIMEOUT_MILLIS); 165 return asTreeMap(DistributedPrimitive.DEFAULT_OPERTATION_TIMEOUT_MILLIS);
145 } 166 }
......
...@@ -266,7 +266,7 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap> ...@@ -266,7 +266,7 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap>
266 public synchronized CompletableFuture<Void> addListener(MapEventListener<String, byte[]> listener, 266 public synchronized CompletableFuture<Void> addListener(MapEventListener<String, byte[]> listener,
267 Executor executor) { 267 Executor executor) {
268 if (mapEventListeners.isEmpty()) { 268 if (mapEventListeners.isEmpty()) {
269 - return client.submit(new Listen()).thenRun(() -> mapEventListeners.putIfAbsent(listener, executor)); 269 + return client.submit(new Listen()).thenRun(() -> mapEventListeners.put(listener, executor));
270 } else { 270 } else {
271 mapEventListeners.put(listener, executor); 271 mapEventListeners.put(listener, executor);
272 return CompletableFuture.completedFuture(null); 272 return CompletableFuture.completedFuture(null);
......
1 +/*
2 + * Copyright 2016 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +
17 +package org.onosproject.store.primitives.resources.impl;
18 +
19 +import com.google.common.collect.Maps;
20 +import io.atomix.copycat.client.CopycatClient;
21 +import io.atomix.resource.AbstractResource;
22 +import io.atomix.resource.ResourceTypeInfo;
23 +import org.onlab.util.Match;
24 +import org.onosproject.store.primitives.TransactionId;
25 +import org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.FirstKey;
26 +import org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.FloorEntry;
27 +import org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.HigherEntry;
28 +import org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.LastEntry;
29 +import org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.LowerEntry;
30 +import org.onosproject.store.service.AsyncConsistentTreeMap;
31 +import org.onosproject.store.service.MapEvent;
32 +import org.onosproject.store.service.MapEventListener;
33 +import org.onosproject.store.service.MapTransaction;
34 +import org.onosproject.store.service.Versioned;
35 +
36 +import java.util.Collection;
37 +import java.util.ConcurrentModificationException;
38 +import java.util.List;
39 +import java.util.Map;
40 +import java.util.NavigableMap;
41 +import java.util.NavigableSet;
42 +import java.util.Properties;
43 +import java.util.Set;
44 +import java.util.concurrent.CompletableFuture;
45 +import java.util.concurrent.Executor;
46 +import java.util.concurrent.atomic.AtomicReference;
47 +import java.util.function.BiFunction;
48 +import java.util.function.Predicate;
49 +
50 +import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.CeilingEntry;
51 +import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.CeilingKey;
52 +import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.Clear;
53 +import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.ContainsKey;
54 +import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.ContainsValue;
55 +import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.EntrySet;
56 +import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.FirstEntry;
57 +import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.FloorKey;
58 +import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.Get;
59 +import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.HigherKey;
60 +import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.IsEmpty;
61 +import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.KeySet;
62 +import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.LastKey;
63 +import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.Listen;
64 +import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.LowerKey;
65 +import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.PollFirstEntry;
66 +import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.PollLastEntry;
67 +import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.Size;
68 +import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.Unlisten;
69 +import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.UpdateAndGet;
70 +import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.Values;
71 +
72 +/**
73 + * Implementation of {@link AsyncConsistentTreeMap}.
74 + */
75 +@ResourceTypeInfo(id = -155, factory = AtomixConsistentTreeMapFactory.class)
76 +public class AtomixConsistentTreeMap extends AbstractResource<AtomixConsistentTreeMap>
77 + implements AsyncConsistentTreeMap<String, byte[]> {
78 +
79 + private final Map<MapEventListener<String, byte[]>, Executor>
80 + mapEventListeners = Maps.newConcurrentMap();
81 +
82 + public static final String CHANGE_SUBJECT = "changeEvents";
83 +
84 + public AtomixConsistentTreeMap(CopycatClient client, Properties options) {
85 + super(client, options);
86 + }
87 +
88 + @Override
89 + public String name() {
90 + return null;
91 + }
92 +
93 + @Override
94 + public CompletableFuture<AtomixConsistentTreeMap> open() {
95 + return super.open().thenApply(result -> {
96 + client.onEvent(CHANGE_SUBJECT, this::handleEvent);
97 + return result;
98 + });
99 + }
100 +
101 + private void handleEvent(List<MapEvent<String, byte[]>> events) {
102 + events.forEach(event -> mapEventListeners.
103 + forEach((listener, executor) ->
104 + executor.execute(() ->
105 + listener.event(event))));
106 + }
107 +
108 + @Override
109 + public CompletableFuture<Boolean> isEmpty() {
110 + return client.submit(new IsEmpty());
111 + }
112 +
113 + @Override
114 + public CompletableFuture<Integer> size() {
115 + return client.submit(new Size());
116 + }
117 +
118 + @Override
119 + public CompletableFuture<Boolean> containsKey(String key) {
120 + return client.submit(new ContainsKey(key));
121 + }
122 +
123 + @Override
124 + public CompletableFuture<Boolean> containsValue(byte[] value) {
125 + return client.submit(new ContainsValue(value));
126 + }
127 +
128 + @Override
129 + public CompletableFuture<Versioned<byte[]>> get(String key) {
130 + return client.submit(new Get(key));
131 + }
132 +
133 + @Override
134 + public CompletableFuture<Set<String>> keySet() {
135 + return client.submit(new KeySet());
136 + }
137 +
138 + @Override
139 + public CompletableFuture<Collection<Versioned<byte[]>>> values() {
140 + return client.submit(new Values());
141 + }
142 +
143 + @Override
144 + public CompletableFuture<Set<Map.Entry<String, Versioned<byte[]>>>> entrySet() {
145 + return client.submit(new EntrySet());
146 + }
147 +
148 + @Override
149 + @SuppressWarnings("unchecked")
150 + public CompletableFuture<Versioned<byte[]>> put(String key, byte[] value) {
151 + return client.submit(new UpdateAndGet(key, value, Match.ANY, Match.ANY))
152 + .whenComplete((r, e) -> throwIfLocked(r.status()))
153 + .thenApply(v -> v.oldValue());
154 + }
155 +
156 + @Override
157 + @SuppressWarnings("unchecked")
158 + public CompletableFuture<Versioned<byte[]>> putAndGet(String key, byte[] value) {
159 + return client.submit(new UpdateAndGet(key, value, Match.ANY, Match.ANY))
160 + .whenComplete((r, e) -> throwIfLocked(r.status()))
161 + .thenApply(v -> v.newValue());
162 + }
163 +
164 + @Override
165 + @SuppressWarnings("unchecked")
166 + public CompletableFuture<Versioned<byte[]>> putIfAbsent(String key, byte[] value) {
167 + return client.submit(new UpdateAndGet(key, value, Match.NULL, Match.ANY))
168 + .whenComplete((r, e) -> throwIfLocked(r.status()))
169 + .thenApply(v -> v.oldValue());
170 + }
171 +
172 + @Override
173 + @SuppressWarnings("unchecked")
174 + public CompletableFuture<Versioned<byte[]>> remove(String key) {
175 + return client.submit(new UpdateAndGet(key, null, Match.ANY, Match.ANY))
176 + .whenComplete((r, e) -> throwIfLocked(r.status()))
177 + .thenApply(v -> v.oldValue());
178 + }
179 +
180 + @Override
181 + @SuppressWarnings("unchecked")
182 + public CompletableFuture<Boolean> remove(String key, byte[] value) {
183 + return client.submit(new UpdateAndGet(key, null, Match.ifValue(value), Match.ANY))
184 + .whenComplete((r, e) -> throwIfLocked(r.status()))
185 + .thenApply(v -> v.updated());
186 + }
187 +
188 + @Override
189 + @SuppressWarnings("unchecked")
190 + public CompletableFuture<Boolean> remove(String key, long version) {
191 + return client.submit(new UpdateAndGet(key, null, Match.ANY, Match.ifValue(version)))
192 + .whenComplete((r, e) -> throwIfLocked(r.status()))
193 + .thenApply(v -> v.updated());
194 + }
195 +
196 + @Override
197 + @SuppressWarnings("unchecked")
198 + public CompletableFuture<Versioned<byte[]>> replace(String key, byte[] value) {
199 + return client.submit(new UpdateAndGet(key, value, Match.NOT_NULL, Match.ANY))
200 + .whenComplete((r, e) -> throwIfLocked(r.status()))
201 + .thenApply(v -> v.oldValue());
202 + }
203 +
204 + @Override
205 + @SuppressWarnings("unchecked")
206 + public CompletableFuture<Boolean> replace(String key, byte[] oldValue, byte[] newValue) {
207 + return client.submit(new UpdateAndGet(key, newValue, Match.ifValue(oldValue), Match.ANY))
208 + .whenComplete((r, e) -> throwIfLocked(r.status()))
209 + .thenApply(v -> v.updated());
210 + }
211 +
212 + @Override
213 + @SuppressWarnings("unchecked")
214 + public CompletableFuture<Boolean> replace(String key, long oldVersion, byte[] newValue) {
215 + return client.submit(new UpdateAndGet(key, newValue, Match.ANY, Match.ifValue(oldVersion)))
216 + .whenComplete((r, e) -> throwIfLocked(r.status()))
217 + .thenApply(v -> v.updated());
218 + }
219 +
220 + @Override
221 + public CompletableFuture<Void> clear() {
222 + return client.submit(new Clear())
223 + .whenComplete((r, e) -> throwIfLocked(r))
224 + .thenApply(v -> null);
225 + }
226 +
227 + @Override
228 + @SuppressWarnings("unchecked")
229 + public CompletableFuture<Versioned<byte[]>> computeIf(String key,
230 + Predicate<? super byte[]> condition,
231 + BiFunction<? super String,
232 + ? super byte[],
233 + ? extends byte[]> remappingFunction) {
234 + return get(key).thenCompose(r1 -> {
235 + byte[] existingValue = r1 == null ? null : r1.value();
236 +
237 + if (!condition.test(existingValue)) {
238 + return CompletableFuture.completedFuture(r1);
239 + }
240 +
241 + AtomicReference<byte[]> computedValue = new AtomicReference<byte[]>();
242 + try {
243 + computedValue.set(remappingFunction.apply(key, existingValue));
244 + } catch (Exception e) {
245 + CompletableFuture<Versioned<byte[]>> future = new CompletableFuture<>();
246 + future.completeExceptionally(e);
247 + return future;
248 + }
249 + if (computedValue.get() == null && r1 == null) {
250 + return CompletableFuture.completedFuture(null);
251 + }
252 + Match<byte[]> valueMatch = r1 == null ? Match.NULL : Match.ANY;
253 + Match<Long> versionMatch = r1 == null ? Match.ANY : Match.ifValue(r1.version());
254 + return client.submit(new UpdateAndGet(key, computedValue.get(),
255 + valueMatch, versionMatch))
256 + .whenComplete((r, e) -> throwIfLocked(r.status()))
257 + .thenApply(v -> v.newValue());
258 + });
259 + }
260 +
261 + @Override
262 + public CompletableFuture<Void> addListener(
263 + MapEventListener<String, byte[]> listener, Executor executor) {
264 + if (mapEventListeners.isEmpty()) {
265 + return client.submit(new Listen()).thenRun(() ->
266 + mapEventListeners.put(listener,
267 + executor));
268 + } else {
269 + mapEventListeners.put(listener, executor);
270 + return CompletableFuture.completedFuture(null);
271 + }
272 + }
273 +
274 + @Override
275 + public synchronized CompletableFuture<Void> removeListener(MapEventListener<String, byte[]> listener) {
276 + if (mapEventListeners.remove(listener) != null &&
277 + mapEventListeners.isEmpty()) {
278 + return client.submit(new Unlisten())
279 + .thenApply(v -> null);
280 + }
281 + return CompletableFuture.completedFuture(null);
282 + }
283 +
284 +
285 + private void throwIfLocked(MapEntryUpdateResult.Status status) {
286 + if (status == MapEntryUpdateResult.Status.WRITE_LOCK) {
287 + throw new ConcurrentModificationException("Cannot update TreeMap: another update is in progress.");
288 + }
289 + }
290 +
291 + @Override
292 + public CompletableFuture<String> firstKey() {
293 + return client.submit(new FirstKey<String>());
294 + }
295 +
296 + @Override
297 + public CompletableFuture<String> lastKey() {
298 + return client.submit(new LastKey<String>());
299 + }
300 +
301 + @Override
302 + public CompletableFuture<Map.Entry<String, Versioned<byte[]>>> ceilingEntry(String key) {
303 + return client.submit(new CeilingEntry(key));
304 + }
305 +
306 + @Override
307 + public CompletableFuture<Map.Entry<String, Versioned<byte[]>>> floorEntry(String key) {
308 + return client.submit(new FloorEntry<String, Versioned<byte[]>>(key));
309 + }
310 +
311 + @Override
312 + public CompletableFuture<Map.Entry<String, Versioned<byte[]>>> higherEntry(
313 + String key) {
314 + return client.submit(new HigherEntry<String, Versioned<byte[]>>(key));
315 + }
316 +
317 + @Override
318 + public CompletableFuture<Map.Entry<String, Versioned<byte[]>>> lowerEntry(
319 + String key) {
320 + return client.submit(new LowerEntry<>(key));
321 + }
322 +
323 + @Override
324 + public CompletableFuture<Map.Entry<String, Versioned<byte[]>>> firstEntry() {
325 + return client.submit(new FirstEntry());
326 + }
327 +
328 + @Override
329 + public CompletableFuture<Map.Entry<String, Versioned<byte[]>>> lastEntry() {
330 + return client.submit(new LastEntry<String, Versioned<byte[]>>());
331 + }
332 +
333 + @Override
334 + public CompletableFuture<Map.Entry<String, Versioned<byte[]>>> pollFirstEntry() {
335 + return client.submit(new PollFirstEntry());
336 + }
337 +
338 + @Override
339 + public CompletableFuture<Map.Entry<String, Versioned<byte[]>>> pollLastEntry() {
340 + return client.submit(new PollLastEntry());
341 + }
342 +
343 + @Override
344 + public CompletableFuture<String> lowerKey(String key) {
345 + return client.submit(new LowerKey(key));
346 + }
347 +
348 + @Override
349 + public CompletableFuture<String> floorKey(String key) {
350 + return client.submit(new FloorKey(key));
351 + }
352 +
353 + @Override
354 + public CompletableFuture<String> ceilingKey(String key) {
355 + return client.submit(new CeilingKey(key));
356 + }
357 +
358 + @Override
359 + public CompletableFuture<String> higherKey(String key) {
360 + return client.submit(new HigherKey(key));
361 + }
362 +
363 + @Override
364 + public CompletableFuture<NavigableSet<String>> navigableKeySet() {
365 + throw new UnsupportedOperationException("This operation is not yet " +
366 + "supported.");
367 + }
368 +
369 + @Override
370 + public CompletableFuture<NavigableMap<String, byte[]>> subMap(
371 + String upperKey, String lowerKey, boolean inclusiveUpper,
372 + boolean inclusiveLower) {
373 + throw new UnsupportedOperationException("This operation is not yet " +
374 + "supported."); }
375 +
376 + @Override
377 + public CompletableFuture<Boolean> prepareAndCommit(MapTransaction<String,
378 + byte[]> transaction) {
379 + throw new UnsupportedOperationException("This operation is not yet " +
380 + "supported.");
381 + }
382 +
383 + @Override
384 + public CompletableFuture<Boolean> prepare(MapTransaction<String, byte[]>
385 + transaction) {
386 + throw new UnsupportedOperationException("This operation is not yet " +
387 + "supported.");
388 + }
389 +
390 + @Override
391 + public CompletableFuture<Void> commit(TransactionId transactionId) {
392 + throw new UnsupportedOperationException("This operation is not yet " +
393 + "supported.");
394 + }
395 +
396 + @Override
397 + public CompletableFuture<Void> rollback(TransactionId transactionId) {
398 + throw new UnsupportedOperationException("This operation is not yet " +
399 + "supported.");
400 + }
401 +}
1 +/*
2 + * Copyright 2016 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +
17 +package org.onosproject.store.primitives.resources.impl;
18 +
19 +import io.atomix.catalyst.serializer.SerializableTypeResolver;
20 +import io.atomix.copycat.client.CopycatClient;
21 +import io.atomix.resource.ResourceFactory;
22 +import io.atomix.resource.ResourceStateMachine;
23 +
24 +import java.util.Properties;
25 +
26 +/**
27 + * Factory for {@link AtomixConsistentTreeMap}.
28 + */
29 +public class AtomixConsistentTreeMapFactory implements ResourceFactory<AtomixConsistentTreeMap> {
30 + @Override
31 + public SerializableTypeResolver createSerializableTypeResolver() {
32 + return new AtomixConsistentTreeMapCommands.TypeResolver();
33 + }
34 +
35 + @Override
36 + public ResourceStateMachine createStateMachine(Properties config) {
37 + return new AtomixConsistentTreeMapState(config);
38 + }
39 +
40 + @Override
41 + public AtomixConsistentTreeMap createInstance(CopycatClient client, Properties options) {
42 + return new AtomixConsistentTreeMap(client, options);
43 + }
44 +}
1 +/*
2 + * Copyright 2016 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +
17 +package org.onosproject.store.primitives.resources.impl;
18 +
19 +import com.google.common.collect.Lists;
20 +import com.google.common.collect.Maps;
21 +import com.google.common.collect.Sets;
22 +import io.atomix.copycat.server.Commit;
23 +import io.atomix.copycat.server.Snapshottable;
24 +import io.atomix.copycat.server.StateMachineExecutor;
25 +import io.atomix.copycat.server.session.ServerSession;
26 +import io.atomix.copycat.server.session.SessionListener;
27 +import io.atomix.copycat.server.storage.snapshot.SnapshotReader;
28 +import io.atomix.copycat.server.storage.snapshot.SnapshotWriter;
29 +import io.atomix.resource.ResourceStateMachine;
30 +import org.onlab.util.Match;
31 +import org.onosproject.store.service.MapEvent;
32 +import org.onosproject.store.service.Versioned;
33 +
34 +import java.util.AbstractMap.SimpleImmutableEntry;
35 +import java.util.Collection;
36 +import java.util.Iterator;
37 +import java.util.List;
38 +import java.util.Map;
39 +import java.util.NavigableMap;
40 +import java.util.Properties;
41 +import java.util.Set;
42 +import java.util.TreeMap;
43 +import java.util.concurrent.atomic.AtomicLong;
44 +import java.util.function.Function;
45 +import java.util.stream.Collectors;
46 +
47 +import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.CeilingEntry;
48 +import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.CeilingKey;
49 +import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.Clear;
50 +import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.ContainsKey;
51 +import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.ContainsValue;
52 +import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.EntrySet;
53 +import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.FirstEntry;
54 +import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.FirstKey;
55 +import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.FloorEntry;
56 +import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.FloorKey;
57 +import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.Get;
58 +import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.HigherEntry;
59 +import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.HigherKey;
60 +import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.IsEmpty;
61 +import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.KeySet;
62 +import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.LastEntry;
63 +import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.LastKey;
64 +import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.Listen;
65 +import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.LowerEntry;
66 +import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.LowerKey;
67 +import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.PollFirstEntry;
68 +import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.PollLastEntry;
69 +import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.Size;
70 +import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.SubMap;
71 +import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.Unlisten;
72 +import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.UpdateAndGet;
73 +import static org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.Values;
74 +import static org.onosproject.store.primitives.resources.impl.MapEntryUpdateResult.*;
75 +
76 +/**
77 + * State machine corresponding to {@link AtomixConsistentTreeMap} backed by a
78 + * {@link TreeMap}.
79 + */
80 +public class AtomixConsistentTreeMapState extends ResourceStateMachine implements SessionListener, Snapshottable {
81 +
82 + private final Map<Long, Commit<? extends Listen>> listeners =
83 + Maps.newHashMap();
84 + private TreeMap<String, TreeMapEntryValue> tree = Maps.newTreeMap();
85 + private final Set<String> preparedKeys = Sets.newHashSet();
86 + private AtomicLong versionCounter = new AtomicLong(0);
87 +
88 + private Function<Commit<SubMap>, NavigableMap<String, TreeMapEntryValue>> subMapFunction = this::subMap;
89 + private Function<Commit<FirstKey>, String> firstKeyFunction = this::firstKey;
90 + private Function<Commit<LastKey>, String> lastKeyFunction = this::lastKey;
91 + private Function<Commit<HigherEntry>, Map.Entry<String, Versioned<byte[]>>> higherEntryFunction =
92 + this::higherEntry;
93 + private Function<Commit<FirstEntry>, Map.Entry<String, Versioned<byte[]>>> firstEntryFunction =
94 + this::firstEntry;
95 + private Function<Commit<LastEntry>, Map.Entry<String, Versioned<byte[]>>> lastEntryFunction =
96 + this::lastEntry;
97 + private Function<Commit<PollFirstEntry>, Map.Entry<String, Versioned<byte[]>>> pollFirstEntryFunction =
98 + this::pollFirstEntry;
99 + private Function<Commit<PollLastEntry>, Map.Entry<String, Versioned<byte[]>>> pollLastEntryFunction =
100 + this::pollLastEntry;
101 + private Function<Commit<LowerEntry>, Map.Entry<String, Versioned<byte[]>>> lowerEntryFunction =
102 + this::lowerEntry;
103 + private Function<Commit<LowerKey>, String> lowerKeyFunction = this::lowerKey;
104 + private Function<Commit<FloorEntry>, Map.Entry<String, Versioned<byte[]>>> floorEntryFunction =
105 + this::floorEntry;
106 + private Function<Commit<CeilingEntry>, Map.Entry<String, Versioned<byte[]>>> ceilingEntryFunction =
107 + this::ceilingEntry;
108 + private Function<Commit<FloorKey>, String> floorKeyFunction = this::floorKey;
109 + private Function<Commit<CeilingKey>, String> ceilingKeyFunction = this::ceilingKey;
110 + private Function<Commit<HigherKey>, String> higherKeyFunction = this::higherKey;
111 +
112 + public AtomixConsistentTreeMapState(Properties properties) {
113 + super(properties);
114 + }
115 +
116 + @Override
117 + public void snapshot(SnapshotWriter writer) {
118 + writer.writeLong(versionCounter.get());
119 + }
120 +
121 + @Override
122 + public void install(SnapshotReader reader) {
123 + versionCounter = new AtomicLong(reader.readLong());
124 + }
125 +
126 + @Override
127 + public void configure(StateMachineExecutor executor) {
128 + // Listeners
129 + executor.register(Listen.class, this::listen);
130 + executor.register(Unlisten.class, this::unlisten);
131 + // Queries
132 + executor.register(ContainsKey.class, this::containsKey);
133 + executor.register(ContainsValue.class, this::containsValue);
134 + executor.register(EntrySet.class, this::entrySet);
135 + executor.register(Get.class, this::get);
136 + executor.register(IsEmpty.class, this::isEmpty);
137 + executor.register(KeySet.class, this::keySet);
138 + executor.register(Size.class, this::size);
139 + executor.register(Values.class, this::values);
140 + executor.register(SubMap.class, subMapFunction);
141 + executor.register(FirstKey.class, firstKeyFunction);
142 + executor.register(LastKey.class, lastKeyFunction);
143 + executor.register(FirstEntry.class, firstEntryFunction);
144 + executor.register(LastEntry.class, lastEntryFunction);
145 + executor.register(PollFirstEntry.class, pollFirstEntryFunction);
146 + executor.register(PollLastEntry.class, pollLastEntryFunction);
147 + executor.register(LowerEntry.class, lowerEntryFunction);
148 + executor.register(LowerKey.class, lowerKeyFunction);
149 + executor.register(FloorEntry.class, floorEntryFunction);
150 + executor.register(FloorKey.class, floorKeyFunction);
151 + executor.register(CeilingEntry.class, ceilingEntryFunction);
152 + executor.register(CeilingKey.class, ceilingKeyFunction);
153 + executor.register(HigherEntry.class, higherEntryFunction);
154 + executor.register(HigherKey.class, higherKeyFunction);
155 +
156 + // Commands
157 + executor.register(UpdateAndGet.class, this::updateAndGet);
158 + executor.register(Clear.class, this::clear);
159 + }
160 +
161 + @Override
162 + public void delete() {
163 + listeners.values().forEach(Commit::close);
164 + listeners.clear();
165 + tree.values().forEach(TreeMapEntryValue::discard);
166 + tree.clear();
167 + }
168 +
169 + protected boolean containsKey(Commit<? extends ContainsKey> commit) {
170 + try {
171 + return toVersioned(tree.get((commit.operation().key()))) != null;
172 + } finally {
173 + commit.close();
174 + }
175 + }
176 +
177 + protected boolean containsValue(Commit<? extends ContainsValue> commit) {
178 + try {
179 + Match<byte[]> valueMatch = Match
180 + .ifValue(commit.operation().value());
181 + return tree.values().stream().anyMatch(
182 + value -> valueMatch.matches(value.value()));
183 + } finally {
184 + commit.close();
185 + }
186 + }
187 +
188 + protected Versioned<byte[]> get(Commit<? extends Get> commit) {
189 + try {
190 + return toVersioned(tree.get(commit.operation().key()));
191 + } finally {
192 + commit.close();
193 + }
194 + }
195 +
196 + protected int size(Commit<? extends Size> commit) {
197 + try {
198 + return tree.size();
199 + } finally {
200 + commit.close();
201 + }
202 + }
203 +
204 + protected boolean isEmpty(Commit<? extends IsEmpty> commit) {
205 + try {
206 + return tree.isEmpty();
207 + } finally {
208 + commit.close();
209 + }
210 + }
211 +
212 + protected Set<String> keySet(Commit<? extends KeySet> commit) {
213 + try {
214 + return tree.keySet().stream().collect(Collectors.toSet());
215 + } finally {
216 + commit.close();
217 + }
218 + }
219 +
220 + protected Collection<Versioned<byte[]>> values(
221 + Commit<? extends Values> commit) {
222 + try {
223 + return tree.values().stream().map(this::toVersioned)
224 + .collect(Collectors.toList());
225 + } finally {
226 + commit.close();
227 + }
228 + }
229 +
230 + protected Set<Map.Entry<String, Versioned<byte[]>>> entrySet(
231 + Commit<? extends EntrySet> commit) {
232 + try {
233 + return tree
234 + .entrySet()
235 + .stream()
236 + .map(e -> Maps.immutableEntry(e.getKey(),
237 + toVersioned(e.getValue())))
238 + .collect(Collectors.toSet());
239 + } finally {
240 + commit.close();
241 + }
242 + }
243 +
244 + protected MapEntryUpdateResult<String, byte[]> updateAndGet(
245 + Commit<? extends UpdateAndGet> commit) {
246 + Status updateStatus = validate(commit.operation());
247 + String key = commit.operation().key();
248 + TreeMapEntryValue oldCommitValue = tree.get(commit.operation().key());
249 + Versioned<byte[]> oldTreeValue = toVersioned(oldCommitValue);
250 +
251 + if (updateStatus != Status.OK) {
252 + commit.close();
253 + return new MapEntryUpdateResult<>(updateStatus, "", key,
254 + oldTreeValue, oldTreeValue);
255 + }
256 +
257 + byte[] newValue = commit.operation().value();
258 + long newVersion = versionCounter.incrementAndGet();
259 + Versioned<byte[]> newTreeValue = newValue == null ? null
260 + : new Versioned<byte[]>(newValue, newVersion);
261 +
262 + MapEvent.Type updateType = newValue == null ? MapEvent.Type.REMOVE
263 + : oldCommitValue == null ? MapEvent.Type.INSERT :
264 + MapEvent.Type.UPDATE;
265 + if (updateType == MapEvent.Type.REMOVE ||
266 + updateType == MapEvent.Type.UPDATE) {
267 + tree.remove(key);
268 + oldCommitValue.discard();
269 + }
270 + if (updateType == MapEvent.Type.INSERT ||
271 + updateType == MapEvent.Type.UPDATE) {
272 + tree.put(key, new NonTransactionalCommit(newVersion, commit));
273 + } else {
274 + commit.close();
275 + }
276 + publish(Lists.newArrayList(new MapEvent<>("", key, newTreeValue,
277 + oldTreeValue)));
278 + return new MapEntryUpdateResult<>(updateStatus, "", key, oldTreeValue,
279 + newTreeValue);
280 + }
281 +
282 + protected Status clear(
283 + Commit<? extends Clear> commit) {
284 + try {
285 + Iterator<Map.Entry<String, TreeMapEntryValue>> iterator = tree
286 + .entrySet()
287 + .iterator();
288 + while (iterator.hasNext()) {
289 + Map.Entry<String, TreeMapEntryValue> entry = iterator.next();
290 + String key = entry.getKey();
291 + TreeMapEntryValue value = entry.getValue();
292 + Versioned<byte[]> removedValue =
293 + new Versioned<byte[]>(value.value(),
294 + value.version());
295 + publish(Lists.newArrayList(new MapEvent<>("", key, null,
296 + removedValue)));
297 + value.discard();
298 + iterator.remove();
299 + }
300 + return Status.OK;
301 + } finally {
302 + commit.close();
303 + }
304 + }
305 +
306 + protected void listen(
307 + Commit<? extends Listen> commit) {
308 + Long sessionId = commit.session().id();
309 + listeners.put(sessionId, commit);
310 + commit.session()
311 + .onStateChange(
312 + state -> {
313 + if (state == ServerSession.State.CLOSED
314 + || state == ServerSession.State.EXPIRED) {
315 + Commit<? extends Listen> listener =
316 + listeners.remove(sessionId);
317 + if (listener != null) {
318 + listener.close();
319 + }
320 + }
321 + });
322 + }
323 +
324 + protected void unlisten(
325 + Commit<? extends Unlisten> commit) {
326 + try {
327 + Commit<? extends AtomixConsistentTreeMapCommands.Listen> listener =
328 + listeners.remove(commit.session());
329 + if (listener != null) {
330 + listener.close();
331 + }
332 + } finally {
333 + commit.close();
334 + }
335 + }
336 +
337 + private Status validate(UpdateAndGet update) {
338 + TreeMapEntryValue existingValue = tree.get(update.key());
339 + if (existingValue == null && update.value() == null) {
340 + return Status.NOOP;
341 + }
342 + if (preparedKeys.contains(update.key())) {
343 + return Status.WRITE_LOCK;
344 + }
345 + byte[] existingRawValue = existingValue == null ? null :
346 + existingValue.value();
347 + Long existingVersion = existingValue == null ? null :
348 + existingValue.version();
349 + return update.valueMatch().matches(existingRawValue)
350 + && update.versionMatch().matches(existingVersion) ?
351 + Status.OK
352 + : Status.PRECONDITION_FAILED;
353 + }
354 +
355 + protected NavigableMap<String, TreeMapEntryValue> subMap(
356 + Commit<? extends SubMap> commit) {
357 + //Do not support this until lazy communication is possible. At present
358 + // it transmits up to the entire map.
359 + try {
360 + SubMap<String, TreeMapEntryValue> subMap = commit.operation();
361 + return tree.subMap(subMap.fromKey(), subMap.isInclusiveFrom(),
362 + subMap.toKey(), subMap.isInclusiveTo());
363 + } finally {
364 + commit.close();
365 + }
366 + }
367 +
368 + protected String firstKey(Commit<? extends FirstKey> commit) {
369 + try {
370 + if (tree.isEmpty()) {
371 + return null;
372 + }
373 + return tree.firstKey();
374 + } finally {
375 + commit.close();
376 + }
377 + }
378 +
379 + protected String lastKey(Commit<? extends LastKey> commit) {
380 + try {
381 + return tree.isEmpty() ? null : tree.lastKey();
382 + } finally {
383 + commit.close();
384 + }
385 + }
386 +
387 + protected Map.Entry<String, Versioned<byte[]>> higherEntry(
388 + Commit<? extends HigherEntry> commit) {
389 + try {
390 + if (tree.isEmpty()) {
391 + return null;
392 + }
393 + return toVersionedEntry(
394 + tree.higherEntry(commit.operation().key()));
395 + } finally {
396 + commit.close();
397 + }
398 + }
399 +
400 + protected Map.Entry<String, Versioned<byte[]>> firstEntry(
401 + Commit<? extends FirstEntry> commit) {
402 + try {
403 + if (tree.isEmpty()) {
404 + return null;
405 + }
406 + return toVersionedEntry(tree.firstEntry());
407 + } finally {
408 + commit.close();
409 + }
410 + }
411 +
412 + protected Map.Entry<String, Versioned<byte[]>> lastEntry(
413 + Commit<? extends LastEntry> commit) {
414 + try {
415 + if (tree.isEmpty()) {
416 + return null;
417 + }
418 + return toVersionedEntry(tree.lastEntry());
419 + } finally {
420 + commit.close();
421 + }
422 + }
423 +
424 + protected Map.Entry<String, Versioned<byte[]>> pollFirstEntry(
425 + Commit<? extends PollFirstEntry> commit) {
426 + try {
427 + return toVersionedEntry(tree.pollFirstEntry());
428 + } finally {
429 + commit.close();
430 + }
431 + }
432 +
433 + protected Map.Entry<String, Versioned<byte[]>> pollLastEntry(
434 + Commit<? extends PollLastEntry> commit) {
435 + try {
436 + return toVersionedEntry(tree.pollLastEntry());
437 + } finally {
438 + commit.close();
439 + }
440 + }
441 +
442 + protected Map.Entry<String, Versioned<byte[]>> lowerEntry(
443 + Commit<? extends LowerEntry> commit) {
444 + try {
445 + return toVersionedEntry(tree.lowerEntry(commit.operation().key()));
446 + } finally {
447 + commit.close();
448 + }
449 + }
450 +
451 + protected String lowerKey(Commit<? extends LowerKey> commit) {
452 + try {
453 + return tree.lowerKey(commit.operation().key());
454 + } finally {
455 + commit.close();
456 + }
457 + }
458 +
459 + protected Map.Entry<String, Versioned<byte[]>> floorEntry(
460 + Commit<? extends FloorEntry> commit) {
461 + try {
462 + return toVersionedEntry(tree.floorEntry(commit.operation().key()));
463 + } finally {
464 + commit.close();
465 + }
466 + }
467 +
468 + protected String floorKey(Commit<? extends FloorKey> commit) {
469 + try {
470 + return tree.floorKey(commit.operation().key());
471 + } finally {
472 + commit.close();
473 + }
474 + }
475 +
476 + protected Map.Entry<String, Versioned<byte[]>> ceilingEntry(
477 + Commit<CeilingEntry> commit) {
478 + try {
479 + return toVersionedEntry(
480 + tree.ceilingEntry(commit.operation().key()));
481 + } finally {
482 + commit.close();
483 + }
484 + }
485 +
486 + protected String ceilingKey(Commit<CeilingKey> commit) {
487 + try {
488 + return tree.ceilingKey(commit.operation().key());
489 + } finally {
490 + commit.close();
491 + }
492 + }
493 +
494 + protected String higherKey(Commit<HigherKey> commit) {
495 + try {
496 + return tree.higherKey(commit.operation().key());
497 + } finally {
498 + commit.close();
499 + }
500 + }
501 +
502 + private Versioned<byte[]> toVersioned(TreeMapEntryValue value) {
503 + return value == null ? null :
504 + new Versioned<byte[]>(value.value(), value.version());
505 + }
506 +
507 + private Map.Entry<String, Versioned<byte[]>> toVersionedEntry(
508 + Map.Entry<String, TreeMapEntryValue> entry) {
509 + //FIXME is this the best type of entry to return?
510 + return entry == null ? null : new SimpleImmutableEntry<>(
511 + entry.getKey(), toVersioned(entry.getValue()));
512 + }
513 +
514 + private void publish(List<MapEvent<String, byte[]>> events) {
515 + listeners.values().forEach(commit -> commit.session()
516 + .publish(AtomixConsistentTreeMap.CHANGE_SUBJECT, events));
517 + }
518 +
519 + @Override
520 + public void register(ServerSession session) {
521 + }
522 +
523 + @Override
524 + public void unregister(ServerSession session) {
525 + closeListener(session.id());
526 + }
527 +
528 + @Override
529 + public void expire(ServerSession session) {
530 + closeListener(session.id());
531 + }
532 +
533 + @Override
534 + public void close(ServerSession session) {
535 + closeListener(session.id());
536 + }
537 +
538 + private void closeListener(Long sessionId) {
539 + Commit<? extends Listen> commit = listeners.remove(sessionId);
540 + if (commit != null) {
541 + commit.close();
542 + }
543 + }
544 +
545 + private interface TreeMapEntryValue {
546 +
547 + byte[] value();
548 +
549 + long version();
550 +
551 + void discard();
552 + }
553 +
554 + private class NonTransactionalCommit implements TreeMapEntryValue {
555 + private final long version;
556 + private final Commit<? extends UpdateAndGet> commit;
557 +
558 + public NonTransactionalCommit(long version,
559 + Commit<? extends UpdateAndGet> commit) {
560 + this.version = version;
561 + this.commit = commit;
562 + }
563 +
564 + @Override
565 + public byte[] value() {
566 + return commit.operation().value();
567 + }
568 +
569 + @Override
570 + public long version() {
571 + return version;
572 + }
573 +
574 + @Override
575 + public void discard() {
576 + commit.close();
577 + }
578 + }
579 +}
...@@ -404,7 +404,7 @@ public class AtomixConsistentSetMultimapTest extends AtomixTestBase { ...@@ -404,7 +404,7 @@ public class AtomixConsistentSetMultimapTest extends AtomixTestBase {
404 private AtomixConsistentSetMultimap createResource(String mapName) { 404 private AtomixConsistentSetMultimap createResource(String mapName) {
405 try { 405 try {
406 AtomixConsistentSetMultimap map = createAtomixClient(). 406 AtomixConsistentSetMultimap map = createAtomixClient().
407 - getResource("mapName", AtomixConsistentSetMultimap.class) 407 + getResource(mapName, AtomixConsistentSetMultimap.class)
408 .join(); 408 .join();
409 return map; 409 return map;
410 } catch (Throwable e) { 410 } catch (Throwable e) {
......
1 +/*
2 + * Copyright 2016 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +package org.onosproject.store.primitives.resources.impl;
17 +
18 +import com.google.common.base.Throwables;
19 +import com.google.common.collect.Lists;
20 +import io.atomix.resource.ResourceType;
21 +import org.junit.AfterClass;
22 +import org.junit.BeforeClass;
23 +import org.junit.Test;
24 +import org.onlab.util.Tools;
25 +import org.onosproject.store.service.MapEvent;
26 +import org.onosproject.store.service.MapEventListener;
27 +
28 +import java.util.Arrays;
29 +import java.util.Collection;
30 +import java.util.List;
31 +import java.util.Map;
32 +import java.util.concurrent.ArrayBlockingQueue;
33 +import java.util.concurrent.BlockingQueue;
34 +import java.util.stream.Collectors;
35 +
36 +import static org.junit.Assert.assertArrayEquals;
37 +import static org.junit.Assert.assertEquals;
38 +import static org.junit.Assert.assertFalse;
39 +import static org.junit.Assert.assertNotNull;
40 +import static org.junit.Assert.assertNull;
41 +import static org.junit.Assert.assertTrue;
42 +
43 +/**
44 + * Unit tests for {@link AtomixConsistentTreeMap}.
45 + */
46 +public class AtomixConsistentTreeMapTest extends AtomixTestBase {
47 + private final String keyFour = "hello";
48 + private final String keyThree = "goodbye";
49 + private final String keyTwo = "foo";
50 + private final String keyOne = "bar";
51 + private final byte[] valueOne = Tools.getBytesUtf8(keyOne);
52 + private final byte[] valueTwo = Tools.getBytesUtf8(keyTwo);
53 + private final byte[] valueThree = Tools.getBytesUtf8(keyThree);
54 + private final byte[] valueFour = Tools.getBytesUtf8(keyFour);
55 + private final byte[] spareValue = Tools.getBytesUtf8("spareValue");
56 + private final List<String> allKeys = Lists.newArrayList(keyOne, keyTwo,
57 + keyThree, keyFour);
58 + private final List<byte[]> allValues = Lists.newArrayList(valueOne,
59 + valueTwo,
60 + valueThree,
61 + valueFour);
62 + @BeforeClass
63 + public static void preTestSetup() throws Throwable {
64 + createCopycatServers(3);
65 + }
66 +
67 + @AfterClass
68 + public static void postTestCleanup() throws Throwable {
69 + clearTests();
70 + }
71 +
72 + @Override
73 + protected ResourceType resourceType() {
74 + return new ResourceType(AtomixConsistentTreeMap.class);
75 + }
76 +
77 + /**
78 + * Tests of the functionality associated with the
79 + * {@link org.onosproject.store.service.AsyncConsistentMap} interface
80 + * except transactions and listeners.
81 + */
82 + @Test
83 + public void testBasicMapOperations() throws Throwable {
84 + //Throughout the test there are isEmpty queries, these are intended to
85 + //make sure that the previous section has been cleaned up, they serve
86 + //the secondary purpose of testing isEmpty but that is not their
87 + //primary purpose.
88 + AtomixConsistentTreeMap map = createResource("basicTestMap");
89 + //test size
90 + map.size().thenAccept(result -> assertEquals(0, (int) result)).join();
91 + map.isEmpty().thenAccept(result -> assertTrue(result)).join();
92 + //test contains key
93 + allKeys.forEach(key -> map.containsKey(key).
94 + thenAccept(result -> assertFalse(result)).join());
95 + //test contains value
96 + allValues.forEach(value -> map.containsValue(value)
97 + .thenAccept(result -> assertFalse(result)).join());
98 + //test get
99 + allKeys.forEach(key -> map.get(key).
100 + thenAccept(result -> assertNull(result)).join());
101 +
102 + //populate and redo prior three tests
103 + allKeys.forEach(key -> map.put(key, allValues
104 + .get(allKeys.indexOf(key))).thenAccept(
105 + result -> assertNull(result)).join());
106 + //test contains key
107 + allKeys.forEach(key -> map.containsKey(key).
108 + thenAccept(result -> assertTrue(result)).join());
109 + //test contains value
110 + allValues.forEach(value -> map.containsValue(value)
111 + .thenAccept(result -> assertTrue(result)).join());
112 + //test get
113 + allKeys.forEach(key -> map.get(key).
114 + thenAccept(
115 + result -> assertArrayEquals(
116 + allValues.get(allKeys.indexOf(key)),
117 + result.value())).join());
118 + //test all compute methods in this section
119 + allKeys.forEach(key -> map.computeIfAbsent(
120 + key, v ->allValues.get(allKeys.indexOf(key)
121 + )).thenAccept(result ->
122 + assertArrayEquals(
123 + allValues.get(allKeys.indexOf(key)),
124 + result.value())).join());
125 + map.size().thenAccept(result -> assertEquals(4, (int) result)).join();
126 + map.isEmpty().thenAccept(result -> assertFalse(result)).join();
127 + allKeys.forEach(key -> map.computeIfPresent(key, (k, v) -> null).
128 + thenAccept(result -> assertNull(result)).join());
129 + map.isEmpty().thenAccept(result -> assertTrue(result)).join();
130 + allKeys.forEach(key -> map.compute(key, (k, v) ->
131 + allValues.get(allKeys.indexOf(key))).
132 + thenAccept(result -> assertArrayEquals(
133 + allValues.get(allKeys.indexOf(key)),
134 + result.value())).join());
135 + map.size().thenAccept(result -> assertEquals(4, (int) result)).join();
136 + map.isEmpty().thenAccept(result -> assertFalse(result)).join();
137 + allKeys.forEach(key -> map.computeIf(key,
138 + (k) -> allKeys.indexOf(key) < 2,
139 + (k, v) -> null).thenAccept(result -> {
140 + if (allKeys.indexOf(key) < 2) {
141 + assertNull(result);
142 + } else {
143 + assertArrayEquals(allValues.get(allKeys.indexOf(key)),
144 + result.value());
145 + }
146 + }).join());
147 + map.size().thenAccept(result -> assertEquals(2, (int) result)).join();
148 + map.isEmpty().thenAccept(result -> assertFalse(result)).join();
149 + //test simple put
150 + allKeys.forEach(
151 + key -> map.put(key, allValues.get(allKeys.indexOf(key)))
152 + .thenAccept(result -> {
153 + if (allKeys.indexOf(key) < 2) {
154 + assertNull(result);
155 + } else {
156 + assertArrayEquals(
157 + allValues.get(allKeys.indexOf(key)),
158 + result.value());
159 + }
160 + }).join());
161 + map.size().thenAccept(result -> assertEquals(4, (int) result)).join();
162 + map.isEmpty().thenAccept(result -> assertFalse(result)).join();
163 + //test put and get for version retrieval
164 + allKeys.forEach(
165 + key -> map.putAndGet(key, allValues.get(allKeys.indexOf(key))).
166 + thenAccept(firstResult -> {
167 + map.putAndGet(key, allValues.get(allKeys.indexOf(key))).
168 + thenAccept(secondResult -> {
169 + assertArrayEquals(allValues.get(allKeys.indexOf(key)),
170 + firstResult.value());
171 + assertArrayEquals(allValues.get(allKeys.indexOf(key)),
172 + secondResult.value());
173 + assertTrue((firstResult.version() + 1) ==
174 + secondResult.version());
175 + });
176 + }).join());
177 + //test removal
178 + allKeys.forEach(key -> map.remove(key).thenAccept(
179 + result -> assertArrayEquals(
180 + allValues.get(allKeys.indexOf(key)), result.value()))
181 + .join());
182 + map.isEmpty().thenAccept(result -> assertTrue(result));
183 + //repopulating, this is not mainly for testing
184 + allKeys.forEach(key -> map.put(
185 + key, allValues.get(allKeys.indexOf(key)))
186 + .thenAccept(result -> {
187 + assertNull(result);
188 + }).join());
189 +
190 + //Test various collections of keys, values and entries
191 + map.keySet().thenAccept(
192 + keys -> assertTrue(
193 + stringArrayCollectionIsEqual(keys, allKeys)))
194 + .join();
195 + map.values().thenAccept(
196 + values -> assertTrue(
197 + byteArrayCollectionIsEqual(values.stream().map(
198 + v -> v.value()).collect(
199 + Collectors.toSet()), allValues)))
200 + .join();
201 + map.entrySet().thenAccept(entrySet -> {
202 + entrySet.forEach(entry -> {
203 + assertTrue(allKeys.contains(entry.getKey()));
204 + assertTrue(Arrays.equals(entry.getValue().value(),
205 + allValues.get(allKeys.indexOf(entry.getKey()))));
206 + });
207 + }).join();
208 + map.clear().join();
209 + map.isEmpty().thenAccept(result -> assertTrue(result)).join();
210 +
211 + //test conditional put
212 + allKeys.forEach(
213 + key -> map.putIfAbsent(
214 + key, allValues.get(allKeys.indexOf(key))).
215 + thenAccept(result -> assertNull(result)).join());
216 + allKeys.forEach(
217 + key -> map.putIfAbsent(
218 + key, null).
219 + thenAccept(result ->
220 + assertArrayEquals(result.value(),
221 + allValues.get(allKeys.indexOf(key))))
222 + .join());
223 + // test alternate removes that specify value or version
224 + allKeys.forEach(
225 + key -> map.remove(key, spareValue).thenAccept(
226 + result -> assertFalse(result)).join());
227 + allKeys.forEach(
228 + key -> map.remove(key, allValues.get(allKeys.indexOf(key)))
229 + .thenAccept(result -> assertTrue(result)).join());
230 + map.isEmpty().thenAccept(result -> assertTrue(result)).join();
231 + List<Long> versions = Lists.newArrayList();
232 +
233 + //repopulating set for version based removal
234 + allKeys.forEach(
235 + key -> map.putAndGet(key, allValues.get(allKeys.indexOf(key)))
236 + .thenAccept(result -> versions.add(result.version())).join());
237 + allKeys.forEach(
238 + key -> map.remove(key, versions.get(0)).thenAccept(
239 + result -> {
240 + assertTrue(result);
241 + versions.remove(0);
242 + }).join());
243 + map.isEmpty().thenAccept(result -> assertTrue(result)).join();
244 + //Testing all replace both simple (k, v), and complex that consider
245 + // previous mapping or version.
246 + allKeys.forEach(
247 + key -> map.put(key, allValues.get(allKeys.indexOf(key)))
248 + .thenAccept(result -> assertNull(result)).join());
249 + allKeys.forEach(key -> map.replace(
250 + key, allValues.get(3 - allKeys.indexOf(key)))
251 + .thenAccept(result -> assertArrayEquals(
252 + allValues.get(allKeys.indexOf(key)), result.value()))
253 + .join());
254 + allKeys.forEach(key -> map.replace(key,
255 + spareValue,
256 + allValues.get(allKeys.indexOf(key)))
257 + .thenAccept(result -> assertFalse(result))
258 + .join());
259 + allKeys.forEach(key -> map.replace(
260 + key, allValues.get(3 - allKeys.indexOf(key)),
261 + allValues.get(allKeys.indexOf(key)))
262 + .thenAccept(result -> assertTrue(result)).join());
263 + map.clear().join();
264 + map.isEmpty().thenAccept(result -> assertTrue(result)).join();
265 + versions.clear();
266 + //populate for version based replacement
267 + allKeys.forEach(
268 + key -> map.putAndGet(
269 + key, allValues.get(3 - allKeys.indexOf(key)))
270 + .thenAccept(result ->
271 + versions.add(result.version())).join());
272 + allKeys.forEach(key -> map.replace(
273 + key, 0, allValues.get(allKeys.indexOf(key)))
274 + .thenAccept(result -> assertFalse(result))
275 + .join());
276 + allKeys.forEach(key -> map.replace(
277 + key, versions.get(0), allValues.get(allKeys.indexOf(key)))
278 + .thenAccept(result -> {
279 + assertTrue(result);
280 + versions.remove(0);
281 + }).join());
282 + }
283 +
284 + @Test
285 + public void mapListenerTests() throws Throwable {
286 + final byte[] value1 = Tools.getBytesUtf8("value1");
287 + final byte[] value2 = Tools.getBytesUtf8("value2");
288 + final byte[] value3 = Tools.getBytesUtf8("value3");
289 +
290 + AtomixConsistentTreeMap map = createResource("treeMapListenerTestMap");
291 + TestMapEventListener listener = new TestMapEventListener();
292 +
293 + // add listener; insert new value into map and verify an INSERT event
294 + // is received.
295 + map.addListener(listener).thenCompose(v -> map.put("foo", value1))
296 + .join();
297 + MapEvent<String, byte[]> event = listener.event();
298 + assertNotNull(event);
299 + assertEquals(MapEvent.Type.INSERT, event.type());
300 + assertTrue(Arrays.equals(value1, event.newValue().value()));
301 +
302 + // remove listener and verify listener is not notified.
303 + map.removeListener(listener).thenCompose(v -> map.put("foo", value2))
304 + .join();
305 + assertFalse(listener.eventReceived());
306 +
307 + // add the listener back and verify UPDATE events are received
308 + // correctly
309 + map.addListener(listener).thenCompose(v -> map.put("foo", value3))
310 + .join();
311 + event = listener.event();
312 + assertNotNull(event);
313 + assertEquals(MapEvent.Type.UPDATE, event.type());
314 + assertTrue(Arrays.equals(value3, event.newValue().value()));
315 +
316 + // perform a non-state changing operation and verify no events are
317 + // received.
318 + map.putIfAbsent("foo", value1).join();
319 + assertFalse(listener.eventReceived());
320 +
321 + // verify REMOVE events are received correctly.
322 + map.remove("foo").join();
323 + event = listener.event();
324 + assertNotNull(event);
325 + assertEquals(MapEvent.Type.REMOVE, event.type());
326 + assertTrue(Arrays.equals(value3, event.oldValue().value()));
327 +
328 + // verify compute methods also generate events.
329 + map.computeIf("foo", v -> v == null, (k, v) -> value1).join();
330 + event = listener.event();
331 + assertNotNull(event);
332 + assertEquals(MapEvent.Type.INSERT, event.type());
333 + assertTrue(Arrays.equals(value1, event.newValue().value()));
334 +
335 + map.compute("foo", (k, v) -> value2).join();
336 + event = listener.event();
337 + assertNotNull(event);
338 + assertEquals(MapEvent.Type.UPDATE, event.type());
339 + assertTrue(Arrays.equals(value2, event.newValue().value()));
340 +
341 + map.computeIf(
342 + "foo", v -> Arrays.equals(v, value2), (k, v) -> null).join();
343 + event = listener.event();
344 + assertNotNull(event);
345 + assertEquals(MapEvent.Type.REMOVE, event.type());
346 + assertTrue(Arrays.equals(value2, event.oldValue().value()));
347 +
348 + map.removeListener(listener).join();
349 + }
350 +
351 + /**
352 + * Tests functionality specified in the {@link AtomixConsistentTreeMap}
353 + * interface, beyond the functionality provided in
354 + * {@link org.onosproject.store.service.AsyncConsistentMap}.
355 + */
356 + @Test
357 + public void treeMapFunctionsTest() {
358 + AtomixConsistentTreeMap map = createResource("treeMapFunctionTestMap");
359 + //Tests on empty map
360 + map.firstKey().thenAccept(result -> assertNull(result)).join();
361 + map.lastKey().thenAccept(result -> assertNull(result)).join();
362 + map.ceilingEntry(keyOne).thenAccept(result -> assertNull(result))
363 + .join();
364 + map.floorEntry(keyOne).thenAccept(result -> assertNull(result)).join();
365 + map.higherEntry(keyOne).thenAccept(result -> assertNull(result))
366 + .join();
367 + map.lowerEntry(keyOne).thenAccept(result -> assertNull(result)).join();
368 + map.firstEntry().thenAccept(result -> assertNull(result)).join();
369 + map.lastEntry().thenAccept(result -> assertNull(result)).join();
370 + map.pollFirstEntry().thenAccept(result -> assertNull(result)).join();
371 + map.pollLastEntry().thenAccept(result -> assertNull(result)).join();
372 + map.lowerKey(keyOne).thenAccept(result -> assertNull(result)).join();
373 + map.floorKey(keyOne).thenAccept(result -> assertNull(result)).join();
374 + map.ceilingKey(keyOne).thenAccept(result -> assertNull(result))
375 + .join();
376 + map.higherKey(keyOne).thenAccept(result -> assertNull(result)).join();
377 + map.delete().join();
378 +
379 + allKeys.forEach(key -> map.put(
380 + key, allValues.get(allKeys.indexOf(key)))
381 + .thenAccept(result -> assertNull(result)).join());
382 + //Note ordering keys are in their proper ordering in ascending order
383 + //both in naming and in the allKeys list.
384 +
385 + map.firstKey().thenAccept(result -> assertEquals(keyOne, result))
386 + .join();
387 + map.lastKey().thenAccept(result -> assertEquals(keyFour, result))
388 + .join();
389 + map.ceilingEntry(keyOne)
390 + .thenAccept(result -> {
391 + assertEquals(keyOne, result.getKey());
392 + assertArrayEquals(valueOne, result.getValue().value());
393 + })
394 + .join();
395 + //adding an additional letter to make keyOne an unacceptable response
396 + map.ceilingEntry(keyOne + "a")
397 + .thenAccept(result -> {
398 + assertEquals(keyTwo, result.getKey());
399 + assertArrayEquals(valueTwo, result.getValue().value());
400 + })
401 + .join();
402 + map.ceilingEntry(keyFour + "a")
403 + .thenAccept(result -> {
404 + assertNull(result);
405 + })
406 + .join();
407 + map.floorEntry(keyTwo).thenAccept(result -> {
408 + assertEquals(keyTwo, result.getKey());
409 + assertArrayEquals(valueTwo, result.getValue().value());
410 + })
411 + .join();
412 + //shorten the key so it itself is not an acceptable reply
413 + map.floorEntry(keyTwo.substring(0, 2)).thenAccept(result -> {
414 + assertEquals(keyOne, result.getKey());
415 + assertArrayEquals(valueOne, result.getValue().value());
416 + })
417 + .join();
418 + // shorten least key so no acceptable response exists
419 + map.floorEntry(keyOne.substring(0, 1)).thenAccept(
420 + result -> assertNull(result))
421 + .join();
422 +
423 + map.higherEntry(keyTwo).thenAccept(result -> {
424 + assertEquals(keyThree, result.getKey());
425 + assertArrayEquals(valueThree, result.getValue().value());
426 + })
427 + .join();
428 + map.higherEntry(keyFour).thenAccept(result -> assertNull(result))
429 + .join();
430 +
431 + map.lowerEntry(keyFour).thenAccept(result -> {
432 + assertEquals(keyThree, result.getKey());
433 + assertArrayEquals(valueThree, result.getValue().value());
434 + })
435 + .join();
436 + map.lowerEntry(keyOne).thenAccept(result -> assertNull(result))
437 + .join();
438 + map.firstEntry().thenAccept(result -> {
439 + assertEquals(keyOne, result.getKey());
440 + assertArrayEquals(valueOne, result.getValue().value());
441 + })
442 + .join();
443 + map.lastEntry().thenAccept(result -> {
444 + assertEquals(keyFour, result.getKey());
445 + assertArrayEquals(valueFour, result.getValue().value());
446 + })
447 + .join();
448 + map.pollFirstEntry().thenAccept(result -> {
449 + assertEquals(keyOne, result.getKey());
450 + assertArrayEquals(valueOne, result.getValue().value());
451 + });
452 + map.containsKey(keyOne).thenAccept(result -> assertFalse(result))
453 + .join();
454 + map.size().thenAccept(result -> assertEquals(3, (int) result)).join();
455 + map.pollLastEntry().thenAccept(result -> {
456 + assertEquals(keyFour, result.getKey());
457 + assertArrayEquals(valueFour, result.getValue().value());
458 + });
459 + map.containsKey(keyFour).thenAccept(result -> assertFalse(result))
460 + .join();
461 + map.size().thenAccept(result -> assertEquals(2, (int) result)).join();
462 +
463 + //repopulate the missing entries
464 + allKeys.forEach(key -> map.put(
465 + key, allValues.get(allKeys.indexOf(key)))
466 + .thenAccept(result -> {
467 + if (key.equals(keyOne) || key.equals(keyFour)) {
468 + assertNull(result);
469 + } else {
470 + assertArrayEquals(allValues.get(allKeys.indexOf(key)),
471 + result.value());
472 + }
473 + })
474 + .join());
475 + map.lowerKey(keyOne).thenAccept(result -> assertNull(result)).join();
476 + map.lowerKey(keyThree).thenAccept(
477 + result -> assertEquals(keyTwo, result))
478 + .join();
479 + map.floorKey(keyThree).thenAccept(
480 + result -> assertEquals(keyThree, result))
481 + .join();
482 + //shortening the key so there is no acceptable response
483 + map.floorKey(keyOne.substring(0, 1)).thenAccept(
484 + result -> assertNull(result))
485 + .join();
486 + map.ceilingKey(keyTwo).thenAccept(
487 + result -> assertEquals(keyTwo, result))
488 + .join();
489 + //adding to highest key so there is no acceptable response
490 + map.ceilingKey(keyFour + "a")
491 + .thenAccept(reslt -> assertNull(reslt))
492 + .join();
493 + map.higherKey(keyThree).thenAccept(
494 + result -> assertEquals(keyFour, result))
495 + .join();
496 + map.higherKey(keyFour).thenAccept(
497 + result -> assertNull(result))
498 + .join();
499 + map.delete().join();
500 +
501 + }
502 +
503 + private AtomixConsistentTreeMap createResource(String mapName) {
504 + try {
505 + AtomixConsistentTreeMap map = createAtomixClient().
506 + getResource(mapName, AtomixConsistentTreeMap.class)
507 + .join();
508 + return map;
509 + } catch (Throwable e) {
510 + throw new RuntimeException(e.toString());
511 + }
512 + }
513 + private static class TestMapEventListener
514 + implements MapEventListener<String, byte[]> {
515 +
516 + private final BlockingQueue<MapEvent<String, byte[]>> queue =
517 + new ArrayBlockingQueue<>(1);
518 +
519 + @Override
520 + public void event(MapEvent<String, byte[]> event) {
521 + try {
522 + queue.put(event);
523 + } catch (InterruptedException e) {
524 + Throwables.propagate(e);
525 + }
526 + }
527 +
528 + public boolean eventReceived() {
529 + return !queue.isEmpty();
530 + }
531 +
532 + public MapEvent<String, byte[]> event() throws InterruptedException {
533 + return queue.take();
534 + }
535 + }
536 +
537 + /**
538 + * Returns two arrays contain the same set of elements,
539 + * regardless of order.
540 + * @param o1 first collection
541 + * @param o2 second collection
542 + * @return true if they contain the same elements
543 + */
544 + private boolean byteArrayCollectionIsEqual(
545 + Collection<? extends byte[]> o1, Collection<? extends byte[]> o2) {
546 + if (o1 == null || o2 == null || o1.size() != o2.size()) {
547 + return false;
548 + }
549 + for (byte[] array1 : o1) {
550 + boolean matched = false;
551 + for (byte[] array2 : o2) {
552 + if (Arrays.equals(array1, array2)) {
553 + matched = true;
554 + break;
555 + }
556 + }
557 + if (!matched) {
558 + return false;
559 + }
560 + }
561 + return true;
562 + }
563 +
564 + /**
565 + * Compares two collections of strings returns true if they contain the
566 + * same strings, false otherwise.
567 + * @param s1 string collection one
568 + * @param s2 string collection two
569 + * @return true if the two sets contain the same strings
570 + */
571 + private boolean stringArrayCollectionIsEqual(
572 + Collection<? extends String> s1, Collection<? extends String> s2) {
573 + if (s1 == null || s2 == null || s1.size() != s2.size()) {
574 + return false;
575 + }
576 + for (String string1 : s1) {
577 + boolean matched = false;
578 + for (String string2 : s2) {
579 + if (string1.equals(string2)) {
580 + matched = true;
581 + break;
582 + }
583 + }
584 + if (!matched) {
585 + return false;
586 + }
587 + }
588 + return true;
589 + }
590 +
591 + /**
592 + * Inner entry type for testing.
593 + * @param <K>
594 + * @param <V>
595 + */
596 + private class InnerEntry<K, V> implements Map.Entry<K, V> {
597 + private K key;
598 + private V value;
599 + public InnerEntry(K key, V value) {
600 + this.key = key;
601 + this.value = value;
602 + }
603 +
604 + @Override
605 + public K getKey() {
606 + return key;
607 + }
608 +
609 + @Override
610 + public V getValue() {
611 + return value;
612 + }
613 +
614 + @Override
615 + public V setValue(V value) {
616 + V temp = this.value;
617 + this.value = value;
618 + return temp;
619 + }
620 +
621 + @Override
622 + public boolean equals(Object o) {
623 + if (!(o instanceof InnerEntry)) {
624 + return false;
625 + }
626 + InnerEntry other = (InnerEntry) o;
627 + boolean keysEqual = false;
628 + boolean valuesEqual = false;
629 + if (this.key instanceof byte[]) {
630 + if (other.getKey() instanceof byte[]) {
631 + keysEqual = Arrays.equals((byte[]) this.key,
632 + (byte[]) other.getKey());
633 + } else {
634 + return false;
635 + }
636 + } else {
637 + keysEqual = this.getKey().equals(other.getKey());
638 + }
639 +
640 + if (keysEqual) {
641 + if (this.value instanceof byte[]) {
642 + if (other.getValue() instanceof byte[]) {
643 + return Arrays.equals((byte[]) this.value,
644 + (byte[]) other.getValue());
645 + } else {
646 + return false;
647 + }
648 + } else {
649 + return this.key.equals(other.getKey());
650 + }
651 + }
652 + return false;
653 + }
654 +
655 + @Override
656 + public int hashCode() {
657 + return 0;
658 + }
659 + }
660 +}
...\ No newline at end of file ...\ No newline at end of file