Madan Jampani
Committed by Gerrit Code Review

Topic: Support for passing a executor to subscribe method for invoking the callback

Change-Id: I9db485ee381c61fbfc38aba0c2bd90cb5af171e0
......@@ -16,8 +16,11 @@
package org.onosproject.store.service;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import com.google.common.util.concurrent.MoreExecutors;
/**
* A distributed publish subscribe primitive.
* <p>
......@@ -49,9 +52,19 @@ public interface Topic<T> extends DistributedPrimitive {
/**
* Subscribes to messages published to this topic.
* @param callback callback that will invoked when a message published to the topic is received.
* @param executor executor for running the callback
* @return a future that is completed when subscription request is completed.
*/
CompletableFuture<Void> subscribe(Consumer<T> callback, Executor executor);
/**
* Subscribes to messages published to this topic.
* @param callback callback that will invoked when a message published to the topic is received.
* @return a future that is completed when subscription request is completed.
*/
CompletableFuture<Void> subscribe(Consumer<T> callback);
default CompletableFuture<Void> subscribe(Consumer<T> callback) {
return subscribe(callback, MoreExecutors.directExecutor());
}
/**
* Unsubscribes from this topic.
......
......@@ -17,6 +17,7 @@ package org.onosproject.store.primitives.impl;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import org.onosproject.store.service.AsyncAtomicValue;
......@@ -61,8 +62,9 @@ public class DefaultDistributedTopic<T> implements Topic<T> {
}
@Override
public CompletableFuture<Void> subscribe(Consumer<T> callback) {
AtomicValueEventListener<T> valueListener = event -> callback.accept(event.newValue());
public CompletableFuture<Void> subscribe(Consumer<T> callback, Executor executor) {
AtomicValueEventListener<T> valueListener =
event -> executor.execute(() -> callback.accept(event.newValue()));
if (callbacks.putIfAbsent(callback, valueListener) == null) {
return atomicValue.addListener(valueListener);
}
......