Madan Jampani
Committed by Gerrit Code Review

Use a ScheduleExecutorService in CopycatTransportService instead of creating a n…

…ew thread per connection

Change-Id: Ic075209093e89e2502fb750d1a79509d6fcccc19
...@@ -23,6 +23,8 @@ import java.io.DataInputStream; ...@@ -23,6 +23,8 @@ import java.io.DataInputStream;
23 import java.io.IOException; 23 import java.io.IOException;
24 import java.util.Map; 24 import java.util.Map;
25 import java.util.concurrent.CompletableFuture; 25 import java.util.concurrent.CompletableFuture;
26 +import java.util.concurrent.Executors;
27 +import java.util.concurrent.ScheduledExecutorService;
26 import java.util.concurrent.atomic.AtomicBoolean; 28 import java.util.concurrent.atomic.AtomicBoolean;
27 import java.util.function.Consumer; 29 import java.util.function.Consumer;
28 30
...@@ -37,6 +39,7 @@ import com.google.common.collect.Maps; ...@@ -37,6 +39,7 @@ import com.google.common.collect.Maps;
37 import io.atomix.catalyst.transport.Address; 39 import io.atomix.catalyst.transport.Address;
38 import io.atomix.catalyst.transport.Connection; 40 import io.atomix.catalyst.transport.Connection;
39 import io.atomix.catalyst.transport.Server; 41 import io.atomix.catalyst.transport.Server;
42 +import io.atomix.catalyst.util.concurrent.CatalystThreadFactory;
40 import io.atomix.catalyst.util.concurrent.SingleThreadContext; 43 import io.atomix.catalyst.util.concurrent.SingleThreadContext;
41 import io.atomix.catalyst.util.concurrent.ThreadContext; 44 import io.atomix.catalyst.util.concurrent.ThreadContext;
42 45
...@@ -48,6 +51,7 @@ public class CopycatTransportServer implements Server { ...@@ -48,6 +51,7 @@ public class CopycatTransportServer implements Server {
48 private final Logger log = getLogger(getClass()); 51 private final Logger log = getLogger(getClass());
49 private final AtomicBoolean listening = new AtomicBoolean(false); 52 private final AtomicBoolean listening = new AtomicBoolean(false);
50 private CompletableFuture<Void> listenFuture = new CompletableFuture<>(); 53 private CompletableFuture<Void> listenFuture = new CompletableFuture<>();
54 + private final ScheduledExecutorService executorService;
51 private final PartitionId partitionId; 55 private final PartitionId partitionId;
52 private final MessagingService messagingService; 56 private final MessagingService messagingService;
53 private final String messageSubject; 57 private final String messageSubject;
...@@ -57,6 +61,8 @@ public class CopycatTransportServer implements Server { ...@@ -57,6 +61,8 @@ public class CopycatTransportServer implements Server {
57 this.partitionId = checkNotNull(partitionId); 61 this.partitionId = checkNotNull(partitionId);
58 this.messagingService = checkNotNull(messagingService); 62 this.messagingService = checkNotNull(messagingService);
59 this.messageSubject = String.format("onos-copycat-%s", partitionId); 63 this.messageSubject = String.format("onos-copycat-%s", partitionId);
64 + this.executorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(),
65 + new CatalystThreadFactory("copycat-server-p" + partitionId + "-%d"));
60 } 66 }
61 67
62 @Override 68 @Override
...@@ -105,6 +111,7 @@ public class CopycatTransportServer implements Server { ...@@ -105,6 +111,7 @@ public class CopycatTransportServer implements Server {
105 @Override 111 @Override
106 public CompletableFuture<Void> close() { 112 public CompletableFuture<Void> close() {
107 messagingService.unregisterHandler(messageSubject); 113 messagingService.unregisterHandler(messageSubject);
114 + executorService.shutdown();
108 return CompletableFuture.completedFuture(null); 115 return CompletableFuture.completedFuture(null);
109 } 116 }
110 117
...@@ -116,6 +123,6 @@ public class CopycatTransportServer implements Server { ...@@ -116,6 +123,6 @@ public class CopycatTransportServer implements Server {
116 if (context != null) { 123 if (context != null) {
117 return context; 124 return context;
118 } 125 }
119 - return new SingleThreadContext("copycat-transport-server-" + partitionId, parentContext.serializer().clone()); 126 + return new SingleThreadContext(executorService, parentContext.serializer().clone());
120 } 127 }
121 } 128 }
......