Showing
2 changed files
with
10 additions
and
17 deletions
... | @@ -168,10 +168,10 @@ public class ClusterMessagingProtocol | ... | @@ -168,10 +168,10 @@ public class ClusterMessagingProtocol |
168 | 168 | ||
169 | @Override | 169 | @Override |
170 | public ProtocolClient createClient(TcpMember member) { | 170 | public ProtocolClient createClient(TcpMember member) { |
171 | - ControllerNode node = getControllerNode(member.host(), member.port()); | 171 | + ControllerNode remoteNode = getControllerNode(member.host(), member.port()); |
172 | - checkNotNull(node, "A valid controller node is expected"); | 172 | + checkNotNull(remoteNode, "A valid controller node is expected"); |
173 | return new ClusterMessagingProtocolClient( | 173 | return new ClusterMessagingProtocolClient( |
174 | - clusterCommunicator, node); | 174 | + clusterCommunicator, clusterService.getLocalNode(), remoteNode); |
175 | } | 175 | } |
176 | 176 | ||
177 | private ControllerNode getControllerNode(String host, int port) { | 177 | private ControllerNode getControllerNode(String host, int port) { | ... | ... |
... | @@ -42,6 +42,7 @@ public class ClusterMessagingProtocolClient implements ProtocolClient { | ... | @@ -42,6 +42,7 @@ public class ClusterMessagingProtocolClient implements ProtocolClient { |
42 | public static final long RETRY_INTERVAL_MILLIS = 2000; | 42 | public static final long RETRY_INTERVAL_MILLIS = 2000; |
43 | 43 | ||
44 | private final ClusterCommunicationService clusterCommunicator; | 44 | private final ClusterCommunicationService clusterCommunicator; |
45 | + private final ControllerNode localNode; | ||
45 | private final ControllerNode remoteNode; | 46 | private final ControllerNode remoteNode; |
46 | 47 | ||
47 | // FIXME: Thread pool sizing. | 48 | // FIXME: Thread pool sizing. |
... | @@ -50,8 +51,10 @@ public class ClusterMessagingProtocolClient implements ProtocolClient { | ... | @@ -50,8 +51,10 @@ public class ClusterMessagingProtocolClient implements ProtocolClient { |
50 | 51 | ||
51 | public ClusterMessagingProtocolClient( | 52 | public ClusterMessagingProtocolClient( |
52 | ClusterCommunicationService clusterCommunicator, | 53 | ClusterCommunicationService clusterCommunicator, |
54 | + ControllerNode localNode, | ||
53 | ControllerNode remoteNode) { | 55 | ControllerNode remoteNode) { |
54 | this.clusterCommunicator = clusterCommunicator; | 56 | this.clusterCommunicator = clusterCommunicator; |
57 | + this.localNode = localNode; | ||
55 | this.remoteNode = remoteNode; | 58 | this.remoteNode = remoteNode; |
56 | } | 59 | } |
57 | 60 | ||
... | @@ -117,7 +120,7 @@ public class ClusterMessagingProtocolClient implements ProtocolClient { | ... | @@ -117,7 +120,7 @@ public class ClusterMessagingProtocolClient implements ProtocolClient { |
117 | this.request = request; | 120 | this.request = request; |
118 | this.message = | 121 | this.message = |
119 | new ClusterMessage( | 122 | new ClusterMessage( |
120 | - null, // FIXME fill in proper sender | 123 | + localNode.id(), |
121 | messageType(request), | 124 | messageType(request), |
122 | ClusterMessagingProtocol.SERIALIZER.encode(request)); | 125 | ClusterMessagingProtocol.SERIALIZER.encode(request)); |
123 | this.future = future; | 126 | this.future = future; |
... | @@ -132,22 +135,12 @@ public class ClusterMessagingProtocolClient implements ProtocolClient { | ... | @@ -132,22 +135,12 @@ public class ClusterMessagingProtocolClient implements ProtocolClient { |
132 | future.complete(ClusterMessagingProtocol.SERIALIZER.decode(response)); | 135 | future.complete(ClusterMessagingProtocol.SERIALIZER.decode(response)); |
133 | 136 | ||
134 | } catch (IOException | InterruptedException | ExecutionException | TimeoutException e) { | 137 | } catch (IOException | InterruptedException | ExecutionException | TimeoutException e) { |
135 | -// if (message.subject().equals(ClusterMessagingProtocol.COPYCAT_SYNC) || | 138 | + log.warn("RPCTask for {} failed.", request, e); |
136 | -// message.subject().equals(ClusterMessagingProtocol.COPYCAT_PING)) { | 139 | + future.completeExceptionally(e); |
137 | -// log.warn("{} Request to {} failed. Will retry in {} ms", | ||
138 | -// message.subject(), remoteNode, RETRY_INTERVAL_MILLIS); | ||
139 | -// THREAD_POOL.schedule( | ||
140 | -// this, | ||
141 | -// RETRY_INTERVAL_MILLIS, | ||
142 | -// TimeUnit.MILLISECONDS); | ||
143 | -// } else { | ||
144 | - log.warn("RPCTask for {} failed.", request, e); | ||
145 | - future.completeExceptionally(e); | ||
146 | -// } | ||
147 | } catch (Exception e) { | 140 | } catch (Exception e) { |
148 | log.warn("RPCTask for {} terribly failed.", request, e); | 141 | log.warn("RPCTask for {} terribly failed.", request, e); |
149 | future.completeExceptionally(e); | 142 | future.completeExceptionally(e); |
150 | } | 143 | } |
151 | } | 144 | } |
152 | } | 145 | } |
153 | -} | ||
... | \ No newline at end of file | ... | \ No newline at end of file |
146 | +} | ... | ... |
-
Please register or login to post a comment