DatabaseManager: try to wait for others on start up
Change-Id: I90acfa10be7430509a459b456658dc8838d4e44b
Showing
2 changed files
with
44 additions
and
10 deletions
... | @@ -149,12 +149,12 @@ public class ClusterMessagingProtocol | ... | @@ -149,12 +149,12 @@ public class ClusterMessagingProtocol |
149 | 149 | ||
150 | @Activate | 150 | @Activate |
151 | public void activate() { | 151 | public void activate() { |
152 | - log.info("Started."); | 152 | + log.info("Started"); |
153 | } | 153 | } |
154 | 154 | ||
155 | @Deactivate | 155 | @Deactivate |
156 | public void deactivate() { | 156 | public void deactivate() { |
157 | - log.info("Stopped."); | 157 | + log.info("Stopped"); |
158 | } | 158 | } |
159 | 159 | ||
160 | @Override | 160 | @Override | ... | ... |
... | @@ -5,6 +5,8 @@ import static org.slf4j.LoggerFactory.getLogger; | ... | @@ -5,6 +5,8 @@ import static org.slf4j.LoggerFactory.getLogger; |
5 | import java.util.ArrayList; | 5 | import java.util.ArrayList; |
6 | import java.util.Arrays; | 6 | import java.util.Arrays; |
7 | import java.util.List; | 7 | import java.util.List; |
8 | +import java.util.concurrent.CountDownLatch; | ||
9 | +import java.util.concurrent.TimeUnit; | ||
8 | 10 | ||
9 | import net.kuujo.copycat.Copycat; | 11 | import net.kuujo.copycat.Copycat; |
10 | import net.kuujo.copycat.StateMachine; | 12 | import net.kuujo.copycat.StateMachine; |
... | @@ -60,9 +62,11 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService { | ... | @@ -60,9 +62,11 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService { |
60 | private Copycat copycat; | 62 | private Copycat copycat; |
61 | private DatabaseClient client; | 63 | private DatabaseClient client; |
62 | 64 | ||
63 | - // TODO: check if synchronization is required to read/modify this | 65 | + // guarded by synchronized block |
64 | private ClusterConfig<TcpMember> clusterConfig; | 66 | private ClusterConfig<TcpMember> clusterConfig; |
65 | 67 | ||
68 | + private CountDownLatch clusterEventLatch; | ||
69 | + | ||
66 | private ClusterEventListener clusterEventListener; | 70 | private ClusterEventListener clusterEventListener; |
67 | 71 | ||
68 | @Activate | 72 | @Activate |
... | @@ -81,22 +85,45 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService { | ... | @@ -81,22 +85,45 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService { |
81 | 85 | ||
82 | List<TcpMember> remoteMembers = new ArrayList<>(clusterService.getNodes().size()); | 86 | List<TcpMember> remoteMembers = new ArrayList<>(clusterService.getNodes().size()); |
83 | 87 | ||
88 | + clusterEventLatch = new CountDownLatch(1); | ||
84 | clusterEventListener = new InternalClusterEventListener(); | 89 | clusterEventListener = new InternalClusterEventListener(); |
85 | clusterService.addListener(clusterEventListener); | 90 | clusterService.addListener(clusterEventListener); |
86 | 91 | ||
92 | + // note: from this point beyond, clusterConfig requires synchronization | ||
93 | + | ||
87 | for (ControllerNode node : clusterService.getNodes()) { | 94 | for (ControllerNode node : clusterService.getNodes()) { |
88 | TcpMember member = new TcpMember(node.ip().toString(), node.tcpPort()); | 95 | TcpMember member = new TcpMember(node.ip().toString(), node.tcpPort()); |
89 | if (!member.equals(localMember)) { | 96 | if (!member.equals(localMember)) { |
90 | remoteMembers.add(member); | 97 | remoteMembers.add(member); |
91 | } | 98 | } |
92 | } | 99 | } |
93 | - clusterConfig.addRemoteMembers(remoteMembers); | ||
94 | 100 | ||
95 | - log.info("Starting cluster with Local:[{}], Remote:{}", localMember, remoteMembers); | 101 | + if (remoteMembers.isEmpty()) { |
102 | + log.info("This node is the only node in the cluster. " | ||
103 | + + "Waiting for others to show up."); | ||
104 | + // FIXME: hack trying to relax cases forming multiple consensus rings. | ||
105 | + // add seed node configuration to avoid this | ||
106 | + | ||
107 | + // If the node is alone on its own, wait some time | ||
108 | + // hoping others will come up soon | ||
109 | + try { | ||
110 | + if (!clusterEventLatch.await(120, TimeUnit.SECONDS)) { | ||
111 | + log.info("Starting as single node cluster"); | ||
112 | + } | ||
113 | + } catch (InterruptedException e) { | ||
114 | + log.info("Interrupted waiting for others", e); | ||
115 | + } | ||
116 | + } | ||
96 | 117 | ||
118 | + final TcpCluster cluster; | ||
119 | + synchronized (clusterConfig) { | ||
120 | + clusterConfig.addRemoteMembers(remoteMembers); | ||
121 | + | ||
122 | + // Create the cluster. | ||
123 | + cluster = new TcpCluster(clusterConfig); | ||
124 | + } | ||
125 | + log.info("Starting cluster: {}", cluster); | ||
97 | 126 | ||
98 | - // Create the cluster. | ||
99 | - TcpCluster cluster = new TcpCluster(clusterConfig); | ||
100 | 127 | ||
101 | StateMachine stateMachine = new DatabaseStateMachine(); | 128 | StateMachine stateMachine = new DatabaseStateMachine(); |
102 | // FIXME resolve Chronicle + OSGi issue | 129 | // FIXME resolve Chronicle + OSGi issue |
... | @@ -207,17 +234,24 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService { | ... | @@ -207,17 +234,24 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService { |
207 | case INSTANCE_ACTIVATED: | 234 | case INSTANCE_ACTIVATED: |
208 | case INSTANCE_ADDED: | 235 | case INSTANCE_ADDED: |
209 | log.info("{} was added to the cluster", tcpMember); | 236 | log.info("{} was added to the cluster", tcpMember); |
210 | - clusterConfig.addRemoteMember(tcpMember); | 237 | + synchronized (clusterConfig) { |
238 | + clusterConfig.addRemoteMember(tcpMember); | ||
239 | + } | ||
211 | break; | 240 | break; |
212 | case INSTANCE_DEACTIVATED: | 241 | case INSTANCE_DEACTIVATED: |
213 | case INSTANCE_REMOVED: | 242 | case INSTANCE_REMOVED: |
214 | log.info("{} was removed from the cluster", tcpMember); | 243 | log.info("{} was removed from the cluster", tcpMember); |
215 | - clusterConfig.removeRemoteMember(tcpMember); | 244 | + synchronized (clusterConfig) { |
245 | + clusterConfig.removeRemoteMember(tcpMember); | ||
246 | + } | ||
216 | break; | 247 | break; |
217 | default: | 248 | default: |
218 | break; | 249 | break; |
219 | } | 250 | } |
220 | - log.debug("Current cluster: {}", copycat.cluster()); | 251 | + if (copycat != null) { |
252 | + log.debug("Current cluster: {}", copycat.cluster()); | ||
253 | + } | ||
254 | + clusterEventLatch.countDown(); | ||
221 | } | 255 | } |
222 | 256 | ||
223 | } | 257 | } | ... | ... |
-
Please register or login to post a comment