Madan Jampani
Committed by Gerrit Code Review

Add new methods to ConsistentMap API to improve usability.

Change-Id: I1e82f0ab191edc6b0f52c7d7b0307aa3d2ef9d1f

Change-Id: I4c5982fe6596f716729b7885eb584a60735cd41b
...@@ -18,8 +18,12 @@ package org.onosproject.store.service; ...@@ -18,8 +18,12 @@ package org.onosproject.store.service;
18 18
19 import java.util.Collection; 19 import java.util.Collection;
20 import java.util.Map.Entry; 20 import java.util.Map.Entry;
21 +import java.util.Optional;
21 import java.util.Set; 22 import java.util.Set;
22 import java.util.concurrent.CompletableFuture; 23 import java.util.concurrent.CompletableFuture;
24 +import java.util.function.BiFunction;
25 +import java.util.function.Function;
26 +import java.util.function.Predicate;
23 27
24 /** 28 /**
25 * A distributed, strongly consistent map whose methods are all executed asynchronously. 29 * A distributed, strongly consistent map whose methods are all executed asynchronously.
...@@ -84,6 +88,61 @@ public interface AsyncConsistentMap<K, V> { ...@@ -84,6 +88,61 @@ public interface AsyncConsistentMap<K, V> {
84 CompletableFuture<Versioned<V>> get(K key); 88 CompletableFuture<Versioned<V>> get(K key);
85 89
86 /** 90 /**
91 + * If the specified key is not already associated with a value (or is mapped to null),
92 + * attempts to compute its value using the given mapping function and enters it into
93 + * this map unless null.
94 + * If a conflicting concurrent modification attempt is detected, the returned future
95 + * will be completed exceptionally with ConsistentMapException.ConcurrentModification.
96 + * @param key key with which the specified value is to be associated
97 + * @param mappingFunction the function to compute a value
98 + * @return the current (existing or computed) value associated with the specified key,
99 + * or null if the computed value is null
100 + */
101 + CompletableFuture<Versioned<V>> computeIfAbsent(K key,
102 + Function<? super K, ? extends V> mappingFunction);
103 +
104 + /**
105 + * If the value for the specified key is present and non-null, attempts to compute a new
106 + * mapping given the key and its current mapped value.
107 + * If the computed value is null, the current mapping will be removed from the map.
108 + * If a conflicting concurrent modification attempt is detected, the returned future
109 + * will be completed exceptionally with ConsistentMapException.ConcurrentModification.
110 + * @param key key with which the specified value is to be associated
111 + * @param remappingFunction the function to compute a value
112 + * @return the new value associated with the specified key, or null if computed value is null
113 + */
114 + CompletableFuture<Versioned<V>> computeIfPresent(K key,
115 + BiFunction<? super K, ? super V, ? extends V> remappingFunction);
116 +
117 + /**
118 + * Attempts to compute a mapping for the specified key and its current mapped value (or
119 + * null if there is no current mapping).
120 + * If the computed value is null, the current mapping (if one exists) will be removed from the map.
121 + * If a conflicting concurrent modification attempt is detected, the returned future
122 + * will be completed exceptionally with ConsistentMapException.ConcurrentModification.
123 + * @param key key with which the specified value is to be associated
124 + * @param remappingFunction the function to compute a value
125 + * @return the new value associated with the specified key, or null if computed value is null
126 + */
127 + CompletableFuture<Versioned<V>> compute(K key,
128 + BiFunction<? super K, ? super V, ? extends V> remappingFunction);
129 +
130 + /**
131 + * If the value for the specified key satisfies a condition, attempts to compute a new
132 + * mapping given the key and its current mapped value.
133 + * If the computed value is null, the current mapping will be removed from the map.
134 + * If a conflicting concurrent modification attempt is detected, the returned future
135 + * will be completed exceptionally with ConsistentMapException.ConcurrentModification.
136 + * @param key key with which the specified value is to be associated
137 + * @param condition condition that should evaluate to true for the computation to proceed
138 + * @param remappingFunction the function to compute a value
139 + * @return the new value associated with the specified key, or the old value if condition evaluates to false
140 + */
141 + CompletableFuture<Versioned<V>> computeIf(K key,
142 + Predicate<? super V> condition,
143 + BiFunction<? super K, ? super V, ? extends V> remappingFunction);
144 +
145 + /**
87 * Associates the specified value with the specified key in this map (optional operation). 146 * Associates the specified value with the specified key in this map (optional operation).
88 * If the map previously contained a mapping for the key, the old value is replaced by the 147 * If the map previously contained a mapping for the key, the old value is replaced by the
89 * specified value. 148 * specified value.
...@@ -96,6 +155,28 @@ public interface AsyncConsistentMap<K, V> { ...@@ -96,6 +155,28 @@ public interface AsyncConsistentMap<K, V> {
96 CompletableFuture<Versioned<V>> put(K key, V value); 155 CompletableFuture<Versioned<V>> put(K key, V value);
97 156
98 /** 157 /**
158 + * Associates the specified value with the specified key in this map (optional operation).
159 + * If the map previously contained a mapping for the key, the old value is replaced by the
160 + * specified value.
161 + *
162 + * @param key key with which the specified value is to be associated
163 + * @param value value to be associated with the specified key
164 + * @return new value.
165 + */
166 + CompletableFuture<Versioned<V>> putAndGet(K key, V value);
167 +
168 + /**
169 + * Associates the specified value with the specified key in this map (optional operation).
170 + * If the map previously contained a mapping for the key, the old value is replaced by the
171 + * specified value.
172 + *
173 + * @param key key with which the specified value is to be associated
174 + * @param value value to be associated with the specified key
175 + * @return optional updated value. Will be empty if update did not happen
176 + */
177 + CompletableFuture<Optional<Versioned<V>>> putIfAbsentAndGet(K key, V value);
178 +
179 + /**
99 * Removes the mapping for a key from this map if it is present (optional operation). 180 * Removes the mapping for a key from this map if it is present (optional operation).
100 * 181 *
101 * @param key key whose value is to be removed from the map 182 * @param key key whose value is to be removed from the map
...@@ -196,4 +277,15 @@ public interface AsyncConsistentMap<K, V> { ...@@ -196,4 +277,15 @@ public interface AsyncConsistentMap<K, V> {
196 * @return true if the value was replaced 277 * @return true if the value was replaced
197 */ 278 */
198 CompletableFuture<Boolean> replace(K key, long oldVersion, V newValue); 279 CompletableFuture<Boolean> replace(K key, long oldVersion, V newValue);
280 +
281 + /**
282 + * Replaces the entry for the specified key only if it is currently mapped to the
283 + * specified version.
284 + *
285 + * @param key key key with which the specified value is associated
286 + * @param oldVersion version expected to be associated with the specified key
287 + * @param newValue value to be associated with the specified key
288 + * @return optional updated value. Will be empty if update did not happen.
289 + */
290 + CompletableFuture<Optional<Versioned<V>>> replaceAndGet(K key, long oldVersion, V newValue);
199 } 291 }
......
...@@ -18,7 +18,11 @@ package org.onosproject.store.service; ...@@ -18,7 +18,11 @@ package org.onosproject.store.service;
18 18
19 import java.util.Collection; 19 import java.util.Collection;
20 import java.util.Map.Entry; 20 import java.util.Map.Entry;
21 +import java.util.Optional;
21 import java.util.Set; 22 import java.util.Set;
23 +import java.util.function.BiFunction;
24 +import java.util.function.Function;
25 +import java.util.function.Predicate;
22 26
23 /** 27 /**
24 * A distributed, strongly consistent map. 28 * A distributed, strongly consistent map.
...@@ -83,6 +87,64 @@ public interface ConsistentMap<K, V> { ...@@ -83,6 +87,64 @@ public interface ConsistentMap<K, V> {
83 Versioned<V> get(K key); 87 Versioned<V> get(K key);
84 88
85 /** 89 /**
90 + * If the specified key is not already associated with a value (or is mapped to null),
91 + * attempts to compute its value using the given mapping function and enters it into
92 + * this map unless null.
93 + *
94 + * @param key key with which the specified value is to be associated
95 + * @param mappingFunction the function to compute a value
96 + * @return the current (existing or computed) value associated with the specified key,
97 + * or null if the computed value is null. Method throws {@code ConsistentMapException.ConcurrentModification}
98 + * if a concurrent modification of map is detected
99 + */
100 + Versioned<V> computeIfAbsent(K key,
101 + Function<? super K, ? extends V> mappingFunction);
102 +
103 + /**
104 + * Attempts to compute a mapping for the specified key and its current mapped value (or
105 + * null if there is no current mapping).
106 + * If the computed value is null, the current mapping will be removed from the map.
107 + *
108 + * @param key key with which the specified value is to be associated
109 + * @param remappingFunction the function to compute a value
110 + * @return the new value associated with the specified key, or null if none.
111 + * This method throws {@code ConsistentMapException.ConcurrentModification}
112 + * if a concurrent modification of map is detected
113 + */
114 + Versioned<V> compute(K key,
115 + BiFunction<? super K, ? super V, ? extends V> remappingFunction);
116 +
117 + /**
118 + * If the value for the specified key is present and non-null, attempts to compute a new
119 + * mapping given the key and its current mapped value.
120 + * If the computed value is null, the current mapping will be removed from the map.
121 + *
122 + * @param key key with which the specified value is to be associated
123 + * @param remappingFunction the function to compute a value
124 + * @return the new value associated with the specified key, or null if none.
125 + * This method throws {@code ConsistentMapException.ConcurrentModification}
126 + * if a concurrent modification of map is detected
127 + */
128 + Versioned<V> computeIfPresent(K key,
129 + BiFunction<? super K, ? super V, ? extends V> remappingFunction);
130 +
131 + /**
132 + * If the value for the specified key satisfies a condition, attempts to compute a new
133 + * mapping given the key and its current mapped value.
134 + * If the computed value is null, the current mapping will be removed from the map.
135 + *
136 + * @param key key with which the specified value is to be associated
137 + * @param condition condition that should evaluate to true for the computation to proceed
138 + * @param remappingFunction the function to compute a value
139 + * @return the new value associated with the specified key, or the old value if condition evaluates to false.
140 + * This method throws {@code ConsistentMapException.ConcurrentModification} if a concurrent
141 + * modification of map is detected
142 + */
143 + Versioned<V> computeIf(K key,
144 + Predicate<? super V> condition,
145 + BiFunction<? super K, ? super V, ? extends V> remappingFunction);
146 +
147 + /**
86 * Associates the specified value with the specified key in this map (optional operation). 148 * Associates the specified value with the specified key in this map (optional operation).
87 * If the map previously contained a mapping for the key, the old value is replaced by the 149 * If the map previously contained a mapping for the key, the old value is replaced by the
88 * specified value. 150 * specified value.
...@@ -95,6 +157,28 @@ public interface ConsistentMap<K, V> { ...@@ -95,6 +157,28 @@ public interface ConsistentMap<K, V> {
95 Versioned<V> put(K key, V value); 157 Versioned<V> put(K key, V value);
96 158
97 /** 159 /**
160 + * Associates the specified value with the specified key in this map (optional operation).
161 + * If the map previously contained a mapping for the key, the old value is replaced by the
162 + * specified value.
163 + *
164 + * @param key key with which the specified value is to be associated
165 + * @param value value to be associated with the specified key
166 + * @return new value.
167 + */
168 + Versioned<V> putAndGet(K key, V value);
169 +
170 + /**
171 + * Associates the specified value with the specified key in this map (optional operation).
172 + * If the map previously contained a mapping for the key, the old value is replaced by the
173 + * specified value.
174 + *
175 + * @param key key with which the specified value is to be associated
176 + * @param value value to be associated with the specified key
177 + * @return optional updated value. Will be empty if update did not happen
178 + */
179 + Optional<Versioned<V>> putIfAbsentAndGet(K key, V value);
180 +
181 + /**
98 * Removes the mapping for a key from this map if it is present (optional operation). 182 * Removes the mapping for a key from this map if it is present (optional operation).
99 * 183 *
100 * @param key key whose value is to be removed from the map 184 * @param key key whose value is to be removed from the map
...@@ -194,4 +278,15 @@ public interface ConsistentMap<K, V> { ...@@ -194,4 +278,15 @@ public interface ConsistentMap<K, V> {
194 * @return true if the value was replaced 278 * @return true if the value was replaced
195 */ 279 */
196 boolean replace(K key, long oldVersion, V newValue); 280 boolean replace(K key, long oldVersion, V newValue);
281 +
282 + /**
283 + * Replaces the entry for the specified key only if it is currently mapped to the
284 + * specified version.
285 + *
286 + * @param key key key with which the specified value is associated
287 + * @param oldVersion version expected to be associated with the specified key
288 + * @param newValue value to be associated with the specified key
289 + * @return optional new value. Will be empty if replace did not happen
290 + */
291 + Optional<Versioned<V>> replaceAndGet(K key, long oldVersion, V newValue);
197 } 292 }
......
...@@ -97,6 +97,26 @@ public interface DatabaseProxy<K, V> { ...@@ -97,6 +97,26 @@ public interface DatabaseProxy<K, V> {
97 CompletableFuture<Result<Versioned<V>>> put(String tableName, K key, V value); 97 CompletableFuture<Result<Versioned<V>>> put(String tableName, K key, V value);
98 98
99 /** 99 /**
100 + * Puts a value in the table.
101 + *
102 + * @param tableName table name
103 + * @param key The key to set.
104 + * @param value The value to set.
105 + * @return A completable future to be completed with the result once complete.
106 + */
107 + CompletableFuture<Result<UpdateResult<Versioned<V>>>> putAndGet(String tableName, K key, V value);
108 +
109 + /**
110 + * Puts a value in the table.
111 + *
112 + * @param tableName table name
113 + * @param key The key to set.
114 + * @param value The value to set.
115 + * @return A completable future to be completed with the result once complete.
116 + */
117 + CompletableFuture<Result<UpdateResult<Versioned<V>>>> putIfAbsentAndGet(String tableName, K key, V value);
118 +
119 + /**
100 * Removes a value from the table. 120 * Removes a value from the table.
101 * 121 *
102 * @param tableName table name 122 * @param tableName table name
...@@ -190,6 +210,19 @@ public interface DatabaseProxy<K, V> { ...@@ -190,6 +210,19 @@ public interface DatabaseProxy<K, V> {
190 CompletableFuture<Result<Boolean>> replace(String tableName, K key, long oldVersion, V newValue); 210 CompletableFuture<Result<Boolean>> replace(String tableName, K key, long oldVersion, V newValue);
191 211
192 /** 212 /**
213 + * Replaces the entry for the specified key only if currently mapped to the specified version.
214 + *
215 + * @param tableName table name
216 + * @param key The key to update
217 + * @param oldVersion existing version in the map for this replace to succeed.
218 + * @param newValue The value with which to replace the given key and version.
219 + * @return A completable future to be completed with the result once complete.
220 + */
221 + CompletableFuture<Result<UpdateResult<Versioned<V>>>> replaceAndGet(String tableName,
222 + K key, long oldVersion,
223 + V newValue);
224 +
225 + /**
193 * Atomically add the given value to current value of the specified counter. 226 * Atomically add the given value to current value of the specified counter.
194 * 227 *
195 * @param counterName counter name 228 * @param counterName counter name
......
...@@ -74,6 +74,7 @@ public class DatabaseSerializer extends SerializerConfig { ...@@ -74,6 +74,7 @@ public class DatabaseSerializer extends SerializerConfig {
74 .register(Pair.class) 74 .register(Pair.class)
75 .register(ImmutablePair.class) 75 .register(ImmutablePair.class)
76 .register(Result.class) 76 .register(Result.class)
77 + .register(UpdateResult.class)
77 .register(Result.Status.class) 78 .register(Result.Status.class)
78 .register(DefaultTransaction.class) 79 .register(DefaultTransaction.class)
79 .register(Transaction.State.class) 80 .register(Transaction.State.class)
......
...@@ -68,6 +68,12 @@ public interface DatabaseState<K, V> { ...@@ -68,6 +68,12 @@ public interface DatabaseState<K, V> {
68 Result<Versioned<V>> put(String tableName, K key, V value); 68 Result<Versioned<V>> put(String tableName, K key, V value);
69 69
70 @Command 70 @Command
71 + Result<UpdateResult<Versioned<V>>> putAndGet(String tableName, K key, V value);
72 +
73 + @Command
74 + Result<UpdateResult<Versioned<V>>> putIfAbsentAndGet(String tableName, K key, V value);
75 +
76 + @Command
71 Result<Versioned<V>> remove(String tableName, K key); 77 Result<Versioned<V>> remove(String tableName, K key);
72 78
73 @Command 79 @Command
...@@ -98,6 +104,9 @@ public interface DatabaseState<K, V> { ...@@ -98,6 +104,9 @@ public interface DatabaseState<K, V> {
98 Result<Boolean> replace(String tableName, K key, long oldVersion, V newValue); 104 Result<Boolean> replace(String tableName, K key, long oldVersion, V newValue);
99 105
100 @Command 106 @Command
107 + Result<UpdateResult<Versioned<V>>> replaceAndGet(String tableName, K key, long oldVersion, V newValue);
108 +
109 + @Command
101 Long counterAddAndGet(String counterName, long delta); 110 Long counterAddAndGet(String counterName, long delta);
102 111
103 @Command 112 @Command
......
...@@ -21,12 +21,19 @@ import static com.google.common.base.Preconditions.*; ...@@ -21,12 +21,19 @@ import static com.google.common.base.Preconditions.*;
21 import java.util.Collection; 21 import java.util.Collection;
22 import java.util.Map; 22 import java.util.Map;
23 import java.util.Map.Entry; 23 import java.util.Map.Entry;
24 +import java.util.Objects;
25 +import java.util.Optional;
24 import java.util.concurrent.CompletableFuture; 26 import java.util.concurrent.CompletableFuture;
27 +import java.util.concurrent.atomic.AtomicReference;
28 +import java.util.function.BiFunction;
29 +import java.util.function.Function;
30 +import java.util.function.Predicate;
25 import java.util.stream.Collectors; 31 import java.util.stream.Collectors;
26 import java.util.Set; 32 import java.util.Set;
27 33
28 import org.apache.commons.lang3.tuple.Pair; 34 import org.apache.commons.lang3.tuple.Pair;
29 import org.onlab.util.HexString; 35 import org.onlab.util.HexString;
36 +import org.onlab.util.Tools;
30 import org.onosproject.store.service.AsyncConsistentMap; 37 import org.onosproject.store.service.AsyncConsistentMap;
31 import org.onosproject.store.service.ConsistentMapException; 38 import org.onosproject.store.service.ConsistentMapException;
32 import org.onosproject.store.service.Serializer; 39 import org.onosproject.store.service.Serializer;
...@@ -108,6 +115,84 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V> ...@@ -108,6 +115,84 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V>
108 } 115 }
109 116
110 @Override 117 @Override
118 + public CompletableFuture<Versioned<V>> computeIfAbsent(K key,
119 + Function<? super K, ? extends V> mappingFunction) {
120 + return computeIf(key, Objects::isNull, (k, v) -> mappingFunction.apply(k));
121 + }
122 +
123 + @Override
124 + public CompletableFuture<Versioned<V>> computeIfPresent(K key,
125 + BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
126 + return computeIf(key, Objects::nonNull, remappingFunction);
127 + }
128 +
129 + @Override
130 + public CompletableFuture<Versioned<V>> compute(K key,
131 + BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
132 + return computeIf(key, v -> true, remappingFunction);
133 + }
134 +
135 + @Override
136 + public CompletableFuture<Versioned<V>> computeIf(K key,
137 + Predicate<? super V> condition,
138 + BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
139 + checkNotNull(key, ERROR_NULL_KEY);
140 + checkNotNull(condition, "predicate function cannot be null");
141 + checkNotNull(remappingFunction, "Remapping function cannot be null");
142 + return get(key).thenCompose(r1 -> {
143 + V existingValue = r1 == null ? null : r1.value();
144 + // if the condition evaluates to false, return existing value.
145 + if (!condition.test(existingValue)) {
146 + return CompletableFuture.completedFuture(r1);
147 + }
148 +
149 + AtomicReference<V> computedValue = new AtomicReference<>();
150 + // if remappingFunction throws an exception, return the exception.
151 + try {
152 + computedValue.set(remappingFunction.apply(key, existingValue));
153 + } catch (Exception e) {
154 + return Tools.exceptionalFuture(e);
155 + }
156 +
157 + // if the computed value is null, remove current value if one exists.
158 + // throw an exception if concurrent modification is detected.
159 + if (computedValue.get() == null) {
160 + if (r1 != null) {
161 + return remove(key, r1.version()).thenApply(result -> {
162 + if (result) {
163 + return null;
164 + } else {
165 + throw new ConsistentMapException.ConcurrentModification();
166 + }
167 + });
168 + } else {
169 + return CompletableFuture.completedFuture(null);
170 + }
171 + } else {
172 + // replace current value; throw an exception if concurrent modification is detected
173 + if (r1 != null) {
174 + return replaceAndGet(key, r1.version(), computedValue.get())
175 + .thenApply(v -> {
176 + if (v.isPresent()) {
177 + return v.get();
178 + } else {
179 + throw new ConsistentMapException.ConcurrentModification();
180 + }
181 + });
182 + } else {
183 + return putIfAbsentAndGet(key, computedValue.get()).thenApply(result -> {
184 + if (!result.isPresent()) {
185 + throw new ConsistentMapException.ConcurrentModification();
186 + } else {
187 + return result.get();
188 + }
189 + });
190 + }
191 + }
192 + });
193 + }
194 +
195 + @Override
111 public CompletableFuture<Versioned<V>> put(K key, V value) { 196 public CompletableFuture<Versioned<V>> put(K key, V value) {
112 checkNotNull(key, ERROR_NULL_KEY); 197 checkNotNull(key, ERROR_NULL_KEY);
113 checkNotNull(value, ERROR_NULL_VALUE); 198 checkNotNull(value, ERROR_NULL_VALUE);
...@@ -119,6 +204,40 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V> ...@@ -119,6 +204,40 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V>
119 } 204 }
120 205
121 @Override 206 @Override
207 + public CompletableFuture<Versioned<V>> putAndGet(K key, V value) {
208 + checkNotNull(key, ERROR_NULL_KEY);
209 + checkNotNull(value, ERROR_NULL_VALUE);
210 + checkIfUnmodifiable();
211 + return database.putAndGet(name, keyCache.getUnchecked(key), serializer.encode(value))
212 + .thenApply(this::unwrapResult)
213 + .thenApply(v -> {
214 + Versioned<byte[]> rawNewValue = v.newValue();
215 + return new Versioned<>(serializer.decode(rawNewValue.value()),
216 + rawNewValue.version(),
217 + rawNewValue.creationTime());
218 + });
219 + }
220 +
221 + @Override
222 + public CompletableFuture<Optional<Versioned<V>>> putIfAbsentAndGet(K key, V value) {
223 + checkNotNull(key, ERROR_NULL_KEY);
224 + checkNotNull(value, ERROR_NULL_VALUE);
225 + checkIfUnmodifiable();
226 + return database.putIfAbsentAndGet(name, keyCache.getUnchecked(key), serializer.encode(value))
227 + .thenApply(this::unwrapResult)
228 + .thenApply(v -> {
229 + if (v.updated()) {
230 + Versioned<byte[]> rawNewValue = v.newValue();
231 + return Optional.of(new Versioned<>(serializer.decode(rawNewValue.value()),
232 + rawNewValue.version(),
233 + rawNewValue.creationTime()));
234 + } else {
235 + return Optional.empty();
236 + }
237 + });
238 + }
239 +
240 + @Override
122 public CompletableFuture<Versioned<V>> remove(K key) { 241 public CompletableFuture<Versioned<V>> remove(K key) {
123 checkNotNull(key, ERROR_NULL_KEY); 242 checkNotNull(key, ERROR_NULL_KEY);
124 checkIfUnmodifiable(); 243 checkIfUnmodifiable();
...@@ -202,11 +321,29 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V> ...@@ -202,11 +321,29 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V>
202 321
203 @Override 322 @Override
204 public CompletableFuture<Boolean> replace(K key, long oldVersion, V newValue) { 323 public CompletableFuture<Boolean> replace(K key, long oldVersion, V newValue) {
324 + return replaceAndGet(key, oldVersion, newValue).thenApply(Optional::isPresent);
325 + }
326 +
327 + @Override
328 + public CompletableFuture<Optional<Versioned<V>>> replaceAndGet(K key, long oldVersion, V newValue) {
205 checkNotNull(key, ERROR_NULL_KEY); 329 checkNotNull(key, ERROR_NULL_KEY);
206 checkNotNull(newValue, ERROR_NULL_VALUE); 330 checkNotNull(newValue, ERROR_NULL_VALUE);
207 checkIfUnmodifiable(); 331 checkIfUnmodifiable();
208 - return database.replace(name, keyCache.getUnchecked(key), oldVersion, serializer.encode(newValue)) 332 + return database.replaceAndGet(name,
209 - .thenApply(this::unwrapResult); 333 + keyCache.getUnchecked(key),
334 + oldVersion,
335 + serializer.encode(newValue))
336 + .thenApply(this::unwrapResult)
337 + .thenApply(v -> {
338 + if (v.updated()) {
339 + Versioned<byte[]> rawNewValue = v.newValue();
340 + return Optional.of(new Versioned<>(serializer.decode(rawNewValue.value()),
341 + rawNewValue.version(),
342 + rawNewValue.creationTime()));
343 + } else {
344 + return Optional.empty();
345 + }
346 + });
210 } 347 }
211 348
212 private Map.Entry<K, Versioned<V>> fromRawEntry(Map.Entry<String, Versioned<byte[]>> e) { 349 private Map.Entry<K, Versioned<V>> fromRawEntry(Map.Entry<String, Versioned<byte[]>> e) {
......
...@@ -18,10 +18,14 @@ package org.onosproject.store.consistent.impl; ...@@ -18,10 +18,14 @@ package org.onosproject.store.consistent.impl;
18 18
19 import java.util.Collection; 19 import java.util.Collection;
20 import java.util.Map.Entry; 20 import java.util.Map.Entry;
21 +import java.util.Optional;
21 import java.util.concurrent.CompletableFuture; 22 import java.util.concurrent.CompletableFuture;
22 import java.util.concurrent.ExecutionException; 23 import java.util.concurrent.ExecutionException;
23 import java.util.concurrent.TimeUnit; 24 import java.util.concurrent.TimeUnit;
24 import java.util.concurrent.TimeoutException; 25 import java.util.concurrent.TimeoutException;
26 +import java.util.function.BiFunction;
27 +import java.util.function.Function;
28 +import java.util.function.Predicate;
25 import java.util.Set; 29 import java.util.Set;
26 30
27 import org.onosproject.store.service.AsyncConsistentMap; 31 import org.onosproject.store.service.AsyncConsistentMap;
...@@ -76,11 +80,46 @@ public class DefaultConsistentMap<K, V> implements ConsistentMap<K, V> { ...@@ -76,11 +80,46 @@ public class DefaultConsistentMap<K, V> implements ConsistentMap<K, V> {
76 } 80 }
77 81
78 @Override 82 @Override
83 + public Versioned<V> computeIfAbsent(K key,
84 + Function<? super K, ? extends V> mappingFunction) {
85 + return complete(asyncMap.computeIfAbsent(key, mappingFunction));
86 + }
87 +
88 + @Override
89 + public Versioned<V> computeIfPresent(K key,
90 + BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
91 + return complete(asyncMap.computeIfPresent(key, remappingFunction));
92 + }
93 +
94 + @Override
95 + public Versioned<V> compute(K key,
96 + BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
97 + return complete(asyncMap.compute(key, remappingFunction));
98 + }
99 +
100 + @Override
101 + public Versioned<V> computeIf(K key,
102 + Predicate<? super V> condition,
103 + BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
104 + return complete(asyncMap.computeIf(key, condition, remappingFunction));
105 + }
106 +
107 + @Override
79 public Versioned<V> put(K key, V value) { 108 public Versioned<V> put(K key, V value) {
80 return complete(asyncMap.put(key, value)); 109 return complete(asyncMap.put(key, value));
81 } 110 }
82 111
83 @Override 112 @Override
113 + public Versioned<V> putAndGet(K key, V value) {
114 + return complete(asyncMap.putAndGet(key, value));
115 + }
116 +
117 + @Override
118 + public Optional<Versioned<V>> putIfAbsentAndGet(K key, V value) {
119 + return complete(asyncMap.putIfAbsentAndGet(key, value));
120 + }
121 +
122 + @Override
84 public Versioned<V> remove(K key) { 123 public Versioned<V> remove(K key) {
85 return complete(asyncMap.remove(key)); 124 return complete(asyncMap.remove(key));
86 } 125 }
...@@ -130,6 +169,11 @@ public class DefaultConsistentMap<K, V> implements ConsistentMap<K, V> { ...@@ -130,6 +169,11 @@ public class DefaultConsistentMap<K, V> implements ConsistentMap<K, V> {
130 return complete(asyncMap.replace(key, oldVersion, newValue)); 169 return complete(asyncMap.replace(key, oldVersion, newValue));
131 } 170 }
132 171
172 + @Override
173 + public Optional<Versioned<V>> replaceAndGet(K key, long oldVersion, V newValue) {
174 + return complete(asyncMap.replaceAndGet(key, oldVersion, newValue));
175 + }
176 +
133 private static <T> T complete(CompletableFuture<T> future) { 177 private static <T> T complete(CompletableFuture<T> future) {
134 try { 178 try {
135 return future.get(OPERATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS); 179 return future.get(OPERATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
...@@ -139,7 +183,11 @@ public class DefaultConsistentMap<K, V> implements ConsistentMap<K, V> { ...@@ -139,7 +183,11 @@ public class DefaultConsistentMap<K, V> implements ConsistentMap<K, V> {
139 } catch (TimeoutException e) { 183 } catch (TimeoutException e) {
140 throw new ConsistentMapException.Timeout(); 184 throw new ConsistentMapException.Timeout();
141 } catch (ExecutionException e) { 185 } catch (ExecutionException e) {
186 + if (e.getCause() instanceof ConsistentMapException) {
187 + throw (ConsistentMapException) e.getCause();
188 + } else {
142 throw new ConsistentMapException(e.getCause()); 189 throw new ConsistentMapException(e.getCause());
143 } 190 }
144 } 191 }
192 + }
145 } 193 }
...\ No newline at end of file ...\ No newline at end of file
......
...@@ -100,6 +100,20 @@ public class DefaultDatabase extends AbstractResource<Database> implements Datab ...@@ -100,6 +100,20 @@ public class DefaultDatabase extends AbstractResource<Database> implements Datab
100 } 100 }
101 101
102 @Override 102 @Override
103 + public CompletableFuture<Result<UpdateResult<Versioned<byte[]>>>> putAndGet(String tableName,
104 + String key,
105 + byte[] value) {
106 + return checkOpen(() -> proxy.putAndGet(tableName, key, value));
107 + }
108 +
109 + @Override
110 + public CompletableFuture<Result<UpdateResult<Versioned<byte[]>>>> putIfAbsentAndGet(String tableName,
111 + String key,
112 + byte[] value) {
113 + return checkOpen(() -> proxy.putIfAbsentAndGet(tableName, key, value));
114 + }
115 +
116 + @Override
103 public CompletableFuture<Result<Versioned<byte[]>>> remove(String tableName, String key) { 117 public CompletableFuture<Result<Versioned<byte[]>>> remove(String tableName, String key) {
104 return checkOpen(() -> proxy.remove(tableName, key)); 118 return checkOpen(() -> proxy.remove(tableName, key));
105 } 119 }
...@@ -150,6 +164,14 @@ public class DefaultDatabase extends AbstractResource<Database> implements Datab ...@@ -150,6 +164,14 @@ public class DefaultDatabase extends AbstractResource<Database> implements Datab
150 } 164 }
151 165
152 @Override 166 @Override
167 + public CompletableFuture<Result<UpdateResult<Versioned<byte[]>>>> replaceAndGet(String tableName,
168 + String key,
169 + long oldVersion,
170 + byte[] newValue) {
171 + return checkOpen(() -> proxy.replaceAndGet(tableName, key, oldVersion, newValue));
172 + }
173 +
174 + @Override
153 public CompletableFuture<Long> counterGet(String counterName) { 175 public CompletableFuture<Long> counterGet(String counterName) {
154 return checkOpen(() -> proxy.counterGet(counterName)); 176 return checkOpen(() -> proxy.counterGet(counterName));
155 } 177 }
......
...@@ -30,6 +30,7 @@ import org.onosproject.store.service.DatabaseUpdate; ...@@ -30,6 +30,7 @@ import org.onosproject.store.service.DatabaseUpdate;
30 import org.onosproject.store.service.Transaction; 30 import org.onosproject.store.service.Transaction;
31 import org.onosproject.store.service.Versioned; 31 import org.onosproject.store.service.Versioned;
32 import org.onosproject.store.service.DatabaseUpdate.Type; 32 import org.onosproject.store.service.DatabaseUpdate.Type;
33 +
33 import com.google.common.base.Objects; 34 import com.google.common.base.Objects;
34 import com.google.common.collect.ImmutableList; 35 import com.google.common.collect.ImmutableList;
35 import com.google.common.collect.ImmutableSet; 36 import com.google.common.collect.ImmutableSet;
...@@ -128,6 +129,36 @@ public class DefaultDatabaseState implements DatabaseState<String, byte[]> { ...@@ -128,6 +129,36 @@ public class DefaultDatabaseState implements DatabaseState<String, byte[]> {
128 } 129 }
129 130
130 @Override 131 @Override
132 + public Result<UpdateResult<Versioned<byte[]>>> putAndGet(String tableName,
133 + String key,
134 + byte[] value) {
135 + if (isLockedForUpdates(tableName, key)) {
136 + return Result.locked();
137 + } else {
138 + Versioned<byte[]> newValue = new Versioned<>(value, ++nextVersion);
139 + Versioned<byte[]> oldValue = getTableMap(tableName).put(key, newValue);
140 + return Result.ok(new UpdateResult<>(true, oldValue, newValue));
141 + }
142 + }
143 +
144 + @Override
145 + public Result<UpdateResult<Versioned<byte[]>>> putIfAbsentAndGet(String tableName,
146 + String key,
147 + byte[] value) {
148 + if (isLockedForUpdates(tableName, key)) {
149 + return Result.locked();
150 + }
151 + Versioned<byte[]> currentValue = getTableMap(tableName).get(key);
152 + if (currentValue != null) {
153 + return Result.ok(new UpdateResult<>(false, currentValue, currentValue));
154 + } else {
155 + Versioned<byte[]> newValue = new Versioned<>(value, ++nextVersion);
156 + getTableMap(tableName).put(key, newValue);
157 + return Result.ok(new UpdateResult<>(true, null, newValue));
158 + }
159 + }
160 +
161 + @Override
131 public Result<Versioned<byte[]>> remove(String tableName, String key) { 162 public Result<Versioned<byte[]>> remove(String tableName, String key) {
132 return isLockedForUpdates(tableName, key) 163 return isLockedForUpdates(tableName, key)
133 ? Result.locked() 164 ? Result.locked()
...@@ -225,6 +256,23 @@ public class DefaultDatabaseState implements DatabaseState<String, byte[]> { ...@@ -225,6 +256,23 @@ public class DefaultDatabaseState implements DatabaseState<String, byte[]> {
225 } 256 }
226 257
227 @Override 258 @Override
259 + public Result<UpdateResult<Versioned<byte[]>>> replaceAndGet(
260 + String tableName, String key, long oldVersion, byte[] newValue) {
261 + if (isLockedForUpdates(tableName, key)) {
262 + return Result.locked();
263 + }
264 + boolean updated = false;
265 + Versioned<byte[]> previous = get(tableName, key);
266 + Versioned<byte[]> current = previous;
267 + if (previous != null && previous.version() == oldVersion) {
268 + current = new Versioned<>(newValue, ++nextVersion);
269 + getTableMap(tableName).put(key, current);
270 + updated = true;
271 + }
272 + return Result.ok(new UpdateResult<>(updated, previous, current));
273 + }
274 +
275 + @Override
228 public Long counterAddAndGet(String counterName, long delta) { 276 public Long counterAddAndGet(String counterName, long delta) {
229 return getCounter(counterName).addAndGet(delta); 277 return getCounter(counterName).addAndGet(delta);
230 } 278 }
......
...@@ -216,39 +216,24 @@ public class DistributedLeadershipManager implements LeadershipService { ...@@ -216,39 +216,24 @@ public class DistributedLeadershipManager implements LeadershipService {
216 216
217 private void doRunForLeadership(String path, CompletableFuture<Leadership> future) { 217 private void doRunForLeadership(String path, CompletableFuture<Leadership> future) {
218 try { 218 try {
219 - Versioned<List<NodeId>> candidates = candidateMap.get(path); 219 + Versioned<List<NodeId>> candidates = candidateMap.computeIf(path,
220 - if (candidates != null) { 220 + currentList -> currentList == null || !currentList.contains(localNodeId),
221 - List<NodeId> candidateList = Lists.newArrayList(candidates.value()); 221 + (topic, currentList) -> {
222 - if (!candidateList.contains(localNodeId)) { 222 + if (currentList == null) {
223 - candidateList.add(localNodeId); 223 + return ImmutableList.of(localNodeId);
224 - if (candidateMap.replace(path, candidates.version(), candidateList)) {
225 - Versioned<List<NodeId>> newCandidates = candidateMap.get(path);
226 - publish(new LeadershipEvent(
227 - LeadershipEvent.Type.CANDIDATES_CHANGED,
228 - new Leadership(path,
229 - newCandidates.value(),
230 - newCandidates.version(),
231 - newCandidates.creationTime())));
232 } else { 224 } else {
233 - rerunForLeadership(path, future); 225 + List<NodeId> newList = Lists.newLinkedList();
234 - return; 226 + newList.addAll(currentList);
227 + newList.add(localNodeId);
228 + return newList;
235 } 229 }
236 - } 230 + });
237 - } else {
238 - List<NodeId> candidateList = ImmutableList.of(localNodeId);
239 - if ((candidateMap.putIfAbsent(path, candidateList) == null)) {
240 - Versioned<List<NodeId>> newCandidates = candidateMap.get(path);
241 publish(new LeadershipEvent( 231 publish(new LeadershipEvent(
242 LeadershipEvent.Type.CANDIDATES_CHANGED, 232 LeadershipEvent.Type.CANDIDATES_CHANGED,
243 new Leadership(path, 233 new Leadership(path,
244 - newCandidates.value(), 234 + candidates.value(),
245 - newCandidates.version(), 235 + candidates.version(),
246 - newCandidates.creationTime()))); 236 + candidates.creationTime())));
247 - } else {
248 - rerunForLeadership(path, future);
249 - return;
250 - }
251 - }
252 log.debug("In the leadership race for topic {} with candidates {}", path, candidates); 237 log.debug("In the leadership race for topic {} with candidates {}", path, candidates);
253 activeTopics.add(path); 238 activeTopics.add(path);
254 tryLeaderLock(path, future); 239 tryLeaderLock(path, future);
...@@ -352,28 +337,22 @@ public class DistributedLeadershipManager implements LeadershipService { ...@@ -352,28 +337,22 @@ public class DistributedLeadershipManager implements LeadershipService {
352 337
353 @Override 338 @Override
354 public boolean makeTopCandidate(String path, NodeId nodeId) { 339 public boolean makeTopCandidate(String path, NodeId nodeId) {
355 - Versioned<List<NodeId>> candidates = candidateMap.get(path); 340 + Versioned<List<NodeId>> newCandidates = candidateMap.computeIf(path,
356 - if (candidates == null || !candidates.value().contains(nodeId)) { 341 + candidates -> (candidates != null && candidates.contains(nodeId)) ||
357 - return false; 342 + (candidates != null && Objects.equals(nodeId, candidates.get(LEADER_CANDIDATE_POS))),
358 - } 343 + (topic, candidates) -> {
359 - List<NodeId> currentRoster = candidates.value(); 344 + List<NodeId> updatedCandidates = new ArrayList<>(candidates.size());
360 - if (nodeId.equals(currentRoster.get(LEADER_CANDIDATE_POS))) { 345 + updatedCandidates.add(nodeId);
361 - return true; 346 + candidates.stream().filter(id -> !nodeId.equals(id)).forEach(updatedCandidates::add);
362 - } 347 + return updatedCandidates;
363 - List<NodeId> newRoster = new ArrayList<>(currentRoster.size()); 348 + });
364 - newRoster.add(nodeId);
365 - currentRoster.stream().filter(id -> !nodeId.equals(id)).forEach(newRoster::add);
366 - boolean updated = candidateMap.replace(path, candidates.version(), newRoster);
367 - if (updated) {
368 - Versioned<List<NodeId>> newCandidates = candidateMap.get(path);
369 publish(new LeadershipEvent( 349 publish(new LeadershipEvent(
370 LeadershipEvent.Type.CANDIDATES_CHANGED, 350 LeadershipEvent.Type.CANDIDATES_CHANGED,
371 new Leadership(path, 351 new Leadership(path,
372 newCandidates.value(), 352 newCandidates.value(),
373 newCandidates.version(), 353 newCandidates.version(),
374 newCandidates.creationTime()))); 354 newCandidates.creationTime())));
375 - } 355 + return true;
376 - return updated;
377 } 356 }
378 357
379 private void tryLeaderLock(String path, CompletableFuture<Leadership> future) { 358 private void tryLeaderLock(String path, CompletableFuture<Leadership> future) {
...@@ -403,42 +382,20 @@ public class DistributedLeadershipManager implements LeadershipService { ...@@ -403,42 +382,20 @@ public class DistributedLeadershipManager implements LeadershipService {
403 382
404 private void leaderLockAttempt(String path, List<NodeId> candidates, CompletableFuture<Leadership> future) { 383 private void leaderLockAttempt(String path, List<NodeId> candidates, CompletableFuture<Leadership> future) {
405 try { 384 try {
406 - Versioned<NodeId> currentLeader = leaderMap.get(path); 385 + Versioned<NodeId> leader = leaderMap.computeIfAbsent(path, p -> localNodeId);
407 - if (currentLeader != null) { 386 + if (Objects.equals(leader.value(), localNodeId)) {
408 - if (localNodeId.equals(currentLeader.value())) {
409 - log.debug("Already has leadership for {}", path);
410 - // FIXME: candidates can get out of sync.
411 - Leadership leadership = new Leadership(path,
412 - localNodeId,
413 - currentLeader.version(),
414 - currentLeader.creationTime());
415 - future.complete(leadership);
416 - publish(new LeadershipEvent(
417 - LeadershipEvent.Type.LEADER_ELECTED,
418 - leadership));
419 - } else {
420 - // someone else has leadership. will retry after sometime.
421 - retryLock(path, future);
422 - }
423 - } else {
424 - if (leaderMap.putIfAbsent(path, localNodeId) == null) {
425 log.debug("Assumed leadership for {}", path); 387 log.debug("Assumed leadership for {}", path);
426 - // do a get again to get the version (epoch)
427 - Versioned<NodeId> newLeader = leaderMap.get(path);
428 - // FIXME: candidates can get out of sync
429 Leadership leadership = new Leadership(path, 388 Leadership leadership = new Leadership(path,
430 - newLeader.value(), 389 + leader.value(),
431 - newLeader.version(), 390 + leader.version(),
432 - newLeader.creationTime()); 391 + leader.creationTime());
433 future.complete(leadership); 392 future.complete(leadership);
434 publish(new LeadershipEvent( 393 publish(new LeadershipEvent(
435 LeadershipEvent.Type.LEADER_ELECTED, 394 LeadershipEvent.Type.LEADER_ELECTED,
436 leadership)); 395 leadership));
437 } else { 396 } else {
438 - // someone beat us to it.
439 retryLock(path, future); 397 retryLock(path, future);
440 } 398 }
441 - }
442 } catch (Exception e) { 399 } catch (Exception e) {
443 log.debug("Attempt to acquire leadership lock for topic {} failed", path, e); 400 log.debug("Attempt to acquire leadership lock for topic {} failed", path, e);
444 retryLock(path, future); 401 retryLock(path, future);
......
...@@ -152,6 +152,22 @@ public class PartitionedDatabase implements Database { ...@@ -152,6 +152,22 @@ public class PartitionedDatabase implements Database {
152 } 152 }
153 153
154 @Override 154 @Override
155 + public CompletableFuture<Result<UpdateResult<Versioned<byte[]>>>> putAndGet(String tableName,
156 + String key,
157 + byte[] value) {
158 + checkState(isOpen.get(), DB_NOT_OPEN);
159 + return partitioner.getPartition(tableName, key).putAndGet(tableName, key, value);
160 + }
161 +
162 + @Override
163 + public CompletableFuture<Result<UpdateResult<Versioned<byte[]>>>> putIfAbsentAndGet(String tableName,
164 + String key,
165 + byte[] value) {
166 + checkState(isOpen.get(), DB_NOT_OPEN);
167 + return partitioner.getPartition(tableName, key).putIfAbsentAndGet(tableName, key, value);
168 + }
169 +
170 + @Override
155 public CompletableFuture<Result<Versioned<byte[]>>> remove(String tableName, String key) { 171 public CompletableFuture<Result<Versioned<byte[]>>> remove(String tableName, String key) {
156 checkState(isOpen.get(), DB_NOT_OPEN); 172 checkState(isOpen.get(), DB_NOT_OPEN);
157 return partitioner.getPartition(tableName, key).remove(tableName, key); 173 return partitioner.getPartition(tableName, key).remove(tableName, key);
...@@ -235,6 +251,13 @@ public class PartitionedDatabase implements Database { ...@@ -235,6 +251,13 @@ public class PartitionedDatabase implements Database {
235 } 251 }
236 252
237 @Override 253 @Override
254 + public CompletableFuture<Result<UpdateResult<Versioned<byte[]>>>> replaceAndGet(
255 + String tableName, String key, long oldVersion, byte[] newValue) {
256 + checkState(isOpen.get(), DB_NOT_OPEN);
257 + return partitioner.getPartition(tableName, key).replaceAndGet(tableName, key, oldVersion, newValue);
258 + }
259 +
260 + @Override
238 public CompletableFuture<Long> counterGet(String counterName) { 261 public CompletableFuture<Long> counterGet(String counterName) {
239 checkState(isOpen.get(), DB_NOT_OPEN); 262 checkState(isOpen.get(), DB_NOT_OPEN);
240 return partitioner.getPartition(counterName, counterName).counterGet(counterName); 263 return partitioner.getPartition(counterName, counterName).counterGet(counterName);
......
1 +/*
2 + * Copyright 2015 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +package org.onosproject.store.consistent.impl;
17 +
18 +/**
19 + * Result of a update operation.
20 + * <p>
21 + * Both old and new values are accessible along with a flag that indicates if the
22 + * the value was updated. If flag is false, oldValue and newValue both
23 + * point to the same unmodified value.
24 + * @param <V> result type
25 + */
26 +public class UpdateResult<V> {
27 +
28 + private final boolean updated;
29 + private final V oldValue;
30 + private final V newValue;
31 +
32 + public UpdateResult(boolean updated, V oldValue, V newValue) {
33 + this.updated = updated;
34 + this.oldValue = oldValue;
35 + this.newValue = newValue;
36 + }
37 +
38 + public boolean updated() {
39 + return updated;
40 + }
41 +
42 + public V oldValue() {
43 + return oldValue;
44 + }
45 +
46 + public V newValue() {
47 + return newValue;
48 + }
49 +}
...\ No newline at end of file ...\ No newline at end of file