Madan Jampani
Committed by Gerrit Code Review

use maven shade plugin version defined in root pom + Minor fixes to CopycatTrans…

…port based on review comments

Change-Id: Iac2bd2e7eca99208930eb319e2f3996fbf043f88
...@@ -62,7 +62,7 @@ public class CopycatTransportConnection implements Connection { ...@@ -62,7 +62,7 @@ public class CopycatTransportConnection implements Connection {
62 static final byte FAILURE = 0x04; 62 static final byte FAILURE = 0x04;
63 63
64 private final long connectionId; 64 private final long connectionId;
65 - private CopycatTransport.Mode mode; 65 + private final CopycatTransport.Mode mode;
66 private final Address remoteAddress; 66 private final Address remoteAddress;
67 private final MessagingService messagingService; 67 private final MessagingService messagingService;
68 private final String outboundMessageSubject; 68 private final String outboundMessageSubject;
...@@ -73,6 +73,7 @@ public class CopycatTransportConnection implements Connection { ...@@ -73,6 +73,7 @@ public class CopycatTransportConnection implements Connection {
73 private final AtomicInteger sendFailures = new AtomicInteger(0); 73 private final AtomicInteger sendFailures = new AtomicInteger(0);
74 private final AtomicInteger messagesReceived = new AtomicInteger(0); 74 private final AtomicInteger messagesReceived = new AtomicInteger(0);
75 private final AtomicInteger receiveFailures = new AtomicInteger(0); 75 private final AtomicInteger receiveFailures = new AtomicInteger(0);
76 + private final Map<Address, Endpoint> endpointLookupCache = Maps.newConcurrentMap();
76 77
77 CopycatTransportConnection(long connectionId, 78 CopycatTransportConnection(long connectionId,
78 CopycatTransport.Mode mode, 79 CopycatTransport.Mode mode,
...@@ -206,7 +207,6 @@ public class CopycatTransportConnection implements Connection { ...@@ -206,7 +207,6 @@ public class CopycatTransportConnection implements Connection {
206 207
207 @Override 208 @Override
208 public CompletableFuture<Void> close() { 209 public CompletableFuture<Void> close() {
209 - // TODO: need to unregister message handler
210 closeListeners.forEach(listener -> listener.accept(this)); 210 closeListeners.forEach(listener -> listener.accept(this));
211 if (mode == CopycatTransport.Mode.CLIENT) { 211 if (mode == CopycatTransport.Mode.CLIENT) {
212 messagingService.unregisterHandler(inboundMessageSubject); 212 messagingService.unregisterHandler(inboundMessageSubject);
...@@ -240,12 +240,14 @@ public class CopycatTransportConnection implements Connection { ...@@ -240,12 +240,14 @@ public class CopycatTransportConnection implements Connection {
240 } 240 }
241 241
242 private Endpoint toEndpoint(Address address) { 242 private Endpoint toEndpoint(Address address) {
243 + return endpointLookupCache.computeIfAbsent(address, a -> {
243 try { 244 try {
244 - return new Endpoint(IpAddress.valueOf(InetAddress.getByName(address.host())), address.port()); 245 + return new Endpoint(IpAddress.valueOf(InetAddress.getByName(a.host())), a.port());
245 } catch (UnknownHostException e) { 246 } catch (UnknownHostException e) {
246 Throwables.propagate(e); 247 Throwables.propagate(e);
247 return null; 248 return null;
248 } 249 }
250 + });
249 } 251 }
250 252
251 @SuppressWarnings("rawtypes") 253 @SuppressWarnings("rawtypes")
......
...@@ -45,7 +45,7 @@ import io.atomix.catalyst.util.concurrent.ThreadContext; ...@@ -45,7 +45,7 @@ import io.atomix.catalyst.util.concurrent.ThreadContext;
45 public class CopycatTransportServer implements Server { 45 public class CopycatTransportServer implements Server {
46 46
47 private final AtomicBoolean listening = new AtomicBoolean(false); 47 private final AtomicBoolean listening = new AtomicBoolean(false);
48 - private CompletableFuture<Void> listenFuture; 48 + private CompletableFuture<Void> listenFuture = new CompletableFuture<>();
49 private final String clusterName; 49 private final String clusterName;
50 private final MessagingService messagingService; 50 private final MessagingService messagingService;
51 private final String messageSubject; 51 private final String messageSubject;
...@@ -59,20 +59,14 @@ public class CopycatTransportServer implements Server { ...@@ -59,20 +59,14 @@ public class CopycatTransportServer implements Server {
59 59
60 @Override 60 @Override
61 public CompletableFuture<Void> listen(Address address, Consumer<Connection> listener) { 61 public CompletableFuture<Void> listen(Address address, Consumer<Connection> listener) {
62 - if (listening.get()) { 62 + if (listening.compareAndSet(false, true)) {
63 - return CompletableFuture.completedFuture(null);
64 - }
65 ThreadContext context = ThreadContext.currentContextOrThrow(); 63 ThreadContext context = ThreadContext.currentContextOrThrow();
66 - synchronized (this) {
67 - if (listenFuture == null) {
68 - listenFuture = new CompletableFuture<>();
69 listen(address, listener, context); 64 listen(address, listener, context);
70 } 65 }
71 - }
72 return listenFuture; 66 return listenFuture;
73 } 67 }
74 68
75 - public void listen(Address address, Consumer<Connection> listener, ThreadContext context) { 69 + private void listen(Address address, Consumer<Connection> listener, ThreadContext context) {
76 messagingService.registerHandler(messageSubject, (sender, payload) -> { 70 messagingService.registerHandler(messageSubject, (sender, payload) -> {
77 try (DataInputStream input = new DataInputStream(new ByteArrayInputStream(payload))) { 71 try (DataInputStream input = new DataInputStream(new ByteArrayInputStream(payload))) {
78 long connectionId = input.readLong(); 72 long connectionId = input.readLong();
...@@ -101,7 +95,6 @@ public class CopycatTransportServer implements Server { ...@@ -101,7 +95,6 @@ public class CopycatTransportServer implements Server {
101 return Tools.exceptionalFuture(e); 95 return Tools.exceptionalFuture(e);
102 } 96 }
103 }); 97 });
104 - listening.set(true);
105 context.execute(() -> { 98 context.execute(() -> {
106 listenFuture.complete(null); 99 listenFuture.complete(null);
107 }); 100 });
......
...@@ -69,7 +69,6 @@ ...@@ -69,7 +69,6 @@
69 <plugin> 69 <plugin>
70 <groupId>org.apache.maven.plugins</groupId> 70 <groupId>org.apache.maven.plugins</groupId>
71 <artifactId>maven-shade-plugin</artifactId> 71 <artifactId>maven-shade-plugin</artifactId>
72 - <version>2.4.1</version>
73 <configuration> 72 <configuration>
74 <createSourcesJar>true</createSourcesJar> 73 <createSourcesJar>true</createSourcesJar>
75 74
......