Yuta HIGUCHI

ClusterMessagingProtocolClient: changed thread pool

Change-Id: Ibb37bd2c7c94067336152f19412523dc4cda9722
...@@ -3,13 +3,13 @@ package org.onlab.onos.store.service.impl; ...@@ -3,13 +3,13 @@ package org.onlab.onos.store.service.impl;
3 import static com.google.common.base.Verify.verifyNotNull; 3 import static com.google.common.base.Verify.verifyNotNull;
4 import static org.slf4j.LoggerFactory.getLogger; 4 import static org.slf4j.LoggerFactory.getLogger;
5 import static org.onlab.onos.store.service.impl.ClusterMessagingProtocol.SERIALIZER; 5 import static org.onlab.onos.store.service.impl.ClusterMessagingProtocol.SERIALIZER;
6 +import static org.onlab.util.Tools.namedThreads;
6 7
7 import java.io.IOException; 8 import java.io.IOException;
8 import java.util.concurrent.CompletableFuture; 9 import java.util.concurrent.CompletableFuture;
9 import java.util.concurrent.ExecutionException; 10 import java.util.concurrent.ExecutionException;
10 -import java.util.concurrent.ScheduledExecutorService; 11 +import java.util.concurrent.ExecutorService;
11 -import java.util.concurrent.ScheduledThreadPoolExecutor; 12 +import java.util.concurrent.Executors;
12 -import java.util.concurrent.ThreadFactory;
13 import java.util.concurrent.TimeUnit; 13 import java.util.concurrent.TimeUnit;
14 import java.util.concurrent.TimeoutException; 14 import java.util.concurrent.TimeoutException;
15 15
...@@ -33,8 +33,6 @@ import org.onlab.onos.store.cluster.messaging.ClusterMessage; ...@@ -33,8 +33,6 @@ import org.onlab.onos.store.cluster.messaging.ClusterMessage;
33 import org.onlab.onos.store.cluster.messaging.MessageSubject; 33 import org.onlab.onos.store.cluster.messaging.MessageSubject;
34 import org.slf4j.Logger; 34 import org.slf4j.Logger;
35 35
36 -import com.google.common.util.concurrent.ThreadFactoryBuilder;
37 -
38 /** 36 /**
39 * ONOS Cluster messaging based Copycat protocol client. 37 * ONOS Cluster messaging based Copycat protocol client.
40 */ 38 */
...@@ -42,9 +40,6 @@ public class ClusterMessagingProtocolClient implements ProtocolClient { ...@@ -42,9 +40,6 @@ public class ClusterMessagingProtocolClient implements ProtocolClient {
42 40
43 private final Logger log = getLogger(getClass()); 41 private final Logger log = getLogger(getClass());
44 42
45 - private static final ThreadFactory THREAD_FACTORY =
46 - new ThreadFactoryBuilder().setNameFormat("copycat-netty-messaging-%d").build();
47 -
48 public static final long RETRY_INTERVAL_MILLIS = 2000; 43 public static final long RETRY_INTERVAL_MILLIS = 2000;
49 44
50 private final ClusterService clusterService; 45 private final ClusterService clusterService;
...@@ -53,9 +48,9 @@ public class ClusterMessagingProtocolClient implements ProtocolClient { ...@@ -53,9 +48,9 @@ public class ClusterMessagingProtocolClient implements ProtocolClient {
53 private final TcpMember remoteMember; 48 private final TcpMember remoteMember;
54 private ControllerNode remoteNode; 49 private ControllerNode remoteNode;
55 50
56 - // FIXME: Thread pool sizing. 51 + // TODO: make this non-static and stop on close
57 - private static final ScheduledExecutorService THREAD_POOL = 52 + private static final ExecutorService THREAD_POOL
58 - new ScheduledThreadPoolExecutor(10, THREAD_FACTORY); 53 + = Executors.newCachedThreadPool(namedThreads("copycat-netty-messaging-%d"));
59 54
60 private volatile CompletableFuture<Void> appeared; 55 private volatile CompletableFuture<Void> appeared;
61 56
...@@ -173,7 +168,7 @@ public class ClusterMessagingProtocolClient implements ProtocolClient { ...@@ -173,7 +168,7 @@ public class ClusterMessagingProtocolClient implements ProtocolClient {
173 168
174 private <I, O> CompletableFuture<O> requestReply(I request) { 169 private <I, O> CompletableFuture<O> requestReply(I request) {
175 CompletableFuture<O> future = new CompletableFuture<>(); 170 CompletableFuture<O> future = new CompletableFuture<>();
176 - THREAD_POOL.schedule(new RPCTask<I, O>(request, future), 0, TimeUnit.MILLISECONDS); 171 + THREAD_POOL.submit(new RPCTask<I, O>(request, future));
177 return future; 172 return future;
178 } 173 }
179 174
...@@ -198,7 +193,6 @@ public class ClusterMessagingProtocolClient implements ProtocolClient { ...@@ -198,7 +193,6 @@ public class ClusterMessagingProtocolClient implements ProtocolClient {
198 public void event(ClusterEvent event) { 193 public void event(ClusterEvent event) {
199 checkIfMemberAppeared(); 194 checkIfMemberAppeared();
200 } 195 }
201 -
202 } 196 }
203 197
204 private class RPCTask<I, O> implements Runnable { 198 private class RPCTask<I, O> implements Runnable {
...@@ -225,9 +219,13 @@ public class ClusterMessagingProtocolClient implements ProtocolClient { ...@@ -225,9 +219,13 @@ public class ClusterMessagingProtocolClient implements ProtocolClient {
225 .get(RETRY_INTERVAL_MILLIS, TimeUnit.MILLISECONDS); 219 .get(RETRY_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
226 future.complete(verifyNotNull(SERIALIZER.decode(response))); 220 future.complete(verifyNotNull(SERIALIZER.decode(response)));
227 221
228 - } catch (IOException | InterruptedException | ExecutionException | TimeoutException e) { 222 + } catch (IOException | ExecutionException | TimeoutException e) {
229 log.warn("RPCTask for {} failed.", request, e); 223 log.warn("RPCTask for {} failed.", request, e);
230 future.completeExceptionally(e); 224 future.completeExceptionally(e);
225 + } catch (InterruptedException e) {
226 + log.warn("RPCTask for {} was interrupted.", request, e);
227 + future.completeExceptionally(e);
228 + Thread.currentThread().interrupt();
231 } catch (Exception e) { 229 } catch (Exception e) {
232 log.warn("RPCTask for {} terribly failed.", request, e); 230 log.warn("RPCTask for {} terribly failed.", request, e);
233 future.completeExceptionally(e); 231 future.completeExceptionally(e);
......