Yuta HIGUCHI

ClusterMessagingProtocolServer: start listening at correct timing

Change-Id: Ie8ed1894ae16c41242aee861440174f011dd689b
...@@ -73,4 +73,12 @@ public interface ClusterCommunicationService { ...@@ -73,4 +73,12 @@ public interface ClusterCommunicationService {
73 * @param subscriber message subscriber 73 * @param subscriber message subscriber
74 */ 74 */
75 void addSubscriber(MessageSubject subject, ClusterMessageHandler subscriber); 75 void addSubscriber(MessageSubject subject, ClusterMessageHandler subscriber);
76 +
77 + /**
78 + * Removes a subscriber for the specified message subject.
79 + *
80 + * @param subject message subject
81 + */
82 + void removeSubscriber(MessageSubject subject);
83 +
76 } 84 }
......
...@@ -166,10 +166,15 @@ public class ClusterCommunicationManager ...@@ -166,10 +166,15 @@ public class ClusterCommunicationManager
166 166
167 @Override 167 @Override
168 public void addSubscriber(MessageSubject subject, 168 public void addSubscriber(MessageSubject subject,
169 - ClusterMessageHandler subscriber) { 169 + ClusterMessageHandler subscriber) {
170 messagingService.registerHandler(subject.value(), new InternalClusterMessageHandler(subscriber)); 170 messagingService.registerHandler(subject.value(), new InternalClusterMessageHandler(subscriber));
171 } 171 }
172 172
173 + @Override
174 + public void removeSubscriber(MessageSubject subject) {
175 + messagingService.unregisterHandler(subject.value());
176 + }
177 +
173 private final class InternalClusterMessageHandler implements MessageHandler { 178 private final class InternalClusterMessageHandler implements MessageHandler {
174 179
175 private final ClusterMessageHandler handler; 180 private final ClusterMessageHandler handler;
......
...@@ -27,18 +27,12 @@ import org.slf4j.Logger; ...@@ -27,18 +27,12 @@ import org.slf4j.Logger;
27 public class ClusterMessagingProtocolServer implements ProtocolServer { 27 public class ClusterMessagingProtocolServer implements ProtocolServer {
28 28
29 private final Logger log = getLogger(getClass()); 29 private final Logger log = getLogger(getClass());
30 - private RequestHandler handler; 30 + private volatile RequestHandler handler;
31 + private ClusterCommunicationService clusterCommunicator;
31 32
32 public ClusterMessagingProtocolServer(ClusterCommunicationService clusterCommunicator) { 33 public ClusterMessagingProtocolServer(ClusterCommunicationService clusterCommunicator) {
34 + this.clusterCommunicator = clusterCommunicator;
33 35
34 - clusterCommunicator.addSubscriber(
35 - ClusterMessagingProtocol.COPYCAT_PING, new CopycatMessageHandler<PingRequest>());
36 - clusterCommunicator.addSubscriber(
37 - ClusterMessagingProtocol.COPYCAT_SYNC, new CopycatMessageHandler<SyncRequest>());
38 - clusterCommunicator.addSubscriber(
39 - ClusterMessagingProtocol.COPYCAT_POLL, new CopycatMessageHandler<PollRequest>());
40 - clusterCommunicator.addSubscriber(
41 - ClusterMessagingProtocol.COPYCAT_SUBMIT, new CopycatMessageHandler<SubmitRequest>());
42 } 36 }
43 37
44 @Override 38 @Override
...@@ -48,11 +42,23 @@ public class ClusterMessagingProtocolServer implements ProtocolServer { ...@@ -48,11 +42,23 @@ public class ClusterMessagingProtocolServer implements ProtocolServer {
48 42
49 @Override 43 @Override
50 public CompletableFuture<Void> listen() { 44 public CompletableFuture<Void> listen() {
45 + clusterCommunicator.addSubscriber(ClusterMessagingProtocol.COPYCAT_PING,
46 + new CopycatMessageHandler<PingRequest>());
47 + clusterCommunicator.addSubscriber(ClusterMessagingProtocol.COPYCAT_SYNC,
48 + new CopycatMessageHandler<SyncRequest>());
49 + clusterCommunicator.addSubscriber(ClusterMessagingProtocol.COPYCAT_POLL,
50 + new CopycatMessageHandler<PollRequest>());
51 + clusterCommunicator.addSubscriber(ClusterMessagingProtocol.COPYCAT_SUBMIT,
52 + new CopycatMessageHandler<SubmitRequest>());
51 return CompletableFuture.completedFuture(null); 53 return CompletableFuture.completedFuture(null);
52 } 54 }
53 55
54 @Override 56 @Override
55 public CompletableFuture<Void> close() { 57 public CompletableFuture<Void> close() {
58 + clusterCommunicator.removeSubscriber(ClusterMessagingProtocol.COPYCAT_PING);
59 + clusterCommunicator.removeSubscriber(ClusterMessagingProtocol.COPYCAT_SYNC);
60 + clusterCommunicator.removeSubscriber(ClusterMessagingProtocol.COPYCAT_POLL);
61 + clusterCommunicator.removeSubscriber(ClusterMessagingProtocol.COPYCAT_SUBMIT);
56 return CompletableFuture.completedFuture(null); 62 return CompletableFuture.completedFuture(null);
57 } 63 }
58 64
......