Madan Jampani
Committed by Yuta Higuchi

Updates to ClusterMessagingProtocolClient's handling of remote node connectivity issues.

Change-Id: If3cd220bef339cc57b2a5d034c6e86bad2202a9f
...@@ -12,6 +12,7 @@ import java.util.concurrent.ExecutorService; ...@@ -12,6 +12,7 @@ import java.util.concurrent.ExecutorService;
12 import java.util.concurrent.Executors; 12 import java.util.concurrent.Executors;
13 import java.util.concurrent.TimeUnit; 13 import java.util.concurrent.TimeUnit;
14 import java.util.concurrent.TimeoutException; 14 import java.util.concurrent.TimeoutException;
15 +import java.util.concurrent.atomic.AtomicBoolean;
15 16
16 import net.kuujo.copycat.cluster.TcpMember; 17 import net.kuujo.copycat.cluster.TcpMember;
17 import net.kuujo.copycat.protocol.PingRequest; 18 import net.kuujo.copycat.protocol.PingRequest;
...@@ -24,8 +25,6 @@ import net.kuujo.copycat.protocol.SyncRequest; ...@@ -24,8 +25,6 @@ import net.kuujo.copycat.protocol.SyncRequest;
24 import net.kuujo.copycat.protocol.SyncResponse; 25 import net.kuujo.copycat.protocol.SyncResponse;
25 import net.kuujo.copycat.spi.protocol.ProtocolClient; 26 import net.kuujo.copycat.spi.protocol.ProtocolClient;
26 27
27 -import org.onlab.onos.cluster.ClusterEvent;
28 -import org.onlab.onos.cluster.ClusterEventListener;
29 import org.onlab.onos.cluster.ClusterService; 28 import org.onlab.onos.cluster.ClusterService;
30 import org.onlab.onos.cluster.ControllerNode; 29 import org.onlab.onos.cluster.ControllerNode;
31 import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService; 30 import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
...@@ -47,17 +46,13 @@ public class ClusterMessagingProtocolClient implements ProtocolClient { ...@@ -47,17 +46,13 @@ public class ClusterMessagingProtocolClient implements ProtocolClient {
47 private final ControllerNode localNode; 46 private final ControllerNode localNode;
48 private final TcpMember remoteMember; 47 private final TcpMember remoteMember;
49 48
50 - // (remoteNode == null) => disconnected state 49 + private ControllerNode remoteNode;
51 - private volatile ControllerNode remoteNode; 50 + private final AtomicBoolean connectionOK = new AtomicBoolean(true);
52 51
53 // TODO: make this non-static and stop on close 52 // TODO: make this non-static and stop on close
54 private static final ExecutorService THREAD_POOL 53 private static final ExecutorService THREAD_POOL
55 = Executors.newCachedThreadPool(namedThreads("copycat-netty-messaging-%d")); 54 = Executors.newCachedThreadPool(namedThreads("copycat-netty-messaging-%d"));
56 55
57 - private volatile CompletableFuture<Void> appeared;
58 -
59 - private volatile InternalClusterEventListener listener;
60 -
61 public ClusterMessagingProtocolClient( 56 public ClusterMessagingProtocolClient(
62 ClusterService clusterService, 57 ClusterService clusterService,
63 ClusterCommunicationService clusterCommunicator, 58 ClusterCommunicationService clusterCommunicator,
...@@ -72,83 +67,34 @@ public class ClusterMessagingProtocolClient implements ProtocolClient { ...@@ -72,83 +67,34 @@ public class ClusterMessagingProtocolClient implements ProtocolClient {
72 67
73 @Override 68 @Override
74 public CompletableFuture<PingResponse> ping(PingRequest request) { 69 public CompletableFuture<PingResponse> ping(PingRequest request) {
75 - return connect().thenCompose((connected) -> { return requestReply(request); }); 70 + return requestReply(request);
76 } 71 }
77 72
78 @Override 73 @Override
79 public CompletableFuture<SyncResponse> sync(SyncRequest request) { 74 public CompletableFuture<SyncResponse> sync(SyncRequest request) {
80 - return connect().thenCompose((connected) -> { return requestReply(request); }); 75 + return requestReply(request);
81 } 76 }
82 77
83 @Override 78 @Override
84 public CompletableFuture<PollResponse> poll(PollRequest request) { 79 public CompletableFuture<PollResponse> poll(PollRequest request) {
85 - return connect().thenCompose((connected) -> { return requestReply(request); }); 80 + return requestReply(request);
86 } 81 }
87 82
88 @Override 83 @Override
89 public CompletableFuture<SubmitResponse> submit(SubmitRequest request) { 84 public CompletableFuture<SubmitResponse> submit(SubmitRequest request) {
90 - return connect().thenCompose((connected) -> { return requestReply(request); }); 85 + return requestReply(request);
91 } 86 }
92 87
93 @Override 88 @Override
94 public synchronized CompletableFuture<Void> connect() { 89 public synchronized CompletableFuture<Void> connect() {
95 - if (remoteNode != null) { 90 + return CompletableFuture.completedFuture(null);
96 - // done
97 - return CompletableFuture.completedFuture(null);
98 - }
99 -
100 - if (appeared != null) {
101 - // already waiting for member to appear
102 - return appeared;
103 - }
104 -
105 - appeared = new CompletableFuture<>();
106 - listener = new InternalClusterEventListener();
107 - clusterService.addListener(listener);
108 -
109 - remoteNode = getControllerNode(remoteMember);
110 -
111 - if (remoteNode != null) {
112 - // done
113 - return CompletableFuture.completedFuture(null);
114 - }
115 -
116 - // wait for specified controller node to come up
117 - return appeared;
118 } 91 }
119 92
120 @Override 93 @Override
121 public synchronized CompletableFuture<Void> close() { 94 public synchronized CompletableFuture<Void> close() {
122 - if (listener != null) {
123 - clusterService.removeListener(listener);
124 - listener = null;
125 - }
126 - if (appeared != null) {
127 - appeared.cancel(true);
128 - appeared = null;
129 - }
130 return CompletableFuture.completedFuture(null); 95 return CompletableFuture.completedFuture(null);
131 } 96 }
132 97
133 - private synchronized void checkIfMemberAppeared() {
134 - final ControllerNode controllerNode = getControllerNode(remoteMember);
135 - if (controllerNode == null) {
136 - // still not there: no-op
137 - return;
138 - }
139 -
140 - // found
141 - remoteNode = controllerNode;
142 - if (appeared != null) {
143 - appeared.complete(null);
144 - }
145 -
146 - if (listener != null) {
147 - clusterService.removeListener(listener);
148 - listener = null;
149 - }
150 - }
151 -
152 private <I> MessageSubject messageType(I input) { 98 private <I> MessageSubject messageType(I input) {
153 Class<?> clazz = input.getClass(); 99 Class<?> clazz = input.getClass();
154 if (clazz.equals(PollRequest.class)) { 100 if (clazz.equals(PollRequest.class)) {
...@@ -162,7 +108,6 @@ public class ClusterMessagingProtocolClient implements ProtocolClient { ...@@ -162,7 +108,6 @@ public class ClusterMessagingProtocolClient implements ProtocolClient {
162 } else { 108 } else {
163 throw new IllegalArgumentException("Unknown class " + clazz.getName()); 109 throw new IllegalArgumentException("Unknown class " + clazz.getName());
164 } 110 }
165 -
166 } 111 }
167 112
168 private <I, O> CompletableFuture<O> requestReply(I request) { 113 private <I, O> CompletableFuture<O> requestReply(I request) {
...@@ -182,18 +127,6 @@ public class ClusterMessagingProtocolClient implements ProtocolClient { ...@@ -182,18 +127,6 @@ public class ClusterMessagingProtocolClient implements ProtocolClient {
182 return null; 127 return null;
183 } 128 }
184 129
185 - private final class InternalClusterEventListener
186 - implements ClusterEventListener {
187 -
188 - public InternalClusterEventListener() {
189 - }
190 -
191 - @Override
192 - public void event(ClusterEvent event) {
193 - checkIfMemberAppeared();
194 - }
195 - }
196 -
197 private class RPCTask<I, O> implements Runnable { 130 private class RPCTask<I, O> implements Runnable {
198 131
199 private final I request; 132 private final I request;
...@@ -213,22 +146,25 @@ public class ClusterMessagingProtocolClient implements ProtocolClient { ...@@ -213,22 +146,25 @@ public class ClusterMessagingProtocolClient implements ProtocolClient {
213 @Override 146 @Override
214 public void run() { 147 public void run() {
215 try { 148 try {
216 - ControllerNode node = remoteNode; 149 + if (remoteNode == null) {
217 - if (node == null) { 150 + remoteNode = getControllerNode(remoteMember);
218 - throw new IOException("Remote node disappeared"); 151 + if (remoteNode == null) {
152 + throw new IOException("Remote node is offline!");
153 + }
219 } 154 }
220 byte[] response = clusterCommunicator 155 byte[] response = clusterCommunicator
221 - .sendAndReceive(message, node.id()) 156 + .sendAndReceive(message, remoteNode.id())
222 .get(RETRY_INTERVAL_MILLIS, TimeUnit.MILLISECONDS); 157 .get(RETRY_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
158 + if (!connectionOK.getAndSet(true)) {
159 + log.info("Connectivity to {} restored", remoteNode);
160 + }
223 future.complete(verifyNotNull(SERIALIZER.decode(response))); 161 future.complete(verifyNotNull(SERIALIZER.decode(response)));
224 -
225 } catch (IOException | TimeoutException e) { 162 } catch (IOException | TimeoutException e) {
226 - log.warn("RPCTask for {} failed: {}", request, e.getMessage()); 163 + if (connectionOK.getAndSet(false)) {
164 + log.warn("Detected connectivity issues with {}. Reason: {}", remoteNode, e.getMessage());
165 + }
227 log.debug("RPCTask for {} failed.", request, e); 166 log.debug("RPCTask for {} failed.", request, e);
228 future.completeExceptionally(e); 167 future.completeExceptionally(e);
229 - // Treating this client as disconnected
230 - remoteNode = null;
231 - appeared = null;
232 } catch (ExecutionException e) { 168 } catch (ExecutionException e) {
233 log.warn("RPCTask execution for {} failed: {}", request, e.getMessage()); 169 log.warn("RPCTask execution for {} failed: {}", request, e.getMessage());
234 log.debug("RPCTask execution for {} failed.", request, e); 170 log.debug("RPCTask execution for {} failed.", request, e);
......