Madan Jampani

AtomicValue updates

    - events now have old and new values
    - simplify implementation by using map.replace for compareAndSet
    - Removed option for disabling it from builder. Will make it a system wide option in a subsequent update

Change-Id: I7c1424f8671c0e54688172c273b9262f305b0920
...@@ -60,14 +60,6 @@ public interface AtomicValueBuilder<V> { ...@@ -60,14 +60,6 @@ public interface AtomicValueBuilder<V> {
60 AtomicValueBuilder<V> withPartitionsDisabled(); 60 AtomicValueBuilder<V> withPartitionsDisabled();
61 61
62 /** 62 /**
63 - * Instantiates Metering service to gather usage and performance metrics.
64 - * By default, usage data will be stored.
65 - *
66 - * @return this AtomicValueBuilder for method chaining
67 - */
68 - AtomicValueBuilder<V> withMeteringDisabled();
69 -
70 - /**
71 * Builds a AsyncAtomicValue based on the configuration options 63 * Builds a AsyncAtomicValue based on the configuration options
72 * supplied to this builder. 64 * supplied to this builder.
73 * 65 *
......
...@@ -38,20 +38,20 @@ public final class AtomicValueEvent<V> { ...@@ -38,20 +38,20 @@ public final class AtomicValueEvent<V> {
38 } 38 }
39 39
40 private final String name; 40 private final String name;
41 - private final Type type; 41 + private final V newValue;
42 - private final V value; 42 + private final V oldValue;
43 43
44 /** 44 /**
45 * Creates a new event object. 45 * Creates a new event object.
46 * 46 *
47 * @param name AtomicValue name 47 * @param name AtomicValue name
48 - * @param type the type of the event 48 + * @param newValue the new value
49 - * @param value the new value 49 + * @param oldValue the old value
50 */ 50 */
51 - public AtomicValueEvent(String name, Type type, V value) { 51 + public AtomicValueEvent(String name, V newValue, V oldValue) {
52 this.name = name; 52 this.name = name;
53 - this.type = type; 53 + this.newValue = newValue;
54 - this.value = value; 54 + this.oldValue = oldValue;
55 } 55 }
56 56
57 /** 57 /**
...@@ -69,16 +69,25 @@ public final class AtomicValueEvent<V> { ...@@ -69,16 +69,25 @@ public final class AtomicValueEvent<V> {
69 * @return the type of the event 69 * @return the type of the event
70 */ 70 */
71 public Type type() { 71 public Type type() {
72 - return type; 72 + return AtomicValueEvent.Type.UPDATE;
73 } 73 }
74 74
75 /** 75 /**
76 - * Returns the new updated value. 76 + * Returns the newly set value.
77 * 77 *
78 - * @return the value 78 + * @return the new value
79 */ 79 */
80 - public V value() { 80 + public V newValue() {
81 - return value; 81 + return newValue;
82 + }
83 +
84 + /**
85 + * Returns the old replaced value.
86 + *
87 + * @return the old value
88 + */
89 + public V oldValue() {
90 + return oldValue;
82 } 91 }
83 92
84 @Override 93 @Override
...@@ -89,21 +98,21 @@ public final class AtomicValueEvent<V> { ...@@ -89,21 +98,21 @@ public final class AtomicValueEvent<V> {
89 98
90 AtomicValueEvent that = (AtomicValueEvent) o; 99 AtomicValueEvent that = (AtomicValueEvent) o;
91 return Objects.equals(this.name, that.name) && 100 return Objects.equals(this.name, that.name) &&
92 - Objects.equals(this.type, that.type) && 101 + Objects.equals(this.newValue, that.newValue) &&
93 - Objects.equals(this.value, that.value); 102 + Objects.equals(this.oldValue, that.oldValue);
94 } 103 }
95 104
96 @Override 105 @Override
97 public int hashCode() { 106 public int hashCode() {
98 - return Objects.hash(name, type, value); 107 + return Objects.hash(name, newValue, oldValue);
99 } 108 }
100 109
101 @Override 110 @Override
102 public String toString() { 111 public String toString() {
103 return MoreObjects.toStringHelper(getClass()) 112 return MoreObjects.toStringHelper(getClass())
104 .add("name", name) 113 .add("name", name)
105 - .add("type", type) 114 + .add("newValue", newValue)
106 - .add("value", value) 115 + .add("oldValue", oldValue)
107 .toString(); 116 .toString();
108 } 117 }
109 } 118 }
......
...@@ -30,13 +30,13 @@ import static org.onosproject.store.service.AtomicValueEvent.Type.UPDATE; ...@@ -30,13 +30,13 @@ import static org.onosproject.store.service.AtomicValueEvent.Type.UPDATE;
30 public class AtomicValueEventTest { 30 public class AtomicValueEventTest {
31 31
32 AtomicValueEvent<String> event1 = 32 AtomicValueEvent<String> event1 =
33 - new AtomicValueEvent<>("map1", UPDATE, "e1"); 33 + new AtomicValueEvent<>("map1", "e1", "e0");
34 AtomicValueEvent<String> event2 = 34 AtomicValueEvent<String> event2 =
35 - new AtomicValueEvent<>("map1", UPDATE, "e2"); 35 + new AtomicValueEvent<>("map1", "e2", "e1");
36 AtomicValueEvent<String> sameAsEvent2 = 36 AtomicValueEvent<String> sameAsEvent2 =
37 - new AtomicValueEvent<>("map1", UPDATE, "e2"); 37 + new AtomicValueEvent<>("map1", "e2", "e1");
38 AtomicValueEvent<String> event3 = 38 AtomicValueEvent<String> event3 =
39 - new AtomicValueEvent<>("map2", UPDATE, "e2"); 39 + new AtomicValueEvent<>("map2", "e2", "e1");
40 40
41 /** 41 /**
42 * Checks that the SetEvent class is immutable. 42 * Checks that the SetEvent class is immutable.
...@@ -64,7 +64,8 @@ public class AtomicValueEventTest { ...@@ -64,7 +64,8 @@ public class AtomicValueEventTest {
64 @Test 64 @Test
65 public void testConstruction() { 65 public void testConstruction() {
66 assertThat(event1.type(), is(UPDATE)); 66 assertThat(event1.type(), is(UPDATE));
67 - assertThat(event1.value(), is("e1")); 67 + assertThat(event1.newValue(), is("e1"));
68 + assertThat(event1.oldValue(), is("e0"));
68 assertThat(event1.name(), is("map1")); 69 assertThat(event1.name(), is("map1"));
69 } 70 }
70 71
......
...@@ -13,31 +13,34 @@ ...@@ -13,31 +13,34 @@
13 * See the License for the specific language governing permissions and 13 * See the License for the specific language governing permissions and
14 * limitations under the License. 14 * limitations under the License.
15 */ 15 */
16 +
16 package org.onosproject.store.primitives.impl; 17 package org.onosproject.store.primitives.impl;
17 18
19 +import static com.google.common.base.Preconditions.checkNotNull;
20 +
21 +import java.util.Map;
22 +import java.util.concurrent.CompletableFuture;
23 +
18 import org.onosproject.store.service.AsyncAtomicValue; 24 import org.onosproject.store.service.AsyncAtomicValue;
19 import org.onosproject.store.service.AsyncConsistentMap; 25 import org.onosproject.store.service.AsyncConsistentMap;
20 import org.onosproject.store.service.AtomicValueEvent; 26 import org.onosproject.store.service.AtomicValueEvent;
21 import org.onosproject.store.service.AtomicValueEventListener; 27 import org.onosproject.store.service.AtomicValueEventListener;
22 import org.onosproject.store.service.MapEvent; 28 import org.onosproject.store.service.MapEvent;
23 import org.onosproject.store.service.MapEventListener; 29 import org.onosproject.store.service.MapEventListener;
30 +import org.onosproject.store.service.Serializer;
24 import org.onosproject.store.service.Versioned; 31 import org.onosproject.store.service.Versioned;
25 32
26 -import java.util.Set; 33 +import com.google.common.base.Throwables;
27 -import java.util.concurrent.CompletableFuture; 34 +import com.google.common.collect.Maps;
28 -import java.util.concurrent.CopyOnWriteArraySet; 35 +
29 36
30 -/**
31 - * Default implementation of {@link AsyncAtomicValue}.
32 - *
33 - * @param <V> value type
34 - */
35 public class DefaultAsyncAtomicValue<V> implements AsyncAtomicValue<V> { 37 public class DefaultAsyncAtomicValue<V> implements AsyncAtomicValue<V> {
36 38
37 - private final Set<AtomicValueEventListener<V>> listeners = new CopyOnWriteArraySet<>();
38 - private final AsyncConsistentMap<String, V> valueMap;
39 private final String name; 39 private final String name;
40 - private final MapEventListener<String, V> mapEventListener = new InternalMapEventListener(); 40 + private final Serializer serializer;
41 + private final AsyncConsistentMap<String, byte[]> backingMap;
42 + private final Map<AtomicValueEventListener<V>, MapEventListener<String, byte[]>> listeners =
43 + Maps.newIdentityHashMap();
41 private final MeteringAgent monitor; 44 private final MeteringAgent monitor;
42 45
43 private static final String COMPONENT_NAME = "atomicValue"; 46 private static final String COMPONENT_NAME = "atomicValue";
...@@ -45,13 +48,15 @@ public class DefaultAsyncAtomicValue<V> implements AsyncAtomicValue<V> { ...@@ -45,13 +48,15 @@ public class DefaultAsyncAtomicValue<V> implements AsyncAtomicValue<V> {
45 private static final String GET_AND_SET = "getAndSet"; 48 private static final String GET_AND_SET = "getAndSet";
46 private static final String SET = "set"; 49 private static final String SET = "set";
47 private static final String COMPARE_AND_SET = "compareAndSet"; 50 private static final String COMPARE_AND_SET = "compareAndSet";
51 + private static final String ADD_LISTENER = "addListener";
52 + private static final String REMOVE_LISTENER = "removeListener";
53 + private static final String NOTIFY_LISTENER = "notifyListener";
48 54
49 - public DefaultAsyncAtomicValue(AsyncConsistentMap<String, V> valueMap, 55 + public DefaultAsyncAtomicValue(String name, Serializer serializer, AsyncConsistentMap<String, byte[]> backingMap) {
50 - String name, 56 + this.name = checkNotNull(name, "name must not be null");
51 - boolean meteringEnabled) { 57 + this.serializer = checkNotNull(serializer, "serializer must not be null");
52 - this.valueMap = valueMap; 58 + this.backingMap = checkNotNull(backingMap, "backingMap must not be null");
53 - this.name = name; 59 + this.monitor = new MeteringAgent(COMPONENT_NAME, name, true);
54 - this.monitor = new MeteringAgent(COMPONENT_NAME, name, meteringEnabled);
55 } 60 }
56 61
57 @Override 62 @Override
...@@ -62,76 +67,95 @@ public class DefaultAsyncAtomicValue<V> implements AsyncAtomicValue<V> { ...@@ -62,76 +67,95 @@ public class DefaultAsyncAtomicValue<V> implements AsyncAtomicValue<V> {
62 @Override 67 @Override
63 public CompletableFuture<Boolean> compareAndSet(V expect, V update) { 68 public CompletableFuture<Boolean> compareAndSet(V expect, V update) {
64 final MeteringAgent.Context newTimer = monitor.startTimer(COMPARE_AND_SET); 69 final MeteringAgent.Context newTimer = monitor.startTimer(COMPARE_AND_SET);
65 - CompletableFuture<Boolean> response; 70 + return backingMap.replace(name, serializer.encode(expect), serializer.encode(update))
66 - if (expect == null) { 71 + .whenComplete((r, e) -> newTimer.stop(e));
67 - if (update == null) {
68 - response = CompletableFuture.completedFuture(true);
69 - }
70 - response = valueMap.putIfAbsent(name, update).thenApply(v -> v == null);
71 - } else {
72 - response = update == null
73 - ? valueMap.remove(name, expect)
74 - : valueMap.replace(name, expect, update);
75 - }
76 - return response.whenComplete((r, e) -> newTimer.stop(null));
77 } 72 }
78 73
79 @Override 74 @Override
80 public CompletableFuture<V> get() { 75 public CompletableFuture<V> get() {
81 final MeteringAgent.Context newTimer = monitor.startTimer(GET); 76 final MeteringAgent.Context newTimer = monitor.startTimer(GET);
82 - return valueMap.get(name) 77 + return backingMap.get(name)
83 - .thenApply(Versioned::valueOrNull) 78 + .thenApply(Versioned::valueOrNull)
84 - .whenComplete((r, e) -> newTimer.stop(null)); 79 + .thenApply(v -> v == null ? null : serializer.<V>decode(v))
80 + .whenComplete((r, e) -> newTimer.stop(e));
85 } 81 }
86 82
87 @Override 83 @Override
88 public CompletableFuture<V> getAndSet(V value) { 84 public CompletableFuture<V> getAndSet(V value) {
89 final MeteringAgent.Context newTimer = monitor.startTimer(GET_AND_SET); 85 final MeteringAgent.Context newTimer = monitor.startTimer(GET_AND_SET);
90 - CompletableFuture<Versioned<V>> previousValue = value == null ? 86 + if (value == null) {
91 - valueMap.remove(name) : valueMap.put(name, value); 87 + return backingMap.remove(name)
92 - return previousValue.thenApply(Versioned::valueOrNull) 88 + .thenApply(Versioned::valueOrNull)
93 - .whenComplete((r, e) -> newTimer.stop(null)); 89 + .thenApply(v -> v == null ? null : serializer.<V>decode(v))
90 + .whenComplete((r, e) -> newTimer.stop(e));
91 + }
92 + return backingMap.put(name, serializer.encode(value))
93 + .thenApply(Versioned::valueOrNull)
94 + .thenApply(v -> v == null ? null : serializer.<V>decode(v))
95 + .whenComplete((r, e) -> newTimer.stop(e));
94 } 96 }
95 97
96 @Override 98 @Override
97 public CompletableFuture<Void> set(V value) { 99 public CompletableFuture<Void> set(V value) {
98 final MeteringAgent.Context newTimer = monitor.startTimer(SET); 100 final MeteringAgent.Context newTimer = monitor.startTimer(SET);
99 - CompletableFuture<Void> previousValue = value == null ? 101 + if (value == null) {
100 - valueMap.remove(name).thenApply(v -> null) : valueMap.put(name, value).thenApply(v -> null); 102 + return backingMap.remove(name)
101 - return previousValue.whenComplete((r, e) -> newTimer.stop(null)); 103 + .whenComplete((r, e) -> newTimer.stop(e))
104 + .thenApply(v -> null);
105 +
106 + }
107 + return backingMap.put(name, serializer.encode(value))
108 + .whenComplete((r, e) -> newTimer.stop(e))
109 + .thenApply(v -> null);
102 } 110 }
103 111
104 @Override 112 @Override
105 public CompletableFuture<Void> addListener(AtomicValueEventListener<V> listener) { 113 public CompletableFuture<Void> addListener(AtomicValueEventListener<V> listener) {
106 - synchronized (listeners) { 114 + checkNotNull(listener, "listener must not be null");
107 - if (listeners.add(listener)) { 115 + final MeteringAgent.Context newTimer = monitor.startTimer(ADD_LISTENER);
108 - if (listeners.size() == 1) { 116 + MapEventListener<String, byte[]> mapListener =
109 - return valueMap.addListener(mapEventListener); 117 + listeners.computeIfAbsent(listener, key -> new InternalMapValueEventListener(listener));
110 - } 118 + return backingMap.addListener(mapListener).whenComplete((r, e) -> newTimer.stop(e));
111 - }
112 - }
113 - return CompletableFuture.completedFuture(null);
114 } 119 }
115 120
116 @Override 121 @Override
117 public CompletableFuture<Void> removeListener(AtomicValueEventListener<V> listener) { 122 public CompletableFuture<Void> removeListener(AtomicValueEventListener<V> listener) {
118 - synchronized (listeners) { 123 + checkNotNull(listener, "listener must not be null");
119 - if (listeners.remove(listener)) { 124 + final MeteringAgent.Context newTimer = monitor.startTimer(REMOVE_LISTENER);
120 - if (listeners.size() == 0) { 125 + MapEventListener<String, byte[]> mapListener = listeners.remove(listener);
121 - return valueMap.removeListener(mapEventListener); 126 + if (mapListener != null) {
122 - } 127 + return backingMap.removeListener(mapListener)
123 - } 128 + .whenComplete((r, e) -> newTimer.stop(e));
129 + } else {
130 + newTimer.stop(null);
131 + return CompletableFuture.completedFuture(null);
124 } 132 }
125 - return CompletableFuture.completedFuture(null);
126 } 133 }
127 134
128 - private class InternalMapEventListener implements MapEventListener<String, V> { 135 + private class InternalMapValueEventListener implements MapEventListener<String, byte[]> {
136 +
137 + private final AtomicValueEventListener<V> listener;
138 +
139 + InternalMapValueEventListener(AtomicValueEventListener<V> listener) {
140 + this.listener = listener;
141 + }
129 142
130 @Override 143 @Override
131 - public void event(MapEvent<String, V> mapEvent) { 144 + public void event(MapEvent<String, byte[]> event) {
132 - V newValue = mapEvent.type() == MapEvent.Type.REMOVE ? null : mapEvent.value().value(); 145 + if (event.key().equals(name)) {
133 - AtomicValueEvent<V> atomicValueEvent = new AtomicValueEvent<>(name, AtomicValueEvent.Type.UPDATE, newValue); 146 + final MeteringAgent.Context newTimer = monitor.startTimer(NOTIFY_LISTENER);
134 - listeners.forEach(l -> l.event(atomicValueEvent)); 147 + byte[] rawNewValue = Versioned.valueOrNull(event.newValue());
148 + byte[] rawOldValue = Versioned.valueOrNull(event.oldValue());
149 + try {
150 + listener.event(new AtomicValueEvent<>(name,
151 + rawNewValue == null ? null : serializer.decode(rawNewValue),
152 + rawOldValue == null ? null : serializer.decode(rawOldValue)));
153 + newTimer.stop(null);
154 + } catch (Exception e) {
155 + newTimer.stop(e);
156 + Throwables.propagate(e);
157 + }
158 + }
135 } 159 }
136 } 160 }
137 -} 161 +}
...\ No newline at end of file ...\ No newline at end of file
......
...@@ -30,11 +30,11 @@ import org.onosproject.store.service.Serializer; ...@@ -30,11 +30,11 @@ import org.onosproject.store.service.Serializer;
30 public class DefaultAtomicValueBuilder<V> implements AtomicValueBuilder<V> { 30 public class DefaultAtomicValueBuilder<V> implements AtomicValueBuilder<V> {
31 31
32 private String name; 32 private String name;
33 - private ConsistentMapBuilder<String, V> mapBuilder; 33 + private Serializer serializer;
34 - private boolean metering = true; 34 + private ConsistentMapBuilder<String, byte[]> mapBuilder;
35 35
36 public DefaultAtomicValueBuilder(DatabaseManager manager) { 36 public DefaultAtomicValueBuilder(DatabaseManager manager) {
37 - mapBuilder = manager.<String, V>consistentMapBuilder() 37 + mapBuilder = manager.<String, byte[]>consistentMapBuilder()
38 .withName("onos-atomic-values") 38 .withName("onos-atomic-values")
39 .withMeteringDisabled() 39 .withMeteringDisabled()
40 .withSerializer(Serializer.using(KryoNamespaces.BASIC)); 40 .withSerializer(Serializer.using(KryoNamespaces.BASIC));
...@@ -59,14 +59,8 @@ public class DefaultAtomicValueBuilder<V> implements AtomicValueBuilder<V> { ...@@ -59,14 +59,8 @@ public class DefaultAtomicValueBuilder<V> implements AtomicValueBuilder<V> {
59 } 59 }
60 60
61 @Override 61 @Override
62 - public AtomicValueBuilder<V> withMeteringDisabled() {
63 - metering = false;
64 - return this;
65 - }
66 -
67 - @Override
68 public AsyncAtomicValue<V> buildAsyncValue() { 62 public AsyncAtomicValue<V> buildAsyncValue() {
69 - return new DefaultAsyncAtomicValue<>(mapBuilder.buildAsyncMap(), name, metering); 63 + return new DefaultAsyncAtomicValue<>(name, serializer, mapBuilder.buildAsyncMap());
70 } 64 }
71 65
72 @Override 66 @Override
......