Yuta HIGUCHI

ClusterMessagingProtocolClient: lazily bind to NodeId

fixes ONOS-185

Change-Id: Ibbe9624509964d7c3e7ac2c95c171e5cb20b0634
1 package org.onlab.onos.store.service.impl; 1 package org.onlab.onos.store.service.impl;
2 2
3 -import static com.google.common.base.Preconditions.checkNotNull;
4 import static org.slf4j.LoggerFactory.getLogger; 3 import static org.slf4j.LoggerFactory.getLogger;
5 4
6 import java.util.ArrayList; 5 import java.util.ArrayList;
...@@ -38,7 +37,6 @@ import org.apache.felix.scr.annotations.Reference; ...@@ -38,7 +37,6 @@ import org.apache.felix.scr.annotations.Reference;
38 import org.apache.felix.scr.annotations.ReferenceCardinality; 37 import org.apache.felix.scr.annotations.ReferenceCardinality;
39 import org.apache.felix.scr.annotations.Service; 38 import org.apache.felix.scr.annotations.Service;
40 import org.onlab.onos.cluster.ClusterService; 39 import org.onlab.onos.cluster.ClusterService;
41 -import org.onlab.onos.cluster.ControllerNode;
42 import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService; 40 import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
43 import org.onlab.onos.store.cluster.messaging.MessageSubject; 41 import org.onlab.onos.store.cluster.messaging.MessageSubject;
44 import org.onlab.onos.store.serializers.ImmutableListSerializer; 42 import org.onlab.onos.store.serializers.ImmutableListSerializer;
...@@ -172,20 +170,9 @@ public class ClusterMessagingProtocol ...@@ -172,20 +170,9 @@ public class ClusterMessagingProtocol
172 170
173 @Override 171 @Override
174 public ProtocolClient createClient(TcpMember member) { 172 public ProtocolClient createClient(TcpMember member) {
175 - ControllerNode remoteNode = getControllerNode(member.host(), member.port()); 173 + return new ClusterMessagingProtocolClient(clusterService,
176 - checkNotNull(remoteNode, 174 + clusterCommunicator,
177 - "No matching ONOS Node for %s:%s", 175 + clusterService.getLocalNode(),
178 - member.host(), member.port()); 176 + member);
179 - return new ClusterMessagingProtocolClient(
180 - clusterCommunicator, clusterService.getLocalNode(), remoteNode);
181 - }
182 -
183 - private ControllerNode getControllerNode(String host, int port) {
184 - for (ControllerNode node : clusterService.getNodes()) {
185 - if (node.ip().toString().equals(host) && node.tcpPort() == port) {
186 - return node;
187 - }
188 - }
189 - return null;
190 } 177 }
191 } 178 }
......
...@@ -13,6 +13,7 @@ import java.util.concurrent.ThreadFactory; ...@@ -13,6 +13,7 @@ import java.util.concurrent.ThreadFactory;
13 import java.util.concurrent.TimeUnit; 13 import java.util.concurrent.TimeUnit;
14 import java.util.concurrent.TimeoutException; 14 import java.util.concurrent.TimeoutException;
15 15
16 +import net.kuujo.copycat.cluster.TcpMember;
16 import net.kuujo.copycat.protocol.PingRequest; 17 import net.kuujo.copycat.protocol.PingRequest;
17 import net.kuujo.copycat.protocol.PingResponse; 18 import net.kuujo.copycat.protocol.PingResponse;
18 import net.kuujo.copycat.protocol.PollRequest; 19 import net.kuujo.copycat.protocol.PollRequest;
...@@ -23,6 +24,9 @@ import net.kuujo.copycat.protocol.SyncRequest; ...@@ -23,6 +24,9 @@ import net.kuujo.copycat.protocol.SyncRequest;
23 import net.kuujo.copycat.protocol.SyncResponse; 24 import net.kuujo.copycat.protocol.SyncResponse;
24 import net.kuujo.copycat.spi.protocol.ProtocolClient; 25 import net.kuujo.copycat.spi.protocol.ProtocolClient;
25 26
27 +import org.onlab.onos.cluster.ClusterEvent;
28 +import org.onlab.onos.cluster.ClusterEventListener;
29 +import org.onlab.onos.cluster.ClusterService;
26 import org.onlab.onos.cluster.ControllerNode; 30 import org.onlab.onos.cluster.ControllerNode;
27 import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService; 31 import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
28 import org.onlab.onos.store.cluster.messaging.ClusterMessage; 32 import org.onlab.onos.store.cluster.messaging.ClusterMessage;
...@@ -43,21 +47,30 @@ public class ClusterMessagingProtocolClient implements ProtocolClient { ...@@ -43,21 +47,30 @@ public class ClusterMessagingProtocolClient implements ProtocolClient {
43 47
44 public static final long RETRY_INTERVAL_MILLIS = 2000; 48 public static final long RETRY_INTERVAL_MILLIS = 2000;
45 49
50 + private final ClusterService clusterService;
46 private final ClusterCommunicationService clusterCommunicator; 51 private final ClusterCommunicationService clusterCommunicator;
47 private final ControllerNode localNode; 52 private final ControllerNode localNode;
48 - private final ControllerNode remoteNode; 53 + private final TcpMember remoteMember;
54 + private ControllerNode remoteNode;
49 55
50 // FIXME: Thread pool sizing. 56 // FIXME: Thread pool sizing.
51 private static final ScheduledExecutorService THREAD_POOL = 57 private static final ScheduledExecutorService THREAD_POOL =
52 new ScheduledThreadPoolExecutor(10, THREAD_FACTORY); 58 new ScheduledThreadPoolExecutor(10, THREAD_FACTORY);
53 59
60 + private volatile CompletableFuture<Void> appeared;
61 +
62 + private volatile InternalClusterEventListener listener;
63 +
54 public ClusterMessagingProtocolClient( 64 public ClusterMessagingProtocolClient(
65 + ClusterService clusterService,
55 ClusterCommunicationService clusterCommunicator, 66 ClusterCommunicationService clusterCommunicator,
56 ControllerNode localNode, 67 ControllerNode localNode,
57 - ControllerNode remoteNode) { 68 + TcpMember remoteMember) {
69 +
70 + this.clusterService = clusterService;
58 this.clusterCommunicator = clusterCommunicator; 71 this.clusterCommunicator = clusterCommunicator;
59 this.localNode = localNode; 72 this.localNode = localNode;
60 - this.remoteNode = remoteNode; 73 + this.remoteMember = remoteMember;
61 } 74 }
62 75
63 @Override 76 @Override
...@@ -81,15 +94,64 @@ public class ClusterMessagingProtocolClient implements ProtocolClient { ...@@ -81,15 +94,64 @@ public class ClusterMessagingProtocolClient implements ProtocolClient {
81 } 94 }
82 95
83 @Override 96 @Override
84 - public CompletableFuture<Void> connect() { 97 + public synchronized CompletableFuture<Void> connect() {
85 - return CompletableFuture.completedFuture(null); 98 + if (remoteNode != null) {
99 + // done
100 + return CompletableFuture.completedFuture(null);
101 + }
102 +
103 + remoteNode = getControllerNode(remoteMember);
104 +
105 + if (remoteNode != null) {
106 + // done
107 + return CompletableFuture.completedFuture(null);
108 + }
109 +
110 + if (appeared != null) {
111 + // already waiting for member to appear
112 + return appeared;
113 + }
114 +
115 + appeared = new CompletableFuture<>();
116 + listener = new InternalClusterEventListener();
117 + clusterService.addListener(listener);
118 +
119 + // wait for specified controller node to come up
120 + return null;
86 } 121 }
87 122
88 @Override 123 @Override
89 - public CompletableFuture<Void> close() { 124 + public synchronized CompletableFuture<Void> close() {
125 + if (listener != null) {
126 + clusterService.removeListener(listener);
127 + listener = null;
128 + }
129 + if (appeared != null) {
130 + appeared.cancel(true);
131 + appeared = null;
132 + }
90 return CompletableFuture.completedFuture(null); 133 return CompletableFuture.completedFuture(null);
91 } 134 }
92 135
136 + private synchronized void checkIfMemberAppeared() {
137 + final ControllerNode controllerNode = getControllerNode(remoteMember);
138 + if (controllerNode == null) {
139 + // still not there: no-op
140 + return;
141 + }
142 +
143 + // found
144 + remoteNode = controllerNode;
145 + if (appeared != null) {
146 + appeared.complete(null);
147 + }
148 +
149 + if (listener != null) {
150 + clusterService.removeListener(listener);
151 + listener = null;
152 + }
153 + }
154 +
93 private <I> MessageSubject messageType(I input) { 155 private <I> MessageSubject messageType(I input) {
94 Class<?> clazz = input.getClass(); 156 Class<?> clazz = input.getClass();
95 if (clazz.equals(PollRequest.class)) { 157 if (clazz.equals(PollRequest.class)) {
...@@ -112,6 +174,30 @@ public class ClusterMessagingProtocolClient implements ProtocolClient { ...@@ -112,6 +174,30 @@ public class ClusterMessagingProtocolClient implements ProtocolClient {
112 return future; 174 return future;
113 } 175 }
114 176
177 + private ControllerNode getControllerNode(TcpMember remoteMember) {
178 + final String host = remoteMember.host();
179 + final int port = remoteMember.port();
180 + for (ControllerNode node : clusterService.getNodes()) {
181 + if (node.ip().toString().equals(host) && node.tcpPort() == port) {
182 + return node;
183 + }
184 + }
185 + return null;
186 + }
187 +
188 + private final class InternalClusterEventListener
189 + implements ClusterEventListener {
190 +
191 + public InternalClusterEventListener() {
192 + }
193 +
194 + @Override
195 + public void event(ClusterEvent event) {
196 + checkIfMemberAppeared();
197 + }
198 +
199 + }
200 +
115 private class RPCTask<I, O> implements Runnable { 201 private class RPCTask<I, O> implements Runnable {
116 202
117 private final I request; 203 private final I request;
......