Madan Jampani
Committed by Gerrit Code Review

Consistently ordered notification support for single partition scenario.

Change-Id: I6d959fafb879aa89885c2fb758aa73efd4b47cb0
...@@ -17,6 +17,8 @@ ...@@ -17,6 +17,8 @@
17 package org.onosproject.store.consistent.impl; 17 package org.onosproject.store.consistent.impl;
18 18
19 19
20 +import java.util.function.Consumer;
21 +
20 import net.kuujo.copycat.cluster.ClusterConfig; 22 import net.kuujo.copycat.cluster.ClusterConfig;
21 import net.kuujo.copycat.cluster.internal.coordinator.ClusterCoordinator; 23 import net.kuujo.copycat.cluster.internal.coordinator.ClusterCoordinator;
22 import net.kuujo.copycat.cluster.internal.coordinator.CoordinatorConfig; 24 import net.kuujo.copycat.cluster.internal.coordinator.CoordinatorConfig;
...@@ -81,4 +83,22 @@ public interface Database extends DatabaseProxy<String, byte[]>, Resource<Databa ...@@ -81,4 +83,22 @@ public interface Database extends DatabaseProxy<String, byte[]>, Resource<Databa
81 .addStartupTask(() -> coordinator.open().thenApply(v -> null)) 83 .addStartupTask(() -> coordinator.open().thenApply(v -> null))
82 .addShutdownTask(coordinator::close); 84 .addShutdownTask(coordinator::close);
83 } 85 }
86 +
87 + /**
88 + * Tells whether the database supports change notifications.
89 + * @return true if notifications are supported; false otherwise
90 + */
91 + boolean hasChangeNotificationSupport();
92 +
93 + /**
94 + * Registers a new consumer of StateMachineUpdates.
95 + * @param consumer consumer to register
96 + */
97 + void registerConsumer(Consumer<StateMachineUpdate> consumer);
98 +
99 + /**
100 + * Unregisters a consumer of StateMachineUpdates.
101 + * @param consumer consumer to unregister
102 + */
103 + void unregisterConsumer(Consumer<StateMachineUpdate> consumer);
84 } 104 }
...\ No newline at end of file ...\ No newline at end of file
......
...@@ -34,8 +34,12 @@ import java.util.stream.Collectors; ...@@ -34,8 +34,12 @@ import java.util.stream.Collectors;
34 import java.util.Set; 34 import java.util.Set;
35 35
36 import org.onlab.util.HexString; 36 import org.onlab.util.HexString;
37 +import org.onlab.util.SharedExecutors;
37 import org.onlab.util.Tools; 38 import org.onlab.util.Tools;
38 import org.onosproject.core.ApplicationId; 39 import org.onosproject.core.ApplicationId;
40 +
41 +import static org.onosproject.store.consistent.impl.StateMachineUpdate.Target.MAP;
42 +
39 import org.onosproject.store.service.AsyncConsistentMap; 43 import org.onosproject.store.service.AsyncConsistentMap;
40 import org.onosproject.store.service.ConsistentMapException; 44 import org.onosproject.store.service.ConsistentMapException;
41 import org.onosproject.store.service.MapEvent; 45 import org.onosproject.store.service.MapEvent;
...@@ -101,6 +105,17 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V> ...@@ -101,6 +105,17 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V>
101 this.readOnly = readOnly; 105 this.readOnly = readOnly;
102 this.purgeOnUninstall = purgeOnUninstall; 106 this.purgeOnUninstall = purgeOnUninstall;
103 this.eventPublisher = eventPublisher; 107 this.eventPublisher = eventPublisher;
108 + this.database.registerConsumer(update -> {
109 + SharedExecutors.getSingleThreadExecutor().execute(() -> {
110 + if (update.target() == MAP) {
111 + Result<UpdateResult<String, byte[]>> result = update.output();
112 + if (result.success() && result.value().mapName().equals(name)) {
113 + MapEvent<K, V> mapEvent = result.value().<K, V>map(this::dK, serializer::decode).toMapEvent();
114 + notifyLocalListeners(mapEvent);
115 + }
116 + }
117 + });
118 + });
104 } 119 }
105 120
106 /** 121 /**
...@@ -322,7 +337,11 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V> ...@@ -322,7 +337,11 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V>
322 value == null ? null : serializer.encode(value)) 337 value == null ? null : serializer.encode(value))
323 .thenApply(this::unwrapResult) 338 .thenApply(this::unwrapResult)
324 .thenApply(r -> r.<K, V>map(this::dK, serializer::decode)) 339 .thenApply(r -> r.<K, V>map(this::dK, serializer::decode))
325 - .whenComplete((r, e) -> notifyListeners(r != null ? r.toMapEvent() : null)); 340 + .whenComplete((r, e) -> {
341 + if (r != null && e == null && !database.hasChangeNotificationSupport()) {
342 + notifyListeners(r.toMapEvent());
343 + }
344 + });
326 } 345 }
327 346
328 private <T> T unwrapResult(Result<T> result) { 347 private <T> T unwrapResult(Result<T> result) {
...@@ -363,8 +382,10 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V> ...@@ -363,8 +382,10 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V>
363 } 382 }
364 383
365 protected void notifyLocalListeners(MapEvent<K, V> event) { 384 protected void notifyLocalListeners(MapEvent<K, V> event) {
385 + if (event != null) {
366 listeners.forEach(listener -> listener.event(event)); 386 listeners.forEach(listener -> listener.event(event));
367 } 387 }
388 + }
368 389
369 protected void notifyRemoteListeners(MapEvent<K, V> event) { 390 protected void notifyRemoteListeners(MapEvent<K, V> event) {
370 if (eventPublisher != null) { 391 if (eventPublisher != null) {
......
...@@ -21,23 +21,29 @@ import net.kuujo.copycat.resource.internal.AbstractResource; ...@@ -21,23 +21,29 @@ import net.kuujo.copycat.resource.internal.AbstractResource;
21 import net.kuujo.copycat.resource.internal.ResourceManager; 21 import net.kuujo.copycat.resource.internal.ResourceManager;
22 import net.kuujo.copycat.state.internal.DefaultStateMachine; 22 import net.kuujo.copycat.state.internal.DefaultStateMachine;
23 import net.kuujo.copycat.util.concurrent.Futures; 23 import net.kuujo.copycat.util.concurrent.Futures;
24 +import net.kuujo.copycat.util.function.TriConsumer;
24 25
25 import java.util.Collection; 26 import java.util.Collection;
26 import java.util.Map; 27 import java.util.Map;
27 import java.util.Set; 28 import java.util.Set;
28 import java.util.concurrent.CompletableFuture; 29 import java.util.concurrent.CompletableFuture;
30 +import java.util.function.Consumer;
29 import java.util.function.Supplier; 31 import java.util.function.Supplier;
30 32
31 import org.onosproject.cluster.NodeId; 33 import org.onosproject.cluster.NodeId;
32 import org.onosproject.store.service.Transaction; 34 import org.onosproject.store.service.Transaction;
33 import org.onosproject.store.service.Versioned; 35 import org.onosproject.store.service.Versioned;
34 36
37 +import com.google.common.collect.Sets;
38 +
35 /** 39 /**
36 * Default database. 40 * Default database.
37 */ 41 */
38 public class DefaultDatabase extends AbstractResource<Database> implements Database { 42 public class DefaultDatabase extends AbstractResource<Database> implements Database {
39 private final StateMachine<DatabaseState<String, byte[]>> stateMachine; 43 private final StateMachine<DatabaseState<String, byte[]>> stateMachine;
40 private DatabaseProxy<String, byte[]> proxy; 44 private DatabaseProxy<String, byte[]> proxy;
45 + private final Set<Consumer<StateMachineUpdate>> consumers = Sets.newCopyOnWriteArraySet();
46 + private final TriConsumer<String, Object, Object> watcher = new InternalStateMachineWatcher();
41 47
42 @SuppressWarnings({ "unchecked", "rawtypes" }) 48 @SuppressWarnings({ "unchecked", "rawtypes" })
43 public DefaultDatabase(ResourceManager context) { 49 public DefaultDatabase(ResourceManager context) {
...@@ -46,6 +52,14 @@ public class DefaultDatabase extends AbstractResource<Database> implements Datab ...@@ -46,6 +52,14 @@ public class DefaultDatabase extends AbstractResource<Database> implements Datab
46 DatabaseState.class, 52 DatabaseState.class,
47 DefaultDatabaseState.class, 53 DefaultDatabaseState.class,
48 DefaultDatabase.class.getClassLoader()); 54 DefaultDatabase.class.getClassLoader());
55 + this.stateMachine.addStartupTask(() -> {
56 + stateMachine.registerWatcher(watcher);
57 + return CompletableFuture.completedFuture(null);
58 + });
59 + this.stateMachine.addShutdownTask(() -> {
60 + stateMachine.unregisterWatcher(watcher);
61 + return CompletableFuture.completedFuture(null);
62 + });
49 } 63 }
50 64
51 /** 65 /**
...@@ -209,4 +223,27 @@ public class DefaultDatabase extends AbstractResource<Database> implements Datab ...@@ -209,4 +223,27 @@ public class DefaultDatabase extends AbstractResource<Database> implements Datab
209 } 223 }
210 return false; 224 return false;
211 } 225 }
226 +
227 + @Override
228 + public void registerConsumer(Consumer<StateMachineUpdate> consumer) {
229 + consumers.add(consumer);
230 + }
231 +
232 + @Override
233 + public void unregisterConsumer(Consumer<StateMachineUpdate> consumer) {
234 + consumers.remove(consumer);
235 + }
236 +
237 + @Override
238 + public boolean hasChangeNotificationSupport() {
239 + return true;
240 + }
241 +
242 + private class InternalStateMachineWatcher implements TriConsumer<String, Object, Object> {
243 + @Override
244 + public void accept(String name, Object input, Object output) {
245 + StateMachineUpdate update = new StateMachineUpdate(name, input, output);
246 + consumers.forEach(consumer -> consumer.accept(update));
247 + }
248 + }
212 } 249 }
......
...@@ -25,6 +25,7 @@ import java.util.concurrent.CompletableFuture; ...@@ -25,6 +25,7 @@ import java.util.concurrent.CompletableFuture;
25 import java.util.concurrent.CopyOnWriteArrayList; 25 import java.util.concurrent.CopyOnWriteArrayList;
26 import java.util.concurrent.atomic.AtomicBoolean; 26 import java.util.concurrent.atomic.AtomicBoolean;
27 import java.util.concurrent.atomic.AtomicInteger; 27 import java.util.concurrent.atomic.AtomicInteger;
28 +import java.util.function.Consumer;
28 import java.util.stream.Collectors; 29 import java.util.stream.Collectors;
29 30
30 import org.onosproject.cluster.NodeId; 31 import org.onosproject.cluster.NodeId;
...@@ -363,4 +364,17 @@ public class PartitionedDatabase implements Database { ...@@ -363,4 +364,17 @@ public class PartitionedDatabase implements Database {
363 protected void setTransactionManager(TransactionManager transactionManager) { 364 protected void setTransactionManager(TransactionManager transactionManager) {
364 this.transactionManager = transactionManager; 365 this.transactionManager = transactionManager;
365 } 366 }
367 +
368 + @Override
369 + public boolean hasChangeNotificationSupport() {
370 + return false;
371 + }
372 +
373 + @Override
374 + public void registerConsumer(Consumer<StateMachineUpdate> consumer) {
375 + }
376 +
377 + @Override
378 + public void unregisterConsumer(Consumer<StateMachineUpdate> consumer) {
379 + }
366 } 380 }
......
1 +package org.onosproject.store.consistent.impl;
2 +
3 +/**
4 + * Representation of a state machine update.
5 + */
6 +public class StateMachineUpdate {
7 +
8 + /**
9 + * Target data structure type this update is for.
10 + */
11 + enum Target {
12 + /**
13 + * Update is for a map.
14 + */
15 + MAP,
16 +
17 + /**
18 + * Update is for a non-map data structure.
19 + */
20 + OTHER
21 + }
22 +
23 + private final String operationName;
24 + private final Object input;
25 + private final Object output;
26 +
27 + public StateMachineUpdate(String operationName, Object input, Object output) {
28 + this.operationName = operationName;
29 + this.input = input;
30 + this.output = output;
31 + }
32 +
33 + public Target target() {
34 + // FIXME: This check is brittle
35 + if (operationName.contains("mapUpdate")) {
36 + return Target.MAP;
37 + } else {
38 + return Target.OTHER;
39 + }
40 + }
41 +
42 + @SuppressWarnings("unchecked")
43 + public <T> T input() {
44 + return (T) input;
45 + }
46 +
47 + @SuppressWarnings("unchecked")
48 + public <T> T output() {
49 + return (T) output;
50 + }
51 +}
...\ No newline at end of file ...\ No newline at end of file