Madan Jampani
Committed by Gerrit Code Review

Notification support for Consistent datastructures (ConsitentMap and DistributedSet)

Change-Id: If74cdc411c79c42c7643420e6369cf656849bb6a
Showing 21 changed files with 585 additions and 93 deletions
...@@ -288,4 +288,19 @@ public interface AsyncConsistentMap<K, V> { ...@@ -288,4 +288,19 @@ public interface AsyncConsistentMap<K, V> {
288 * @return optional updated value. Will be empty if update did not happen. 288 * @return optional updated value. Will be empty if update did not happen.
289 */ 289 */
290 CompletableFuture<Optional<Versioned<V>>> replaceAndGet(K key, long oldVersion, V newValue); 290 CompletableFuture<Optional<Versioned<V>>> replaceAndGet(K key, long oldVersion, V newValue);
291 +
292 + /**
293 + * Registers the specified listener to be notified whenever the map is updated.
294 + *
295 + * @param listener listener to notify about map events
296 + */
297 + void addListener(MapEventListener<K, V> listener);
298 +
299 + /**
300 + * Unregisters the specified listener such that it will no longer
301 + * receive map change notifications.
302 + *
303 + * @param listener listener to unregister
304 + */
305 + void removeListener(MapEventListener<K, V> listener);
291 } 306 }
......
...@@ -289,4 +289,19 @@ public interface ConsistentMap<K, V> { ...@@ -289,4 +289,19 @@ public interface ConsistentMap<K, V> {
289 * @return optional new value. Will be empty if replace did not happen 289 * @return optional new value. Will be empty if replace did not happen
290 */ 290 */
291 Optional<Versioned<V>> replaceAndGet(K key, long oldVersion, V newValue); 291 Optional<Versioned<V>> replaceAndGet(K key, long oldVersion, V newValue);
292 +
293 + /**
294 + * Registers the specified listener to be notified whenever the map is updated.
295 + *
296 + * @param listener listener to notify about map events
297 + */
298 + void addListener(MapEventListener<K, V> listener);
299 +
300 + /**
301 + * Unregisters the specified listener such that it will no longer
302 + * receive map change notifications.
303 + *
304 + * @param listener listener to unregister
305 + */
306 + void removeListener(MapEventListener<K, V> listener);
292 } 307 }
......
1 +package org.onosproject.store.service;
2 +
3 +import java.util.Set;
4 +
5 +/**
6 + * A distributed collection designed for holding unique elements.
7 + *
8 + * @param <E> set entry type
9 + */
10 +public interface DistributedSet<E> extends Set<E> {
11 +
12 + /**
13 + * Registers the specified listener to be notified whenever
14 + * the set is updated.
15 + *
16 + * @param listener listener to notify about set update events
17 + */
18 + void addListener(SetEventListener<E> listener);
19 +
20 + /**
21 + * Unregisters the specified listener.
22 + *
23 + * @param listener listener to unregister.
24 + */
25 + void removeListener(SetEventListener<E> listener);
26 +}
...@@ -15,14 +15,12 @@ ...@@ -15,14 +15,12 @@
15 */ 15 */
16 package org.onosproject.store.service; 16 package org.onosproject.store.service;
17 17
18 -import java.util.Set;
19 -
20 /** 18 /**
21 * Builder for distributed set. 19 * Builder for distributed set.
22 * 20 *
23 * @param <E> type set elements. 21 * @param <E> type set elements.
24 */ 22 */
25 -public interface SetBuilder<E> { 23 +public interface DistributedSetBuilder<E> {
26 24
27 /** 25 /**
28 * Sets the name of the set. 26 * Sets the name of the set.
...@@ -34,9 +32,9 @@ public interface SetBuilder<E> { ...@@ -34,9 +32,9 @@ public interface SetBuilder<E> {
34 * </p> 32 * </p>
35 * 33 *
36 * @param name name of the set 34 * @param name name of the set
37 - * @return this SetBuilder 35 + * @return this DistributedSetBuilder
38 */ 36 */
39 - SetBuilder<E> withName(String name); 37 + DistributedSetBuilder<E> withName(String name);
40 38
41 /** 39 /**
42 * Sets a serializer that can be used to serialize 40 * Sets a serializer that can be used to serialize
...@@ -48,18 +46,36 @@ public interface SetBuilder<E> { ...@@ -48,18 +46,36 @@ public interface SetBuilder<E> {
48 * </p> 46 * </p>
49 * 47 *
50 * @param serializer serializer 48 * @param serializer serializer
51 - * @return this SetBuilder 49 + * @return this DistributedSetBuilder
52 */ 50 */
53 - SetBuilder<E> withSerializer(Serializer serializer); 51 + DistributedSetBuilder<E> withSerializer(Serializer serializer);
54 52
55 /** 53 /**
56 * Disables set updates. 54 * Disables set updates.
57 * <p> 55 * <p>
58 * Attempt to update the built set will throw {@code UnsupportedOperationException}. 56 * Attempt to update the built set will throw {@code UnsupportedOperationException}.
59 * 57 *
60 - * @return this SetBuilder 58 + * @return this DistributedSetBuilder
59 + */
60 + DistributedSetBuilder<E> withUpdatesDisabled();
61 +
62 + /**
63 + * Disables distribution of set entries across multiple database partitions.
64 + * <p>
65 + * When partitioning is disabled, the returned set will have a single partition
66 + * that spans the entire cluster. Furthermore, the changes made to the set are
67 + * ephemeral and do not survive a full cluster restart.
68 + * </p>
69 + * <p>
70 + * Disabling partitions is more appropriate when the returned set is used for
71 + * simple coordination activities and not for long term data persistence.
72 + * </p>
73 + * <p>
74 + * Note: By default partitions are enabled and entries in the set are durable.
75 + * </p>
76 + * @return this DistributedSetBuilder
61 */ 77 */
62 - SetBuilder<E> withUpdatesDisabled(); 78 + DistributedSetBuilder<E> withPartitionsDisabled();
63 79
64 /** 80 /**
65 * Builds an set based on the configuration options 81 * Builds an set based on the configuration options
...@@ -68,5 +84,5 @@ public interface SetBuilder<E> { ...@@ -68,5 +84,5 @@ public interface SetBuilder<E> {
68 * @return new set 84 * @return new set
69 * @throws java.lang.RuntimeException if a mandatory parameter is missing 85 * @throws java.lang.RuntimeException if a mandatory parameter is missing
70 */ 86 */
71 - Set<E> build(); 87 + DistributedSet<E> build();
72 } 88 }
......
1 +package org.onosproject.store.service;
2 +
3 +import java.util.Objects;
4 +
5 +import com.google.common.base.MoreObjects;
6 +
7 +/**
8 + * Representation of a ConsistentMap update notification.
9 + *
10 + * @param <K> key type
11 + * @param <V> value type
12 + */
13 +public class MapEvent<K, V> {
14 +
15 + /**
16 + * MapEvent type.
17 + */
18 + public enum Type {
19 + /**
20 + * Entry inserted into the map.
21 + */
22 + INSERT,
23 +
24 + /**
25 + * Existing map entry updated.
26 + */
27 + UPDATE,
28 +
29 + /**
30 + * Entry removed from map.
31 + */
32 + REMOVE
33 + }
34 +
35 + private final String name;
36 + private final Type type;
37 + private final K key;
38 + private final Versioned<V> value;
39 +
40 + /**
41 + * Creates a new event object.
42 + *
43 + * @param name map name
44 + * @param type the type of the event
45 + * @param key the key the event concerns
46 + * @param value the value related to the key, or null for remove events
47 + */
48 + public MapEvent(String name, Type type, K key, Versioned<V> value) {
49 + this.name = name;
50 + this.type = type;
51 + this.key = key;
52 + this.value = value;
53 + }
54 +
55 + /**
56 + * Returns the map name.
57 + *
58 + * @return name of map
59 + */
60 + public String name() {
61 + return name;
62 + }
63 +
64 + /**
65 + * Returns the type of the event.
66 + *
67 + * @return the type of the event
68 + */
69 + public Type type() {
70 + return type;
71 + }
72 +
73 + /**
74 + * Returns the key this event concerns.
75 + *
76 + * @return the key
77 + */
78 + public K key() {
79 + return key;
80 + }
81 +
82 + /**
83 + * Returns the value associated with this event. If type is REMOVE,
84 + * this is the value that was removed. If type is INSERT/UPDATE, this is
85 + * the new value.
86 + *
87 + * @return the value
88 + */
89 + public Versioned<V> value() {
90 + return value;
91 + }
92 +
93 + @Override
94 + public boolean equals(Object o) {
95 + if (!(o instanceof MapEvent)) {
96 + return false;
97 + }
98 +
99 + MapEvent<K, V> that = (MapEvent) o;
100 + return Objects.equals(this.name, that.name) &&
101 + Objects.equals(this.type, that.type) &&
102 + Objects.equals(this.key, that.key) &&
103 + Objects.equals(this.value, that.value);
104 + }
105 +
106 + @Override
107 + public int hashCode() {
108 + return Objects.hash(type, key, value);
109 + }
110 +
111 + @Override
112 + public String toString() {
113 + return MoreObjects.toStringHelper(getClass())
114 + .add("name", name)
115 + .add("type", type)
116 + .add("key", key)
117 + .add("value", value)
118 + .toString();
119 + }
120 +}
1 +package org.onosproject.store.service;
2 +
3 +/**
4 + * Listener to be notified about updates to a ConsitentMap.
5 + */
6 +public interface MapEventListener<K, V> {
7 + /**
8 + * Reacts to the specified event.
9 + *
10 + * @param event the event
11 + */
12 + void event(MapEvent<K, V> event);
13 +}
...@@ -16,8 +16,13 @@ ...@@ -16,8 +16,13 @@
16 16
17 package org.onosproject.store.service; 17 package org.onosproject.store.service;
18 18
19 +import java.util.Arrays;
20 +import java.util.List;
21 +
19 import org.onlab.util.KryoNamespace; 22 import org.onlab.util.KryoNamespace;
20 23
24 +import com.google.common.collect.Lists;
25 +
21 /** 26 /**
22 * Interface for serialization for store artifacts. 27 * Interface for serialization for store artifacts.
23 */ 28 */
...@@ -45,16 +50,32 @@ public interface Serializer { ...@@ -45,16 +50,32 @@ public interface Serializer {
45 * @return Serializer instance 50 * @return Serializer instance
46 */ 51 */
47 static Serializer using(KryoNamespace kryo) { 52 static Serializer using(KryoNamespace kryo) {
53 + return using(Arrays.asList(kryo));
54 + }
55 +
56 + static Serializer using(List<KryoNamespace> namespaces, Class<?>... classes) {
57 + KryoNamespace.Builder builder = new KryoNamespace.Builder();
58 + namespaces.forEach(builder::register);
59 + Lists.newArrayList(classes).forEach(builder::register);
60 + builder.register(MapEvent.class, MapEvent.Type.class);
61 + KryoNamespace namespace = builder.build();
48 return new Serializer() { 62 return new Serializer() {
49 @Override 63 @Override
50 public <T> byte[] encode(T object) { 64 public <T> byte[] encode(T object) {
51 - return kryo.serialize(object); 65 + return namespace.serialize(object);
52 } 66 }
53 67
54 @Override 68 @Override
55 public <T> T decode(byte[] bytes) { 69 public <T> T decode(byte[] bytes) {
56 - return kryo.deserialize(bytes); 70 + return namespace.deserialize(bytes);
57 } 71 }
58 }; 72 };
59 } 73 }
74 +
75 + static Serializer forTypes(Class<?>... classes) {
76 + return using(KryoNamespace.newBuilder()
77 + .register(classes)
78 + .register(MapEvent.class, MapEvent.Type.class)
79 + .build());
80 + }
60 } 81 }
......
1 +package org.onosproject.store.service;
2 +
3 +import java.util.Objects;
4 +
5 +import com.google.common.base.MoreObjects;
6 +
7 +/**
8 + * Representation of a DistributedSet update notification.
9 + *
10 + * @param <E> element type
11 + */
12 +public class SetEvent<E> {
13 +
14 + /**
15 + * SetEvent type.
16 + */
17 + public enum Type {
18 + /**
19 + * Entry added to the set.
20 + */
21 + ADD,
22 +
23 + /**
24 + * Entry removed from the set.
25 + */
26 + REMOVE
27 + }
28 +
29 + private final String name;
30 + private final Type type;
31 + private final E entry;
32 +
33 + /**
34 + * Creates a new event object.
35 + *
36 + * @param name set name
37 + * @param type the type of the event
38 + * @param entry the entry the event concerns
39 + */
40 + public SetEvent(String name, Type type, E entry) {
41 + this.name = name;
42 + this.type = type;
43 + this.entry = entry;
44 + }
45 +
46 + /**
47 + * Returns the set name.
48 + *
49 + * @return name of set
50 + */
51 + public String name() {
52 + return name;
53 + }
54 +
55 + /**
56 + * Returns the type of the event.
57 + *
58 + * @return the type of the event
59 + */
60 + public Type type() {
61 + return type;
62 + }
63 +
64 + /**
65 + * Returns the entry this event concerns.
66 + *
67 + * @return the entry
68 + */
69 + public E entry() {
70 + return entry;
71 + }
72 +
73 + @Override
74 + public boolean equals(Object o) {
75 + if (!(o instanceof SetEvent)) {
76 + return false;
77 + }
78 +
79 + SetEvent<E> that = (SetEvent) o;
80 + return Objects.equals(this.name, that.name) &&
81 + Objects.equals(this.type, that.type) &&
82 + Objects.equals(this.entry, that.entry);
83 + }
84 +
85 + @Override
86 + public int hashCode() {
87 + return Objects.hash(name, type, entry);
88 + }
89 +
90 + @Override
91 + public String toString() {
92 + return MoreObjects.toStringHelper(getClass())
93 + .add("name", name)
94 + .add("type", type)
95 + .add("entry", entry)
96 + .toString();
97 + }
98 +}
1 +package org.onosproject.store.service;
2 +
3 +/**
4 + * Listener to be notified about updates to a DistributedSet.
5 + */
6 +public interface SetEventListener<E> {
7 + /**
8 + * Reacts to the specified event.
9 + *
10 + * @param event the event
11 + */
12 + void event(SetEvent<E> event);
13 +}
...@@ -53,7 +53,7 @@ public interface StorageService { ...@@ -53,7 +53,7 @@ public interface StorageService {
53 * @param <E> set element type 53 * @param <E> set element type
54 * @return builder for an distributed set 54 * @return builder for an distributed set
55 */ 55 */
56 - <E> SetBuilder<E> setBuilder(); 56 + <E> DistributedSetBuilder<E> setBuilder();
57 57
58 /** 58 /**
59 * Creates a new AtomicCounterBuilder. 59 * Creates a new AtomicCounterBuilder.
......
...@@ -42,19 +42,24 @@ import org.apache.felix.scr.annotations.Deactivate; ...@@ -42,19 +42,24 @@ import org.apache.felix.scr.annotations.Deactivate;
42 import org.apache.felix.scr.annotations.Reference; 42 import org.apache.felix.scr.annotations.Reference;
43 import org.apache.felix.scr.annotations.ReferenceCardinality; 43 import org.apache.felix.scr.annotations.ReferenceCardinality;
44 import org.apache.felix.scr.annotations.Service; 44 import org.apache.felix.scr.annotations.Service;
45 +
46 +import static org.onlab.util.Tools.groupedThreads;
47 +
45 import org.onosproject.cluster.ClusterService; 48 import org.onosproject.cluster.ClusterService;
46 import org.onosproject.core.IdGenerator; 49 import org.onosproject.core.IdGenerator;
47 import org.onosproject.store.cluster.impl.ClusterDefinitionManager; 50 import org.onosproject.store.cluster.impl.ClusterDefinitionManager;
48 import org.onosproject.store.cluster.impl.NodeInfo; 51 import org.onosproject.store.cluster.impl.NodeInfo;
49 import org.onosproject.store.cluster.messaging.ClusterCommunicationService; 52 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
53 +import org.onosproject.store.cluster.messaging.MessageSubject;
50 import org.onosproject.store.ecmap.EventuallyConsistentMapBuilderImpl; 54 import org.onosproject.store.ecmap.EventuallyConsistentMapBuilderImpl;
51 import org.onosproject.store.service.AtomicCounterBuilder; 55 import org.onosproject.store.service.AtomicCounterBuilder;
52 import org.onosproject.store.service.ConsistentMapBuilder; 56 import org.onosproject.store.service.ConsistentMapBuilder;
53 import org.onosproject.store.service.ConsistentMapException; 57 import org.onosproject.store.service.ConsistentMapException;
54 import org.onosproject.store.service.EventuallyConsistentMapBuilder; 58 import org.onosproject.store.service.EventuallyConsistentMapBuilder;
59 +import org.onosproject.store.service.MapEvent;
55 import org.onosproject.store.service.MapInfo; 60 import org.onosproject.store.service.MapInfo;
56 import org.onosproject.store.service.PartitionInfo; 61 import org.onosproject.store.service.PartitionInfo;
57 -import org.onosproject.store.service.SetBuilder; 62 +import org.onosproject.store.service.DistributedSetBuilder;
58 import org.onosproject.store.service.StorageAdminService; 63 import org.onosproject.store.service.StorageAdminService;
59 import org.onosproject.store.service.StorageService; 64 import org.onosproject.store.service.StorageService;
60 import org.onosproject.store.service.Transaction; 65 import org.onosproject.store.service.Transaction;
...@@ -69,6 +74,7 @@ import java.util.Map; ...@@ -69,6 +74,7 @@ import java.util.Map;
69 import java.util.Set; 74 import java.util.Set;
70 import java.util.concurrent.CompletableFuture; 75 import java.util.concurrent.CompletableFuture;
71 import java.util.concurrent.ExecutionException; 76 import java.util.concurrent.ExecutionException;
77 +import java.util.concurrent.ExecutorService;
72 import java.util.concurrent.Executors; 78 import java.util.concurrent.Executors;
73 import java.util.concurrent.TimeUnit; 79 import java.util.concurrent.TimeUnit;
74 import java.util.concurrent.TimeoutException; 80 import java.util.concurrent.TimeoutException;
...@@ -93,12 +99,16 @@ public class DatabaseManager implements StorageService, StorageAdminService { ...@@ -93,12 +99,16 @@ public class DatabaseManager implements StorageService, StorageAdminService {
93 private static final int DATABASE_OPERATION_TIMEOUT_MILLIS = 5000; 99 private static final int DATABASE_OPERATION_TIMEOUT_MILLIS = 5000;
94 100
95 private ClusterCoordinator coordinator; 101 private ClusterCoordinator coordinator;
96 - private PartitionedDatabase partitionedDatabase; 102 + protected PartitionedDatabase partitionedDatabase;
97 - private Database inMemoryDatabase; 103 + protected Database inMemoryDatabase;
98 104
99 private TransactionManager transactionManager; 105 private TransactionManager transactionManager;
100 private final IdGenerator transactionIdGenerator = () -> RandomUtils.nextLong(); 106 private final IdGenerator transactionIdGenerator = () -> RandomUtils.nextLong();
101 107
108 + private ExecutorService eventDispatcher;
109 +
110 + private final Set<DefaultAsyncConsistentMap> maps = Sets.newCopyOnWriteArraySet();
111 +
102 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) 112 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
103 protected ClusterService clusterService; 113 protected ClusterService clusterService;
104 114
...@@ -187,7 +197,10 @@ public class DatabaseManager implements StorageService, StorageAdminService { ...@@ -187,7 +197,10 @@ public class DatabaseManager implements StorageService, StorageAdminService {
187 197
188 Futures.getUnchecked(status); 198 Futures.getUnchecked(status);
189 199
190 - transactionManager = new TransactionManager(partitionedDatabase); 200 + transactionManager = new TransactionManager(partitionedDatabase, consistentMapBuilder());
201 +
202 + eventDispatcher = Executors.newSingleThreadExecutor(
203 + groupedThreads("onos/store/manager", "map-event-dispatcher"));
191 log.info("Started"); 204 log.info("Started");
192 } 205 }
193 206
...@@ -213,13 +226,14 @@ public class DatabaseManager implements StorageService, StorageAdminService { ...@@ -213,13 +226,14 @@ public class DatabaseManager implements StorageService, StorageAdminService {
213 log.info("Successfully closed databases."); 226 log.info("Successfully closed databases.");
214 } 227 }
215 }); 228 });
229 + maps.forEach(map -> clusterCommunicator.removeSubscriber(mapUpdatesSubject(map.name())));
230 + eventDispatcher.shutdown();
216 log.info("Stopped"); 231 log.info("Stopped");
217 } 232 }
218 233
219 @Override 234 @Override
220 public TransactionContextBuilder transactionContextBuilder() { 235 public TransactionContextBuilder transactionContextBuilder() {
221 - return new DefaultTransactionContextBuilder( 236 + return new DefaultTransactionContextBuilder(this, transactionIdGenerator.getNewId());
222 - inMemoryDatabase, partitionedDatabase, transactionIdGenerator.getNewId());
223 } 237 }
224 238
225 @Override 239 @Override
...@@ -296,12 +310,12 @@ public class DatabaseManager implements StorageService, StorageAdminService { ...@@ -296,12 +310,12 @@ public class DatabaseManager implements StorageService, StorageAdminService {
296 310
297 @Override 311 @Override
298 public <K, V> ConsistentMapBuilder<K, V> consistentMapBuilder() { 312 public <K, V> ConsistentMapBuilder<K, V> consistentMapBuilder() {
299 - return new DefaultConsistentMapBuilder<>(inMemoryDatabase, partitionedDatabase); 313 + return new DefaultConsistentMapBuilder<>(this);
300 } 314 }
301 315
302 @Override 316 @Override
303 - public <E> SetBuilder<E> setBuilder() { 317 + public <E> DistributedSetBuilder<E> setBuilder() {
304 - return new DefaultSetBuilder<>(partitionedDatabase); 318 + return new DefaultDistributedSetBuilder<>(this);
305 } 319 }
306 320
307 @Override 321 @Override
...@@ -370,4 +384,20 @@ public class DatabaseManager implements StorageService, StorageAdminService { ...@@ -370,4 +384,20 @@ public class DatabaseManager implements StorageService, StorageAdminService {
370 public void redriveTransactions() { 384 public void redriveTransactions() {
371 getTransactions().stream().forEach(transactionManager::execute); 385 getTransactions().stream().forEach(transactionManager::execute);
372 } 386 }
387 +
388 + protected <K, V> void registerMap(DefaultAsyncConsistentMap<K, V> map) {
389 + // TODO: Support different local instances of the same map.
390 + if (!maps.add(map)) {
391 + throw new IllegalStateException("Map by name " + map.name() + " already exists");
392 + }
393 +
394 + clusterCommunicator.<MapEvent<K, V>>addSubscriber(mapUpdatesSubject(map.name()),
395 + map.serializer()::decode,
396 + map::notifyLocalListeners,
397 + eventDispatcher);
398 + }
399 +
400 + protected static MessageSubject mapUpdatesSubject(String mapName) {
401 + return new MessageSubject(mapName + "-map-updates");
402 + }
373 } 403 }
...\ No newline at end of file ...\ No newline at end of file
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
17 package org.onosproject.store.consistent.impl; 17 package org.onosproject.store.consistent.impl;
18 18
19 import static com.google.common.base.Preconditions.*; 19 import static com.google.common.base.Preconditions.*;
20 +import static org.slf4j.LoggerFactory.getLogger;
20 21
21 import java.util.Collection; 22 import java.util.Collection;
22 import java.util.Map; 23 import java.util.Map;
...@@ -24,8 +25,10 @@ import java.util.Map.Entry; ...@@ -24,8 +25,10 @@ import java.util.Map.Entry;
24 import java.util.Objects; 25 import java.util.Objects;
25 import java.util.Optional; 26 import java.util.Optional;
26 import java.util.concurrent.CompletableFuture; 27 import java.util.concurrent.CompletableFuture;
28 +import java.util.concurrent.CopyOnWriteArraySet;
27 import java.util.concurrent.atomic.AtomicReference; 29 import java.util.concurrent.atomic.AtomicReference;
28 import java.util.function.BiFunction; 30 import java.util.function.BiFunction;
31 +import java.util.function.Consumer;
29 import java.util.function.Function; 32 import java.util.function.Function;
30 import java.util.function.Predicate; 33 import java.util.function.Predicate;
31 import java.util.stream.Collectors; 34 import java.util.stream.Collectors;
...@@ -36,8 +39,11 @@ import org.onlab.util.HexString; ...@@ -36,8 +39,11 @@ import org.onlab.util.HexString;
36 import org.onlab.util.Tools; 39 import org.onlab.util.Tools;
37 import org.onosproject.store.service.AsyncConsistentMap; 40 import org.onosproject.store.service.AsyncConsistentMap;
38 import org.onosproject.store.service.ConsistentMapException; 41 import org.onosproject.store.service.ConsistentMapException;
42 +import org.onosproject.store.service.MapEvent;
43 +import org.onosproject.store.service.MapEventListener;
39 import org.onosproject.store.service.Serializer; 44 import org.onosproject.store.service.Serializer;
40 import org.onosproject.store.service.Versioned; 45 import org.onosproject.store.service.Versioned;
46 +import org.slf4j.Logger;
41 47
42 import com.google.common.cache.CacheBuilder; 48 import com.google.common.cache.CacheBuilder;
43 import com.google.common.cache.CacheLoader; 49 import com.google.common.cache.CacheLoader;
...@@ -56,6 +62,11 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V> ...@@ -56,6 +62,11 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V>
56 private final Database database; 62 private final Database database;
57 private final Serializer serializer; 63 private final Serializer serializer;
58 private final boolean readOnly; 64 private final boolean readOnly;
65 + private final Consumer<MapEvent<K, V>> eventPublisher;
66 +
67 + private final Set<MapEventListener<K, V>> listeners = new CopyOnWriteArraySet<>();
68 +
69 + private final Logger log = getLogger(getClass());
59 70
60 private static final String ERROR_NULL_KEY = "Key cannot be null"; 71 private static final String ERROR_NULL_KEY = "Key cannot be null";
61 private static final String ERROR_NULL_VALUE = "Null values are not allowed"; 72 private static final String ERROR_NULL_VALUE = "Null values are not allowed";
...@@ -77,11 +88,29 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V> ...@@ -77,11 +88,29 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V>
77 public DefaultAsyncConsistentMap(String name, 88 public DefaultAsyncConsistentMap(String name,
78 Database database, 89 Database database,
79 Serializer serializer, 90 Serializer serializer,
80 - boolean readOnly) { 91 + boolean readOnly,
92 + Consumer<MapEvent<K, V>> eventPublisher) {
81 this.name = checkNotNull(name, "map name cannot be null"); 93 this.name = checkNotNull(name, "map name cannot be null");
82 this.database = checkNotNull(database, "database cannot be null"); 94 this.database = checkNotNull(database, "database cannot be null");
83 this.serializer = checkNotNull(serializer, "serializer cannot be null"); 95 this.serializer = checkNotNull(serializer, "serializer cannot be null");
84 this.readOnly = readOnly; 96 this.readOnly = readOnly;
97 + this.eventPublisher = eventPublisher;
98 + }
99 +
100 + /**
101 + * Returns this map name.
102 + * @return map name
103 + */
104 + public String name() {
105 + return name;
106 + }
107 +
108 + /**
109 + * Returns the serializer for map entries.
110 + * @return map entry serializer
111 + */
112 + public Serializer serializer() {
113 + return serializer;
85 } 114 }
86 115
87 @Override 116 @Override
...@@ -139,6 +168,7 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V> ...@@ -139,6 +168,7 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V>
139 checkNotNull(key, ERROR_NULL_KEY); 168 checkNotNull(key, ERROR_NULL_KEY);
140 checkNotNull(condition, "predicate function cannot be null"); 169 checkNotNull(condition, "predicate function cannot be null");
141 checkNotNull(remappingFunction, "Remapping function cannot be null"); 170 checkNotNull(remappingFunction, "Remapping function cannot be null");
171 + AtomicReference<MapEvent<K, V>> mapEvent = new AtomicReference<>();
142 return get(key).thenCompose(r1 -> { 172 return get(key).thenCompose(r1 -> {
143 V existingValue = r1 == null ? null : r1.value(); 173 V existingValue = r1 == null ? null : r1.value();
144 // if the condition evaluates to false, return existing value. 174 // if the condition evaluates to false, return existing value.
...@@ -160,6 +190,7 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V> ...@@ -160,6 +190,7 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V>
160 if (r1 != null) { 190 if (r1 != null) {
161 return remove(key, r1.version()).thenApply(result -> { 191 return remove(key, r1.version()).thenApply(result -> {
162 if (result) { 192 if (result) {
193 + mapEvent.set(new MapEvent<>(name, MapEvent.Type.REMOVE, key, r1));
163 return null; 194 return null;
164 } else { 195 } else {
165 throw new ConsistentMapException.ConcurrentModification(); 196 throw new ConsistentMapException.ConcurrentModification();
...@@ -174,6 +205,7 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V> ...@@ -174,6 +205,7 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V>
174 return replaceAndGet(key, r1.version(), computedValue.get()) 205 return replaceAndGet(key, r1.version(), computedValue.get())
175 .thenApply(v -> { 206 .thenApply(v -> {
176 if (v.isPresent()) { 207 if (v.isPresent()) {
208 + mapEvent.set(new MapEvent<>(name, MapEvent.Type.UPDATE, key, v.get()));
177 return v.get(); 209 return v.get();
178 } else { 210 } else {
179 throw new ConsistentMapException.ConcurrentModification(); 211 throw new ConsistentMapException.ConcurrentModification();
...@@ -184,12 +216,13 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V> ...@@ -184,12 +216,13 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V>
184 if (!result.isPresent()) { 216 if (!result.isPresent()) {
185 throw new ConsistentMapException.ConcurrentModification(); 217 throw new ConsistentMapException.ConcurrentModification();
186 } else { 218 } else {
219 + mapEvent.set(new MapEvent<>(name, MapEvent.Type.INSERT, key, result.get()));
187 return result.get(); 220 return result.get();
188 } 221 }
189 }); 222 });
190 } 223 }
191 } 224 }
192 - }); 225 + }).whenComplete((result, error) -> notifyListeners(mapEvent.get()));
193 } 226 }
194 227
195 @Override 228 @Override
...@@ -370,4 +403,35 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V> ...@@ -370,4 +403,35 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V>
370 throw new UnsupportedOperationException(); 403 throw new UnsupportedOperationException();
371 } 404 }
372 } 405 }
406 +
407 + @Override
408 + public void addListener(MapEventListener<K, V> listener) {
409 + listeners.add(listener);
410 + }
411 +
412 + @Override
413 + public void removeListener(MapEventListener<K, V> listener) {
414 + listeners.remove(listener);
415 + }
416 +
417 + protected void notifyListeners(MapEvent<K, V> event) {
418 + try {
419 + if (event != null) {
420 + notifyLocalListeners(event);
421 + notifyRemoteListeners(event);
422 + }
423 + } catch (Exception e) {
424 + log.warn("Failure notifying listeners about {}", event, e);
425 + }
426 + }
427 +
428 + protected void notifyLocalListeners(MapEvent<K, V> event) {
429 + listeners.forEach(listener -> listener.event(event));
430 + }
431 +
432 + protected void notifyRemoteListeners(MapEvent<K, V> event) {
433 + if (eventPublisher != null) {
434 + eventPublisher.accept(event);
435 + }
436 + }
373 } 437 }
...\ No newline at end of file ...\ No newline at end of file
......
...@@ -28,10 +28,9 @@ import java.util.function.Function; ...@@ -28,10 +28,9 @@ import java.util.function.Function;
28 import java.util.function.Predicate; 28 import java.util.function.Predicate;
29 import java.util.Set; 29 import java.util.Set;
30 30
31 -import org.onosproject.store.service.AsyncConsistentMap;
32 import org.onosproject.store.service.ConsistentMap; 31 import org.onosproject.store.service.ConsistentMap;
33 import org.onosproject.store.service.ConsistentMapException; 32 import org.onosproject.store.service.ConsistentMapException;
34 -import org.onosproject.store.service.Serializer; 33 +import org.onosproject.store.service.MapEventListener;
35 import org.onosproject.store.service.Versioned; 34 import org.onosproject.store.service.Versioned;
36 35
37 /** 36 /**
...@@ -45,13 +44,14 @@ public class DefaultConsistentMap<K, V> implements ConsistentMap<K, V> { ...@@ -45,13 +44,14 @@ public class DefaultConsistentMap<K, V> implements ConsistentMap<K, V> {
45 44
46 private static final int OPERATION_TIMEOUT_MILLIS = 5000; 45 private static final int OPERATION_TIMEOUT_MILLIS = 5000;
47 46
48 - private final AsyncConsistentMap<K, V> asyncMap; 47 + private final DefaultAsyncConsistentMap<K, V> asyncMap;
49 48
50 - public DefaultConsistentMap(String name, 49 + public String name() {
51 - Database database, 50 + return asyncMap.name();
52 - Serializer serializer, 51 + }
53 - boolean readOnly) { 52 +
54 - asyncMap = new DefaultAsyncConsistentMap<>(name, database, serializer, readOnly); 53 + public DefaultConsistentMap(DefaultAsyncConsistentMap<K, V> asyncMap) {
54 + this.asyncMap = asyncMap;
55 } 55 }
56 56
57 @Override 57 @Override
...@@ -190,4 +190,14 @@ public class DefaultConsistentMap<K, V> implements ConsistentMap<K, V> { ...@@ -190,4 +190,14 @@ public class DefaultConsistentMap<K, V> implements ConsistentMap<K, V> {
190 } 190 }
191 } 191 }
192 } 192 }
193 +
194 + @Override
195 + public void addListener(MapEventListener<K, V> listener) {
196 + asyncMap.addListener(listener);
197 + }
198 +
199 + @Override
200 + public void removeListener(MapEventListener<K, V> listener) {
201 + asyncMap.addListener(listener);
202 + }
193 } 203 }
...\ No newline at end of file ...\ No newline at end of file
......
...@@ -6,6 +6,7 @@ import static com.google.common.base.Preconditions.checkState; ...@@ -6,6 +6,7 @@ import static com.google.common.base.Preconditions.checkState;
6 import org.onosproject.store.service.AsyncConsistentMap; 6 import org.onosproject.store.service.AsyncConsistentMap;
7 import org.onosproject.store.service.ConsistentMap; 7 import org.onosproject.store.service.ConsistentMap;
8 import org.onosproject.store.service.ConsistentMapBuilder; 8 import org.onosproject.store.service.ConsistentMapBuilder;
9 +import org.onosproject.store.service.MapEvent;
9 import org.onosproject.store.service.Serializer; 10 import org.onosproject.store.service.Serializer;
10 11
11 /** 12 /**
...@@ -20,12 +21,10 @@ public class DefaultConsistentMapBuilder<K, V> implements ConsistentMapBuilder<K ...@@ -20,12 +21,10 @@ public class DefaultConsistentMapBuilder<K, V> implements ConsistentMapBuilder<K
20 private String name; 21 private String name;
21 private boolean partitionsEnabled = true; 22 private boolean partitionsEnabled = true;
22 private boolean readOnly = false; 23 private boolean readOnly = false;
23 - private final Database partitionedDatabase; 24 + private final DatabaseManager manager;
24 - private final Database inMemoryDatabase;
25 25
26 - public DefaultConsistentMapBuilder(Database inMemoryDatabase, Database partitionedDatabase) { 26 + public DefaultConsistentMapBuilder(DatabaseManager manager) {
27 - this.inMemoryDatabase = inMemoryDatabase; 27 + this.manager = manager;
28 - this.partitionedDatabase = partitionedDatabase;
29 } 28 }
30 29
31 @Override 30 @Override
...@@ -60,21 +59,25 @@ public class DefaultConsistentMapBuilder<K, V> implements ConsistentMapBuilder<K ...@@ -60,21 +59,25 @@ public class DefaultConsistentMapBuilder<K, V> implements ConsistentMapBuilder<K
60 59
61 @Override 60 @Override
62 public ConsistentMap<K, V> build() { 61 public ConsistentMap<K, V> build() {
63 - checkState(validInputs()); 62 + return new DefaultConsistentMap<>(buildAndRegisterMap());
64 - return new DefaultConsistentMap<>(
65 - name,
66 - partitionsEnabled ? partitionedDatabase : inMemoryDatabase,
67 - serializer,
68 - readOnly);
69 } 63 }
70 64
71 @Override 65 @Override
72 public AsyncConsistentMap<K, V> buildAsyncMap() { 66 public AsyncConsistentMap<K, V> buildAsyncMap() {
67 + return buildAndRegisterMap();
68 + }
69 +
70 + private DefaultAsyncConsistentMap<K, V> buildAndRegisterMap() {
73 checkState(validInputs()); 71 checkState(validInputs());
74 - return new DefaultAsyncConsistentMap<>( 72 + DefaultAsyncConsistentMap<K, V> asyncMap = new DefaultAsyncConsistentMap<>(
75 name, 73 name,
76 - partitionsEnabled ? partitionedDatabase : inMemoryDatabase, 74 + partitionsEnabled ? manager.partitionedDatabase : manager.inMemoryDatabase,
77 serializer, 75 serializer,
78 - readOnly); 76 + readOnly,
77 + event -> manager.clusterCommunicator.<MapEvent<K, V>>broadcast(event,
78 + DatabaseManager.mapUpdatesSubject(name),
79 + serializer::encode));
80 + manager.registerMap(asyncMap);
81 + return asyncMap;
79 } 82 }
80 } 83 }
...\ No newline at end of file ...\ No newline at end of file
......
...@@ -17,11 +17,17 @@ package org.onosproject.store.consistent.impl; ...@@ -17,11 +17,17 @@ package org.onosproject.store.consistent.impl;
17 17
18 import java.util.Collection; 18 import java.util.Collection;
19 import java.util.Iterator; 19 import java.util.Iterator;
20 +import java.util.Map;
20 import java.util.Set; 21 import java.util.Set;
21 22
22 import org.onosproject.store.service.ConsistentMap; 23 import org.onosproject.store.service.ConsistentMap;
23 -import org.onosproject.store.service.Serializer; 24 +import org.onosproject.store.service.DistributedSet;
25 +import org.onosproject.store.service.MapEvent;
26 +import org.onosproject.store.service.MapEventListener;
27 +import org.onosproject.store.service.SetEvent;
28 +import org.onosproject.store.service.SetEventListener;
24 29
30 +import com.google.common.collect.Maps;
25 import com.google.common.collect.Sets; 31 import com.google.common.collect.Sets;
26 32
27 /** 33 /**
...@@ -29,12 +35,15 @@ import com.google.common.collect.Sets; ...@@ -29,12 +35,15 @@ import com.google.common.collect.Sets;
29 35
30 * @param <E> set element type 36 * @param <E> set element type
31 */ 37 */
32 -public class DefaultDistributedSet<E> implements Set<E> { 38 +public class DefaultDistributedSet<E> implements DistributedSet<E> {
33 39
40 + private final String name;
34 private final ConsistentMap<E, Boolean> backingMap; 41 private final ConsistentMap<E, Boolean> backingMap;
42 + private final Map<SetEventListener<E>, MapEventListener<E, Boolean>> listenerMapping = Maps.newIdentityHashMap();
35 43
36 - public DefaultDistributedSet(String name, Database database, Serializer serializer, boolean readOnly) { 44 + public DefaultDistributedSet(String name, ConsistentMap<E, Boolean> backingMap) {
37 - backingMap = new DefaultConsistentMap<>(name, database, serializer, readOnly); 45 + this.name = name;
46 + this.backingMap = backingMap;
38 } 47 }
39 48
40 @Override 49 @Override
...@@ -76,7 +85,7 @@ public class DefaultDistributedSet<E> implements Set<E> { ...@@ -76,7 +85,7 @@ public class DefaultDistributedSet<E> implements Set<E> {
76 @SuppressWarnings("unchecked") 85 @SuppressWarnings("unchecked")
77 @Override 86 @Override
78 public boolean remove(Object o) { 87 public boolean remove(Object o) {
79 - return backingMap.remove((E) o, true); 88 + return backingMap.remove((E) o) != null;
80 } 89 }
81 90
82 @Override 91 @Override
...@@ -119,4 +128,26 @@ public class DefaultDistributedSet<E> implements Set<E> { ...@@ -119,4 +128,26 @@ public class DefaultDistributedSet<E> implements Set<E> {
119 public void clear() { 128 public void clear() {
120 backingMap.clear(); 129 backingMap.clear();
121 } 130 }
131 +
132 + @Override
133 + public void addListener(SetEventListener<E> listener) {
134 + MapEventListener<E, Boolean> mapEventListener = mapEvent -> {
135 + if (mapEvent.type() == MapEvent.Type.INSERT) {
136 + listener.event(new SetEvent<>(name, SetEvent.Type.ADD, mapEvent.key()));
137 + } else if (mapEvent.type() == MapEvent.Type.REMOVE) {
138 + listener.event(new SetEvent<>(name, SetEvent.Type.REMOVE, mapEvent.key()));
139 + }
140 + };
141 + if (listenerMapping.putIfAbsent(listener, mapEventListener) == null) {
142 + backingMap.addListener(mapEventListener);
143 + }
144 + }
145 +
146 + @Override
147 + public void removeListener(SetEventListener<E> listener) {
148 + MapEventListener<E, Boolean> mapEventListener = listenerMapping.remove(listener);
149 + if (mapEventListener != null) {
150 + backingMap.removeListener(mapEventListener);
151 + }
152 + }
122 } 153 }
......
...@@ -15,58 +15,52 @@ ...@@ -15,58 +15,52 @@
15 */ 15 */
16 package org.onosproject.store.consistent.impl; 16 package org.onosproject.store.consistent.impl;
17 17
18 -import static com.google.common.base.Preconditions.checkArgument; 18 +import org.onosproject.store.service.ConsistentMapBuilder;
19 -import static com.google.common.base.Preconditions.checkNotNull; 19 +import org.onosproject.store.service.DistributedSet;
20 -import static com.google.common.base.Preconditions.checkState;
21 -
22 -import java.util.Set;
23 -
24 import org.onosproject.store.service.Serializer; 20 import org.onosproject.store.service.Serializer;
25 -import org.onosproject.store.service.SetBuilder; 21 +import org.onosproject.store.service.DistributedSetBuilder;
26 22
27 /** 23 /**
28 - * Default Set builder. 24 + * Default distributed set builder.
29 * 25 *
30 * @param <E> type for set elements 26 * @param <E> type for set elements
31 */ 27 */
32 -public class DefaultSetBuilder<E> implements SetBuilder<E> { 28 +public class DefaultDistributedSetBuilder<E> implements DistributedSetBuilder<E> {
33 29
34 - private Serializer serializer;
35 private String name; 30 private String name;
36 - private final Database database; 31 + private ConsistentMapBuilder<E, Boolean> mapBuilder;
37 - private boolean readOnly;
38 32
39 - public DefaultSetBuilder(Database database) { 33 + public DefaultDistributedSetBuilder(DatabaseManager manager) {
40 - this.database = checkNotNull(database); 34 + this.mapBuilder = manager.consistentMapBuilder();
41 } 35 }
42 36
43 @Override 37 @Override
44 - public SetBuilder<E> withName(String name) { 38 + public DistributedSetBuilder<E> withName(String name) {
45 - checkArgument(name != null && !name.isEmpty()); 39 + mapBuilder.withName(name);
46 this.name = name; 40 this.name = name;
47 return this; 41 return this;
48 } 42 }
49 43
50 @Override 44 @Override
51 - public SetBuilder<E> withSerializer(Serializer serializer) { 45 + public DistributedSetBuilder<E> withSerializer(Serializer serializer) {
52 - checkArgument(serializer != null); 46 + mapBuilder.withSerializer(serializer);
53 - this.serializer = serializer;
54 return this; 47 return this;
55 } 48 }
56 49
57 @Override 50 @Override
58 - public SetBuilder<E> withUpdatesDisabled() { 51 + public DistributedSetBuilder<E> withUpdatesDisabled() {
59 - readOnly = true; 52 + mapBuilder.withUpdatesDisabled();
60 return this; 53 return this;
61 } 54 }
62 55
63 - private boolean validInputs() { 56 + @Override
64 - return name != null && serializer != null; 57 + public DistributedSetBuilder<E> withPartitionsDisabled() {
58 + mapBuilder.withPartitionsDisabled();
59 + return this;
65 } 60 }
66 61
67 @Override 62 @Override
68 - public Set<E> build() { 63 + public DistributedSet<E> build() {
69 - checkState(validInputs()); 64 + return new DefaultDistributedSet<E>(name, mapBuilder.build());
70 - return new DefaultDistributedSet<>(name, database, serializer, readOnly);
71 } 65 }
72 } 66 }
......
...@@ -18,9 +18,11 @@ package org.onosproject.store.consistent.impl; ...@@ -18,9 +18,11 @@ package org.onosproject.store.consistent.impl;
18 18
19 import java.util.List; 19 import java.util.List;
20 import java.util.Map; 20 import java.util.Map;
21 +import java.util.function.Supplier;
21 22
22 import static com.google.common.base.Preconditions.*; 23 import static com.google.common.base.Preconditions.*;
23 24
25 +import org.onosproject.store.service.ConsistentMapBuilder;
24 import org.onosproject.store.service.DatabaseUpdate; 26 import org.onosproject.store.service.DatabaseUpdate;
25 import org.onosproject.store.service.Serializer; 27 import org.onosproject.store.service.Serializer;
26 import org.onosproject.store.service.TransactionContext; 28 import org.onosproject.store.service.TransactionContext;
...@@ -41,10 +43,14 @@ public class DefaultTransactionContext implements TransactionContext { ...@@ -41,10 +43,14 @@ public class DefaultTransactionContext implements TransactionContext {
41 private boolean isOpen = false; 43 private boolean isOpen = false;
42 private final Database database; 44 private final Database database;
43 private final long transactionId; 45 private final long transactionId;
46 + private final Supplier<ConsistentMapBuilder> mapBuilderSupplier;
44 47
45 - public DefaultTransactionContext(Database database, long transactionId) { 48 + public DefaultTransactionContext(long transactionId,
46 - this.database = checkNotNull(database); 49 + Database database,
50 + Supplier<ConsistentMapBuilder> mapBuilderSupplier) {
47 this.transactionId = transactionId; 51 this.transactionId = transactionId;
52 + this.database = checkNotNull(database);
53 + this.mapBuilderSupplier = checkNotNull(mapBuilderSupplier);
48 } 54 }
49 55
50 @Override 56 @Override
...@@ -72,7 +78,7 @@ public class DefaultTransactionContext implements TransactionContext { ...@@ -72,7 +78,7 @@ public class DefaultTransactionContext implements TransactionContext {
72 checkNotNull(serializer); 78 checkNotNull(serializer);
73 return txMaps.computeIfAbsent(mapName, name -> new DefaultTransactionalMap<>( 79 return txMaps.computeIfAbsent(mapName, name -> new DefaultTransactionalMap<>(
74 name, 80 name,
75 - new DefaultConsistentMap<>(name, database, serializer, false), 81 + mapBuilderSupplier.get().withName(name).withSerializer(serializer).build(),
76 this, 82 this,
77 serializer)); 83 serializer));
78 } 84 }
...@@ -85,6 +91,7 @@ public class DefaultTransactionContext implements TransactionContext { ...@@ -85,6 +91,7 @@ public class DefaultTransactionContext implements TransactionContext {
85 List<DatabaseUpdate> updates = Lists.newLinkedList(); 91 List<DatabaseUpdate> updates = Lists.newLinkedList();
86 txMaps.values() 92 txMaps.values()
87 .forEach(m -> { updates.addAll(m.prepareDatabaseUpdates()); }); 93 .forEach(m -> { updates.addAll(m.prepareDatabaseUpdates()); });
94 + // FIXME: Updates made via transactional context currently do not result in notifications. (ONOS-2097)
88 database.prepareAndCommit(new DefaultTransaction(transactionId, updates)); 95 database.prepareAndCommit(new DefaultTransaction(transactionId, updates));
89 } catch (Exception e) { 96 } catch (Exception e) {
90 abort(); 97 abort();
......
...@@ -10,14 +10,11 @@ import org.onosproject.store.service.TransactionContextBuilder; ...@@ -10,14 +10,11 @@ import org.onosproject.store.service.TransactionContextBuilder;
10 public class DefaultTransactionContextBuilder implements TransactionContextBuilder { 10 public class DefaultTransactionContextBuilder implements TransactionContextBuilder {
11 11
12 private boolean partitionsEnabled = true; 12 private boolean partitionsEnabled = true;
13 - private final Database partitionedDatabase; 13 + private final DatabaseManager manager;
14 - private final Database inMemoryDatabase;
15 private final long transactionId; 14 private final long transactionId;
16 15
17 - public DefaultTransactionContextBuilder( 16 + public DefaultTransactionContextBuilder(DatabaseManager manager, long transactionId) {
18 - Database inMemoryDatabase, Database partitionedDatabase, long transactionId) { 17 + this.manager = manager;
19 - this.partitionedDatabase = partitionedDatabase;
20 - this.inMemoryDatabase = inMemoryDatabase;
21 this.transactionId = transactionId; 18 this.transactionId = transactionId;
22 } 19 }
23 20
...@@ -30,8 +27,9 @@ public class DefaultTransactionContextBuilder implements TransactionContextBuild ...@@ -30,8 +27,9 @@ public class DefaultTransactionContextBuilder implements TransactionContextBuild
30 @Override 27 @Override
31 public TransactionContext build() { 28 public TransactionContext build() {
32 return new DefaultTransactionContext( 29 return new DefaultTransactionContext(
33 - partitionsEnabled ? partitionedDatabase : inMemoryDatabase, 30 + transactionId,
34 - transactionId); 31 + partitionsEnabled ? manager.partitionedDatabase : manager.inMemoryDatabase,
32 + () -> partitionsEnabled ? manager.consistentMapBuilder()
33 + : manager.consistentMapBuilder().withPartitionsDisabled());
35 } 34 }
36 -
37 } 35 }
......
...@@ -50,6 +50,7 @@ public class PartitionedDatabase implements Database { ...@@ -50,6 +50,7 @@ public class PartitionedDatabase implements Database {
50 private final List<Database> partitions; 50 private final List<Database> partitions;
51 private final AtomicBoolean isOpen = new AtomicBoolean(false); 51 private final AtomicBoolean isOpen = new AtomicBoolean(false);
52 private static final String DB_NOT_OPEN = "Partitioned Database is not open"; 52 private static final String DB_NOT_OPEN = "Partitioned Database is not open";
53 + private TransactionManager transactionManager;
53 54
54 public PartitionedDatabase( 55 public PartitionedDatabase(
55 String name, 56 String name,
...@@ -285,7 +286,10 @@ public class PartitionedDatabase implements Database { ...@@ -285,7 +286,10 @@ public class PartitionedDatabase implements Database {
285 subTransactions.entrySet().iterator().next(); 286 subTransactions.entrySet().iterator().next();
286 return entry.getKey().prepareAndCommit(entry.getValue()); 287 return entry.getKey().prepareAndCommit(entry.getValue());
287 } else { 288 } else {
288 - return new TransactionManager(this).execute(transaction); 289 + if (transactionManager != null) {
290 + throw new IllegalStateException("TransactionManager is not initialized");
291 + }
292 + return transactionManager.execute(transaction);
289 } 293 }
290 } 294 }
291 295
...@@ -387,4 +391,8 @@ public class PartitionedDatabase implements Database { ...@@ -387,4 +391,8 @@ public class PartitionedDatabase implements Database {
387 perPartitionUpdates.forEach((k, v) -> subTransactions.put(k, new DefaultTransaction(transaction.id(), v))); 391 perPartitionUpdates.forEach((k, v) -> subTransactions.put(k, new DefaultTransaction(transaction.id(), v)));
388 return subTransactions; 392 return subTransactions;
389 } 393 }
394 +
395 + protected void setTransactionManager(TransactionManager tranasactionManager) {
396 + this.transactionManager = transactionManager;
397 + }
390 } 398 }
......
...@@ -17,6 +17,7 @@ package org.onosproject.store.consistent.impl; ...@@ -17,6 +17,7 @@ package org.onosproject.store.consistent.impl;
17 17
18 import static com.google.common.base.Preconditions.checkNotNull; 18 import static com.google.common.base.Preconditions.checkNotNull;
19 19
20 +import java.util.Arrays;
20 import java.util.Collection; 21 import java.util.Collection;
21 import java.util.concurrent.CompletableFuture; 22 import java.util.concurrent.CompletableFuture;
22 import java.util.stream.Collectors; 23 import java.util.stream.Collectors;
...@@ -26,6 +27,7 @@ import org.apache.commons.lang3.tuple.Pair; ...@@ -26,6 +27,7 @@ import org.apache.commons.lang3.tuple.Pair;
26 import org.onlab.util.KryoNamespace; 27 import org.onlab.util.KryoNamespace;
27 import org.onosproject.store.serializers.KryoNamespaces; 28 import org.onosproject.store.serializers.KryoNamespaces;
28 import org.onosproject.store.service.AsyncConsistentMap; 29 import org.onosproject.store.service.AsyncConsistentMap;
30 +import org.onosproject.store.service.ConsistentMapBuilder;
29 import org.onosproject.store.service.DatabaseUpdate; 31 import org.onosproject.store.service.DatabaseUpdate;
30 import org.onosproject.store.service.Serializer; 32 import org.onosproject.store.service.Serializer;
31 import org.onosproject.store.service.Transaction; 33 import org.onosproject.store.service.Transaction;
...@@ -49,7 +51,7 @@ public class TransactionManager { ...@@ -49,7 +51,7 @@ public class TransactionManager {
49 .register(ImmutablePair.class) 51 .register(ImmutablePair.class)
50 .build(); 52 .build();
51 53
52 - private final Serializer serializer = Serializer.using(KRYO_NAMESPACE); 54 + private final Serializer serializer = Serializer.using(Arrays.asList(KRYO_NAMESPACE));
53 private final Database database; 55 private final Database database;
54 private final AsyncConsistentMap<Long, Transaction> transactions; 56 private final AsyncConsistentMap<Long, Transaction> transactions;
55 57
...@@ -58,9 +60,11 @@ public class TransactionManager { ...@@ -58,9 +60,11 @@ public class TransactionManager {
58 * 60 *
59 * @param database database 61 * @param database database
60 */ 62 */
61 - public TransactionManager(Database database) { 63 + public TransactionManager(Database database, ConsistentMapBuilder<Long, Transaction> mapBuilder) {
62 this.database = checkNotNull(database, "database cannot be null"); 64 this.database = checkNotNull(database, "database cannot be null");
63 - this.transactions = new DefaultAsyncConsistentMap<>("onos-transactions", this.database, serializer, false); 65 + this.transactions = mapBuilder.withName("onos-transactions")
66 + .withSerializer(serializer)
67 + .buildAsyncMap();
64 } 68 }
65 69
66 /** 70 /**
......
...@@ -170,6 +170,8 @@ import org.onosproject.net.resource.link.MplsLabel; ...@@ -170,6 +170,8 @@ import org.onosproject.net.resource.link.MplsLabel;
170 import org.onosproject.net.resource.link.MplsLabelResourceAllocation; 170 import org.onosproject.net.resource.link.MplsLabelResourceAllocation;
171 import org.onosproject.net.resource.link.MplsLabelResourceRequest; 171 import org.onosproject.net.resource.link.MplsLabelResourceRequest;
172 import org.onosproject.store.Timestamp; 172 import org.onosproject.store.Timestamp;
173 +import org.onosproject.store.service.MapEvent;
174 +import org.onosproject.store.service.SetEvent;
173 import org.onosproject.store.service.Versioned; 175 import org.onosproject.store.service.Versioned;
174 176
175 import java.net.URI; 177 import java.net.URI;
...@@ -408,6 +410,10 @@ public final class KryoNamespaces { ...@@ -408,6 +410,10 @@ public final class KryoNamespaces {
408 .register(new HostLocationSerializer(), HostLocation.class) 410 .register(new HostLocationSerializer(), HostLocation.class)
409 .register(new DefaultOutboundPacketSerializer(), DefaultOutboundPacket.class) 411 .register(new DefaultOutboundPacketSerializer(), DefaultOutboundPacket.class)
410 .register(Versioned.class) 412 .register(Versioned.class)
413 + .register(MapEvent.class)
414 + .register(MapEvent.Type.class)
415 + .register(SetEvent.class)
416 + .register(SetEvent.Type.class)
411 .register(DefaultGroupId.class) 417 .register(DefaultGroupId.class)
412 .register(Annotations.class) 418 .register(Annotations.class)
413 .register(OmsPort.class) 419 .register(OmsPort.class)
......