Madan Jampani
Committed by Gerrit Code Review

Use the Executor interface when specifying where to handle incoming messages

This is done so that one can simply specify a direct executor.
Change-Id: I1c3ea977dd7c2d604588d587fd67f7012355eedf
......@@ -17,6 +17,7 @@ package org.onosproject.store.cluster.messaging;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import java.util.function.Function;
......@@ -173,7 +174,7 @@ public interface ClusterCommunicationService {
Function<byte[], M> decoder,
Function<M, R> handler,
Function<R, byte[]> encoder,
ExecutorService executor);
Executor executor);
/**
* Adds a new subscriber for the specified message subject.
......@@ -187,7 +188,7 @@ public interface ClusterCommunicationService {
<M> void addSubscriber(MessageSubject subject,
Function<byte[], M> decoder,
Consumer<M> handler,
ExecutorService executor);
Executor executor);
/**
* Removes a subscriber for the specified message subject.
......
......@@ -43,6 +43,7 @@ import com.google.common.util.concurrent.SettableFuture;
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import java.util.function.Function;
......@@ -270,7 +271,7 @@ public class ClusterCommunicationManager
Function<byte[], M> decoder,
Function<M, R> handler,
Function<R, byte[]> encoder,
ExecutorService executor) {
Executor executor) {
messagingService.registerHandler(subject.value(),
new InternalMessageResponder<>(decoder, encoder, handler),
executor);
......@@ -280,7 +281,7 @@ public class ClusterCommunicationManager
public <M> void addSubscriber(MessageSubject subject,
Function<byte[], M> decoder,
Consumer<M> handler,
ExecutorService executor) {
Executor executor) {
messagingService.registerHandler(subject.value(),
new InternalMessageConsumer<>(decoder, handler),
executor);
......
......@@ -54,6 +54,7 @@ import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
......@@ -780,13 +781,13 @@ public class EventuallyConsistentMapImplTest {
@Override
public <M, R> void addSubscriber(MessageSubject subject,
Function<byte[], M> decoder, Function<M, R> handler,
Function<R, byte[]> encoder, ExecutorService executor) {
Function<R, byte[]> encoder, Executor executor) {
}
@Override
public <M> void addSubscriber(MessageSubject subject,
Function<byte[], M> decoder, Consumer<M> handler,
ExecutorService executor) {
Executor executor) {
}
@Override
......
......@@ -17,7 +17,7 @@ package org.onlab.netty;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executor;
/**
* Interface for low level messaging primitives.
......@@ -48,7 +48,7 @@ public interface MessagingService {
* @param handler message handler
* @param executor executor to use for running message handler logic.
*/
public void registerHandler(String type, MessageHandler handler, ExecutorService executor);
public void registerHandler(String type, MessageHandler handler, Executor executor);
/**
* Registers a new message handler for message type.
......
......@@ -42,7 +42,7 @@ import java.net.UnknownHostException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
......@@ -202,11 +202,11 @@ public class NettyMessagingService implements MessagingService {
}
@Override
public void registerHandler(String type, MessageHandler handler, ExecutorService executor) {
public void registerHandler(String type, MessageHandler handler, Executor executor) {
handlers.put(type, new MessageHandler() {
@Override
public void handle(Message message) throws IOException {
executor.submit(() -> {
executor.execute(() -> {
try {
handler.handle(message);
} catch (Exception e) {
......