Aaron Kruglikov
Committed by Gerrit Code Review

Adding atomic countetr compare and set method

Change-Id: I5cf459e9e09ab1a84ced8160ef61d6a52ea4bea4
...@@ -66,4 +66,13 @@ public interface AsyncAtomicCounter { ...@@ -66,4 +66,13 @@ public interface AsyncAtomicCounter {
66 * @return future void 66 * @return future void
67 */ 67 */
68 CompletableFuture<Void> set(long value); 68 CompletableFuture<Void> set(long value);
69 +
70 + /**
71 + * Atomically sets the given counter to the updated value if the current value is the expected value, otherwise
72 + * no change occurs.
73 + * @param expectedValue the expected current value of the counter
74 + * @param updateValue the new value to be set
75 + * @return true if the update occurred and the expected value was equal to the current value, false otherwise
76 + */
77 + CompletableFuture<Boolean> compareAndSet(long expectedValue, long updateValue);
69 } 78 }
......
...@@ -57,6 +57,14 @@ public interface AtomicCounter { ...@@ -57,6 +57,14 @@ public interface AtomicCounter {
57 */ 57 */
58 void set(long value); 58 void set(long value);
59 59
60 + /**
61 + * Atomically sets the given counter to the updated value if the current value is the expected value, otherwise
62 + * no change occurs.
63 + * @param expectedValue the expected current value of the counter
64 + * @param updateValue the new value to be set
65 + * @return true if the update occurred and the expected value was equal to the current value, false otherwise
66 + */
67 + boolean compareAndSet(long expectedValue, long updateValue);
60 68
61 /** 69 /**
62 * Returns the current value of the counter without modifying it. 70 * Returns the current value of the counter without modifying it.
......
...@@ -53,6 +53,11 @@ public final class TestAtomicCounter implements AtomicCounter { ...@@ -53,6 +53,11 @@ public final class TestAtomicCounter implements AtomicCounter {
53 } 53 }
54 54
55 @Override 55 @Override
56 + public boolean compareAndSet(long expectedValue, long updateValue) {
57 + return value.compareAndSet(expectedValue, updateValue);
58 + }
59 +
60 + @Override
56 public long get() { 61 public long get() {
57 return value.get(); 62 return value.get();
58 } 63 }
......
...@@ -16,14 +16,14 @@ ...@@ -16,14 +16,14 @@
16 16
17 package org.onosproject.store.consistent.impl; 17 package org.onosproject.store.consistent.impl;
18 18
19 +import org.onosproject.store.service.Transaction;
20 +import org.onosproject.store.service.Versioned;
21 +
19 import java.util.Collection; 22 import java.util.Collection;
20 import java.util.Map; 23 import java.util.Map;
21 import java.util.Set; 24 import java.util.Set;
22 import java.util.concurrent.CompletableFuture; 25 import java.util.concurrent.CompletableFuture;
23 26
24 -import org.onosproject.store.service.Transaction;
25 -import org.onosproject.store.service.Versioned;
26 -
27 /** 27 /**
28 * Database proxy. 28 * Database proxy.
29 */ 29 */
...@@ -160,6 +160,16 @@ public interface DatabaseProxy<K, V> { ...@@ -160,6 +160,16 @@ public interface DatabaseProxy<K, V> {
160 CompletableFuture<Void> counterSet(String counterName, long value); 160 CompletableFuture<Void> counterSet(String counterName, long value);
161 161
162 /** 162 /**
163 + * Atomically sets the given counter to the specified update value if and only if the current value is equal to the
164 + * expected value.
165 + * @param counterName counter name
166 + * @param expectedValue value to use for equivalence check
167 + * @param update value to set if expected value is current value
168 + * @return true if an update occurred, false otherwise
169 + */
170 + CompletableFuture<Boolean> counterCompareAndSet(String counterName, long expectedValue, long update);
171 +
172 + /**
163 * Returns the current value of the specified atomic counter. 173 * Returns the current value of the specified atomic counter.
164 * 174 *
165 * @param counterName counter name 175 * @param counterName counter name
......
...@@ -16,18 +16,17 @@ ...@@ -16,18 +16,17 @@
16 16
17 package org.onosproject.store.consistent.impl; 17 package org.onosproject.store.consistent.impl;
18 18
19 -import java.util.Collection;
20 -import java.util.Map;
21 -import java.util.Map.Entry;
22 -import java.util.Set;
23 -
24 -import org.onosproject.store.service.Transaction;
25 -import org.onosproject.store.service.Versioned;
26 -
27 import net.kuujo.copycat.state.Command; 19 import net.kuujo.copycat.state.Command;
28 import net.kuujo.copycat.state.Initializer; 20 import net.kuujo.copycat.state.Initializer;
29 import net.kuujo.copycat.state.Query; 21 import net.kuujo.copycat.state.Query;
30 import net.kuujo.copycat.state.StateContext; 22 import net.kuujo.copycat.state.StateContext;
23 +import org.onosproject.store.service.Transaction;
24 +import org.onosproject.store.service.Versioned;
25 +
26 +import java.util.Collection;
27 +import java.util.Map;
28 +import java.util.Map.Entry;
29 +import java.util.Set;
31 30
32 /** 31 /**
33 * Database state. 32 * Database state.
...@@ -83,6 +82,9 @@ public interface DatabaseState<K, V> { ...@@ -83,6 +82,9 @@ public interface DatabaseState<K, V> {
83 Long counterAddAndGet(String counterName, long delta); 82 Long counterAddAndGet(String counterName, long delta);
84 83
85 @Command 84 @Command
85 + Boolean counterCompareAndSet(String counterName, long expectedValue, long updateValue);
86 +
87 + @Command
86 Long counterGetAndAdd(String counterName, long delta); 88 Long counterGetAndAdd(String counterName, long delta);
87 89
88 @Query 90 @Query
......
...@@ -40,6 +40,7 @@ public class DefaultAsyncAtomicCounter implements AsyncAtomicCounter { ...@@ -40,6 +40,7 @@ public class DefaultAsyncAtomicCounter implements AsyncAtomicCounter {
40 private static final String ADD_AND_GET = "addAndGet"; 40 private static final String ADD_AND_GET = "addAndGet";
41 private static final String GET = "get"; 41 private static final String GET = "get";
42 private static final String SET = "set"; 42 private static final String SET = "set";
43 + private static final String COMPARE_AND_SET = "compareAndSet";
43 44
44 public DefaultAsyncAtomicCounter(String name, 45 public DefaultAsyncAtomicCounter(String name,
45 Database database, 46 Database database,
...@@ -90,4 +91,11 @@ public class DefaultAsyncAtomicCounter implements AsyncAtomicCounter { ...@@ -90,4 +91,11 @@ public class DefaultAsyncAtomicCounter implements AsyncAtomicCounter {
90 return database.counterSet(name, value) 91 return database.counterSet(name, value)
91 .whenComplete((r, e) -> timer.stop(e)); 92 .whenComplete((r, e) -> timer.stop(e));
92 } 93 }
94 +
95 + @Override
96 + public CompletableFuture<Boolean> compareAndSet(long expectedValue, long updateValue) {
97 + final MeteringAgent.Context timer = monitor.startTimer(COMPARE_AND_SET);
98 + return database.counterCompareAndSet(name, expectedValue, updateValue)
99 + .whenComplete((r, e) -> timer.stop(e));
100 + }
93 } 101 }
......
...@@ -68,6 +68,11 @@ public class DefaultAtomicCounter implements AtomicCounter { ...@@ -68,6 +68,11 @@ public class DefaultAtomicCounter implements AtomicCounter {
68 } 68 }
69 69
70 @Override 70 @Override
71 + public boolean compareAndSet(long expectedValue, long updateValue) {
72 + return complete(asyncCounter.compareAndSet(expectedValue, updateValue));
73 + }
74 +
75 + @Override
71 public long get() { 76 public long get() {
72 return complete(asyncCounter.get()); 77 return complete(asyncCounter.get());
73 } 78 }
......
...@@ -16,12 +16,15 @@ ...@@ -16,12 +16,15 @@
16 16
17 package org.onosproject.store.consistent.impl; 17 package org.onosproject.store.consistent.impl;
18 18
19 -import net.kuujo.copycat.state.StateMachine; 19 +import com.google.common.collect.Sets;
20 import net.kuujo.copycat.resource.internal.AbstractResource; 20 import net.kuujo.copycat.resource.internal.AbstractResource;
21 import net.kuujo.copycat.resource.internal.ResourceManager; 21 import net.kuujo.copycat.resource.internal.ResourceManager;
22 +import net.kuujo.copycat.state.StateMachine;
22 import net.kuujo.copycat.state.internal.DefaultStateMachine; 23 import net.kuujo.copycat.state.internal.DefaultStateMachine;
23 import net.kuujo.copycat.util.concurrent.Futures; 24 import net.kuujo.copycat.util.concurrent.Futures;
24 import net.kuujo.copycat.util.function.TriConsumer; 25 import net.kuujo.copycat.util.function.TriConsumer;
26 +import org.onosproject.store.service.Transaction;
27 +import org.onosproject.store.service.Versioned;
25 28
26 import java.util.Collection; 29 import java.util.Collection;
27 import java.util.Map; 30 import java.util.Map;
...@@ -30,11 +33,6 @@ import java.util.concurrent.CompletableFuture; ...@@ -30,11 +33,6 @@ import java.util.concurrent.CompletableFuture;
30 import java.util.function.Consumer; 33 import java.util.function.Consumer;
31 import java.util.function.Supplier; 34 import java.util.function.Supplier;
32 35
33 -import org.onosproject.store.service.Transaction;
34 -import org.onosproject.store.service.Versioned;
35 -
36 -import com.google.common.collect.Sets;
37 -
38 /** 36 /**
39 * Default database. 37 * Default database.
40 */ 38 */
...@@ -48,9 +46,9 @@ public class DefaultDatabase extends AbstractResource<Database> implements Datab ...@@ -48,9 +46,9 @@ public class DefaultDatabase extends AbstractResource<Database> implements Datab
48 public DefaultDatabase(ResourceManager context) { 46 public DefaultDatabase(ResourceManager context) {
49 super(context); 47 super(context);
50 this.stateMachine = new DefaultStateMachine(context, 48 this.stateMachine = new DefaultStateMachine(context,
51 - DatabaseState.class, 49 + DatabaseState.class,
52 - DefaultDatabaseState.class, 50 + DefaultDatabaseState.class,
53 - DefaultDatabase.class.getClassLoader()); 51 + DefaultDatabase.class.getClassLoader());
54 this.stateMachine.addStartupTask(() -> { 52 this.stateMachine.addStartupTask(() -> {
55 stateMachine.registerWatcher(watcher); 53 stateMachine.registerWatcher(watcher);
56 return CompletableFuture.completedFuture(null); 54 return CompletableFuture.completedFuture(null);
...@@ -158,6 +156,11 @@ public class DefaultDatabase extends AbstractResource<Database> implements Datab ...@@ -158,6 +156,11 @@ public class DefaultDatabase extends AbstractResource<Database> implements Datab
158 } 156 }
159 157
160 @Override 158 @Override
159 + public CompletableFuture<Boolean> counterCompareAndSet(String counterName, long expectedValue, long update) {
160 + return checkOpen(() -> proxy.counterCompareAndSet(counterName, expectedValue, update));
161 + }
162 +
163 + @Override
161 public CompletableFuture<Long> queueSize(String queueName) { 164 public CompletableFuture<Long> queueSize(String queueName) {
162 return checkOpen(() -> proxy.queueSize(queueName)); 165 return checkOpen(() -> proxy.queueSize(queueName));
163 } 166 }
......
...@@ -16,27 +16,26 @@ ...@@ -16,27 +16,26 @@
16 16
17 package org.onosproject.store.consistent.impl; 17 package org.onosproject.store.consistent.impl;
18 18
19 +import com.google.common.base.Objects;
20 +import com.google.common.collect.ImmutableList;
21 +import com.google.common.collect.ImmutableSet;
22 +import com.google.common.collect.Lists;
23 +import com.google.common.collect.Maps;
24 +import net.kuujo.copycat.state.Initializer;
25 +import net.kuujo.copycat.state.StateContext;
26 +import org.onosproject.store.service.DatabaseUpdate;
27 +import org.onosproject.store.service.Transaction;
28 +import org.onosproject.store.service.Versioned;
29 +
19 import java.util.Arrays; 30 import java.util.Arrays;
20 import java.util.Collection; 31 import java.util.Collection;
21 import java.util.LinkedList; 32 import java.util.LinkedList;
22 import java.util.Map; 33 import java.util.Map;
23 import java.util.Map.Entry; 34 import java.util.Map.Entry;
24 import java.util.Queue; 35 import java.util.Queue;
36 +import java.util.Set;
25 import java.util.concurrent.atomic.AtomicLong; 37 import java.util.concurrent.atomic.AtomicLong;
26 import java.util.stream.Collectors; 38 import java.util.stream.Collectors;
27 -import java.util.Set;
28 -
29 -import org.onosproject.store.service.DatabaseUpdate;
30 -import org.onosproject.store.service.Transaction;
31 -import org.onosproject.store.service.Versioned;
32 -import com.google.common.base.Objects;
33 -import com.google.common.collect.ImmutableList;
34 -import com.google.common.collect.ImmutableSet;
35 -import com.google.common.collect.Lists;
36 -import com.google.common.collect.Maps;
37 -
38 -import net.kuujo.copycat.state.Initializer;
39 -import net.kuujo.copycat.state.StateContext;
40 39
41 /** 40 /**
42 * Default database state. 41 * Default database state.
...@@ -195,6 +194,11 @@ public class DefaultDatabaseState implements DatabaseState<String, byte[]> { ...@@ -195,6 +194,11 @@ public class DefaultDatabaseState implements DatabaseState<String, byte[]> {
195 } 194 }
196 195
197 @Override 196 @Override
197 + public Boolean counterCompareAndSet(String counterName, long expectedValue, long updateValue) {
198 + return getCounter(counterName).compareAndSet(expectedValue, updateValue);
199 + }
200 +
201 + @Override
198 public Long counterGet(String counterName) { 202 public Long counterGet(String counterName) {
199 return getCounter(counterName).get(); 203 return getCounter(counterName).get();
200 } 204 }
......
...@@ -16,6 +16,17 @@ ...@@ -16,6 +16,17 @@
16 16
17 package org.onosproject.store.consistent.impl; 17 package org.onosproject.store.consistent.impl;
18 18
19 +import com.google.common.collect.ImmutableList;
20 +import com.google.common.collect.Lists;
21 +import com.google.common.collect.Maps;
22 +import com.google.common.collect.Sets;
23 +import net.kuujo.copycat.Task;
24 +import net.kuujo.copycat.cluster.Cluster;
25 +import net.kuujo.copycat.resource.ResourceState;
26 +import org.onosproject.store.service.DatabaseUpdate;
27 +import org.onosproject.store.service.Transaction;
28 +import org.onosproject.store.service.Versioned;
29 +
19 import java.util.Collection; 30 import java.util.Collection;
20 import java.util.List; 31 import java.util.List;
21 import java.util.Map; 32 import java.util.Map;
...@@ -28,18 +39,6 @@ import java.util.concurrent.atomic.AtomicInteger; ...@@ -28,18 +39,6 @@ import java.util.concurrent.atomic.AtomicInteger;
28 import java.util.function.Consumer; 39 import java.util.function.Consumer;
29 import java.util.stream.Collectors; 40 import java.util.stream.Collectors;
30 41
31 -import org.onosproject.store.service.DatabaseUpdate;
32 -import org.onosproject.store.service.Transaction;
33 -import org.onosproject.store.service.Versioned;
34 -
35 -import com.google.common.collect.ImmutableList;
36 -import com.google.common.collect.Lists;
37 -import com.google.common.collect.Maps;
38 -import com.google.common.collect.Sets;
39 -
40 -import net.kuujo.copycat.Task;
41 -import net.kuujo.copycat.cluster.Cluster;
42 -import net.kuujo.copycat.resource.ResourceState;
43 import static com.google.common.base.Preconditions.checkState; 42 import static com.google.common.base.Preconditions.checkState;
44 43
45 /** 44 /**
...@@ -227,6 +226,14 @@ public class PartitionedDatabase implements Database { ...@@ -227,6 +226,14 @@ public class PartitionedDatabase implements Database {
227 } 226 }
228 227
229 @Override 228 @Override
229 + public CompletableFuture<Boolean> counterCompareAndSet(String counterName, long expectedValue, long updateValue) {
230 + checkState(isOpen.get(), DB_NOT_OPEN);
231 + return partitioner.getPartition(counterName, counterName).
232 + counterCompareAndSet(counterName, expectedValue, updateValue);
233 +
234 + }
235 +
236 + @Override
230 public CompletableFuture<Long> queueSize(String queueName) { 237 public CompletableFuture<Long> queueSize(String queueName) {
231 checkState(isOpen.get(), DB_NOT_OPEN); 238 checkState(isOpen.get(), DB_NOT_OPEN);
232 return partitioner.getPartition(queueName, queueName).queueSize(queueName); 239 return partitioner.getPartition(queueName, queueName).queueSize(queueName);
......