Yuta HIGUCHI
Committed by Yuta Higuchi

ClusterMessagingProtocolClient: transition to not connected state on IO error

Change-Id: Iac0af5b5a55868d2677aecf18e63e00018d5113f
...@@ -12,7 +12,6 @@ import java.util.concurrent.ExecutorService; ...@@ -12,7 +12,6 @@ import java.util.concurrent.ExecutorService;
12 import java.util.concurrent.Executors; 12 import java.util.concurrent.Executors;
13 import java.util.concurrent.TimeUnit; 13 import java.util.concurrent.TimeUnit;
14 import java.util.concurrent.TimeoutException; 14 import java.util.concurrent.TimeoutException;
15 -
16 import net.kuujo.copycat.cluster.TcpMember; 15 import net.kuujo.copycat.cluster.TcpMember;
17 import net.kuujo.copycat.protocol.PingRequest; 16 import net.kuujo.copycat.protocol.PingRequest;
18 import net.kuujo.copycat.protocol.PingResponse; 17 import net.kuujo.copycat.protocol.PingResponse;
...@@ -46,7 +45,9 @@ public class ClusterMessagingProtocolClient implements ProtocolClient { ...@@ -46,7 +45,9 @@ public class ClusterMessagingProtocolClient implements ProtocolClient {
46 private final ClusterCommunicationService clusterCommunicator; 45 private final ClusterCommunicationService clusterCommunicator;
47 private final ControllerNode localNode; 46 private final ControllerNode localNode;
48 private final TcpMember remoteMember; 47 private final TcpMember remoteMember;
49 - private ControllerNode remoteNode; 48 +
49 + // (remoteNode == null) => disconnected state
50 + private volatile ControllerNode remoteNode;
50 51
51 // TODO: make this non-static and stop on close 52 // TODO: make this non-static and stop on close
52 private static final ExecutorService THREAD_POOL 53 private static final ExecutorService THREAD_POOL
...@@ -70,22 +71,22 @@ public class ClusterMessagingProtocolClient implements ProtocolClient { ...@@ -70,22 +71,22 @@ public class ClusterMessagingProtocolClient implements ProtocolClient {
70 71
71 @Override 72 @Override
72 public CompletableFuture<PingResponse> ping(PingRequest request) { 73 public CompletableFuture<PingResponse> ping(PingRequest request) {
73 - return requestReply(request); 74 + return connect().thenCompose((connected) -> { return requestReply(request); });
74 } 75 }
75 76
76 @Override 77 @Override
77 public CompletableFuture<SyncResponse> sync(SyncRequest request) { 78 public CompletableFuture<SyncResponse> sync(SyncRequest request) {
78 - return requestReply(request); 79 + return connect().thenCompose((connected) -> { return requestReply(request); });
79 } 80 }
80 81
81 @Override 82 @Override
82 public CompletableFuture<PollResponse> poll(PollRequest request) { 83 public CompletableFuture<PollResponse> poll(PollRequest request) {
83 - return requestReply(request); 84 + return connect().thenCompose((connected) -> { return requestReply(request); });
84 } 85 }
85 86
86 @Override 87 @Override
87 public CompletableFuture<SubmitResponse> submit(SubmitRequest request) { 88 public CompletableFuture<SubmitResponse> submit(SubmitRequest request) {
88 - return requestReply(request); 89 + return connect().thenCompose((connected) -> { return requestReply(request); });
89 } 90 }
90 91
91 @Override 92 @Override
...@@ -95,13 +96,6 @@ public class ClusterMessagingProtocolClient implements ProtocolClient { ...@@ -95,13 +96,6 @@ public class ClusterMessagingProtocolClient implements ProtocolClient {
95 return CompletableFuture.completedFuture(null); 96 return CompletableFuture.completedFuture(null);
96 } 97 }
97 98
98 - remoteNode = getControllerNode(remoteMember);
99 -
100 - if (remoteNode != null) {
101 - // done
102 - return CompletableFuture.completedFuture(null);
103 - }
104 -
105 if (appeared != null) { 99 if (appeared != null) {
106 // already waiting for member to appear 100 // already waiting for member to appear
107 return appeared; 101 return appeared;
...@@ -111,6 +105,13 @@ public class ClusterMessagingProtocolClient implements ProtocolClient { ...@@ -111,6 +105,13 @@ public class ClusterMessagingProtocolClient implements ProtocolClient {
111 listener = new InternalClusterEventListener(); 105 listener = new InternalClusterEventListener();
112 clusterService.addListener(listener); 106 clusterService.addListener(listener);
113 107
108 + remoteNode = getControllerNode(remoteMember);
109 +
110 + if (remoteNode != null) {
111 + // done
112 + return CompletableFuture.completedFuture(null);
113 + }
114 +
114 // wait for specified controller node to come up 115 // wait for specified controller node to come up
115 return appeared; 116 return appeared;
116 } 117 }
...@@ -211,15 +212,25 @@ public class ClusterMessagingProtocolClient implements ProtocolClient { ...@@ -211,15 +212,25 @@ public class ClusterMessagingProtocolClient implements ProtocolClient {
211 @Override 212 @Override
212 public void run() { 213 public void run() {
213 try { 214 try {
215 + ControllerNode node = remoteNode;
216 + if (node == null) {
217 + throw new IOException("Remote node disappeared");
218 + }
214 byte[] response = clusterCommunicator 219 byte[] response = clusterCommunicator
215 - .sendAndReceive(message, remoteNode.id()) 220 + .sendAndReceive(message, node.id())
216 .get(RETRY_INTERVAL_MILLIS, TimeUnit.MILLISECONDS); 221 .get(RETRY_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
217 future.complete(verifyNotNull(SERIALIZER.decode(response))); 222 future.complete(verifyNotNull(SERIALIZER.decode(response)));
218 223
219 - } catch (IOException | ExecutionException | TimeoutException e) { 224 + } catch (IOException | TimeoutException e) {
220 log.warn("RPCTask for {} failed: {}", request, e.getMessage()); 225 log.warn("RPCTask for {} failed: {}", request, e.getMessage());
221 log.debug("RPCTask for {} failed.", request, e); 226 log.debug("RPCTask for {} failed.", request, e);
222 future.completeExceptionally(e); 227 future.completeExceptionally(e);
228 + // Treating this client as disconnected
229 + remoteNode = null;
230 + } catch (ExecutionException e) {
231 + log.warn("RPCTask execution for {} failed: {}", request, e.getMessage());
232 + log.debug("RPCTask execution for {} failed.", request, e);
233 + future.completeExceptionally(e);
223 } catch (InterruptedException e) { 234 } catch (InterruptedException e) {
224 log.warn("RPCTask for {} was interrupted: {}", request, e.getMessage()); 235 log.warn("RPCTask for {} was interrupted: {}", request, e.getMessage());
225 log.debug("RPCTask for {} was interrupted.", request, e); 236 log.debug("RPCTask for {} was interrupted.", request, e);
......