Madan Jampani
Committed by Yuta Higuchi

Support for running copycat on a subset of ONOS cluster nodes.

This change ensures that DatabaseService on each node does the right thing, irrespective of whether the node is part of the Raft cluster or not.

Change-Id: I1e8976d56b3a2892d5c7ecbb46c247770a633860
...@@ -13,6 +13,7 @@ import java.util.Vector; ...@@ -13,6 +13,7 @@ import java.util.Vector;
13 13
14 import net.kuujo.copycat.cluster.TcpClusterConfig; 14 import net.kuujo.copycat.cluster.TcpClusterConfig;
15 import net.kuujo.copycat.cluster.TcpMember; 15 import net.kuujo.copycat.cluster.TcpMember;
16 +import net.kuujo.copycat.event.LeaderElectEvent;
16 import net.kuujo.copycat.internal.log.ConfigurationEntry; 17 import net.kuujo.copycat.internal.log.ConfigurationEntry;
17 import net.kuujo.copycat.internal.log.CopycatEntry; 18 import net.kuujo.copycat.internal.log.CopycatEntry;
18 import net.kuujo.copycat.internal.log.OperationEntry; 19 import net.kuujo.copycat.internal.log.OperationEntry;
...@@ -103,6 +104,7 @@ public class ClusterMessagingProtocol ...@@ -103,6 +104,7 @@ public class ClusterMessagingProtocol
103 .register(OperationEntry.class) 104 .register(OperationEntry.class)
104 .register(TcpClusterConfig.class) 105 .register(TcpClusterConfig.class)
105 .register(TcpMember.class) 106 .register(TcpMember.class)
107 + .register(LeaderElectEvent.class)
106 .build(); 108 .build();
107 109
108 private static final KryoNamespace DATABASE = KryoNamespace.newBuilder() 110 private static final KryoNamespace DATABASE = KryoNamespace.newBuilder()
......
...@@ -13,13 +13,15 @@ import java.util.concurrent.ExecutionException; ...@@ -13,13 +13,15 @@ import java.util.concurrent.ExecutionException;
13 import java.util.concurrent.TimeUnit; 13 import java.util.concurrent.TimeUnit;
14 import java.util.concurrent.TimeoutException; 14 import java.util.concurrent.TimeoutException;
15 15
16 +import net.kuujo.copycat.cluster.Member;
16 import net.kuujo.copycat.cluster.TcpMember; 17 import net.kuujo.copycat.cluster.TcpMember;
17 -import net.kuujo.copycat.event.EventHandler;
18 import net.kuujo.copycat.event.LeaderElectEvent; 18 import net.kuujo.copycat.event.LeaderElectEvent;
19 import net.kuujo.copycat.protocol.SubmitRequest; 19 import net.kuujo.copycat.protocol.SubmitRequest;
20 import net.kuujo.copycat.protocol.SubmitResponse; 20 import net.kuujo.copycat.protocol.SubmitResponse;
21 import net.kuujo.copycat.spi.protocol.ProtocolClient; 21 import net.kuujo.copycat.spi.protocol.ProtocolClient;
22 22
23 +import org.onlab.onos.store.cluster.messaging.ClusterMessage;
24 +import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
23 import org.onlab.onos.store.service.BatchReadRequest; 25 import org.onlab.onos.store.service.BatchReadRequest;
24 import org.onlab.onos.store.service.BatchWriteRequest; 26 import org.onlab.onos.store.service.BatchWriteRequest;
25 import org.onlab.onos.store.service.DatabaseException; 27 import org.onlab.onos.store.service.DatabaseException;
...@@ -31,7 +33,7 @@ import org.slf4j.Logger; ...@@ -31,7 +33,7 @@ import org.slf4j.Logger;
31 /** 33 /**
32 * Client for interacting with the Copycat Raft cluster. 34 * Client for interacting with the Copycat Raft cluster.
33 */ 35 */
34 -public class DatabaseClient implements EventHandler<LeaderElectEvent> { 36 +public class DatabaseClient implements ClusterMessageHandler {
35 37
36 private static final int RETRIES = 5; 38 private static final int RETRIES = 5;
37 39
...@@ -41,24 +43,28 @@ public class DatabaseClient implements EventHandler<LeaderElectEvent> { ...@@ -41,24 +43,28 @@ public class DatabaseClient implements EventHandler<LeaderElectEvent> {
41 43
42 private final DatabaseProtocolService protocol; 44 private final DatabaseProtocolService protocol;
43 private volatile ProtocolClient client = null; 45 private volatile ProtocolClient client = null;
44 - private volatile TcpMember currentLeader = null; 46 + private volatile Member currentLeader = null;
45 - 47 + private volatile long currentLeaderTerm = 0;
46 48
47 public DatabaseClient(DatabaseProtocolService protocol) { 49 public DatabaseClient(DatabaseProtocolService protocol) {
48 this.protocol = checkNotNull(protocol); 50 this.protocol = checkNotNull(protocol);
49 } 51 }
50 52
51 - // FIXME This handler relies on a fact that local node is part of Raft cluster
52 @Override 53 @Override
53 - public void handle(LeaderElectEvent event) { 54 + public void handle(ClusterMessage message) {
54 - final TcpMember newLeader = event.leader(); 55 + LeaderElectEvent event =
55 - if (newLeader != null && !newLeader.equals(currentLeader)) { 56 + ClusterMessagingProtocol.SERIALIZER.decode(message.payload());
56 - log.info("{} became the new leader", newLeader); 57 + TcpMember newLeader = event.leader();
58 + long newLeaderTerm = event.term();
59 + if (newLeader != null && !newLeader.equals(currentLeader) && newLeaderTerm > currentLeaderTerm) {
60 + log.info("New leader detected. Leader: {}, term: {}", newLeader, newLeaderTerm);
57 ProtocolClient prevClient = client; 61 ProtocolClient prevClient = client;
58 - ProtocolClient newclient = protocol.createClient(newLeader); 62 + ProtocolClient newClient = protocol.createClient(newLeader);
59 - newclient.connect(); 63 + newClient.connect();
60 - client = newclient; 64 + client = newClient;
61 currentLeader = newLeader; 65 currentLeader = newLeader;
66 + currentLeaderTerm = newLeaderTerm;
67 +
62 if (prevClient != null) { 68 if (prevClient != null) {
63 prevClient.close(); 69 prevClient.close();
64 } 70 }
...@@ -80,7 +86,6 @@ public class DatabaseClient implements EventHandler<LeaderElectEvent> { ...@@ -80,7 +86,6 @@ public class DatabaseClient implements EventHandler<LeaderElectEvent> {
80 while (currentLeader == null) { 86 while (currentLeader == null) {
81 Thread.sleep(200); 87 Thread.sleep(200);
82 } 88 }
83 - log.info("Leader appeared: {}", currentLeader);
84 return; 89 return;
85 } catch (InterruptedException e) { 90 } catch (InterruptedException e) {
86 log.error("Interrupted while waiting for Leader", e); 91 log.error("Interrupted while waiting for Leader", e);
......
...@@ -19,6 +19,7 @@ import net.kuujo.copycat.cluster.Member; ...@@ -19,6 +19,7 @@ import net.kuujo.copycat.cluster.Member;
19 import net.kuujo.copycat.cluster.TcpCluster; 19 import net.kuujo.copycat.cluster.TcpCluster;
20 import net.kuujo.copycat.cluster.TcpClusterConfig; 20 import net.kuujo.copycat.cluster.TcpClusterConfig;
21 import net.kuujo.copycat.cluster.TcpMember; 21 import net.kuujo.copycat.cluster.TcpMember;
22 +import net.kuujo.copycat.event.EventHandler;
22 import net.kuujo.copycat.event.LeaderElectEvent; 23 import net.kuujo.copycat.event.LeaderElectEvent;
23 import net.kuujo.copycat.log.Log; 24 import net.kuujo.copycat.log.Log;
24 25
...@@ -35,6 +36,8 @@ import org.onlab.onos.cluster.ControllerNode; ...@@ -35,6 +36,8 @@ import org.onlab.onos.cluster.ControllerNode;
35 import org.onlab.onos.cluster.DefaultControllerNode; 36 import org.onlab.onos.cluster.DefaultControllerNode;
36 import org.onlab.onos.cluster.NodeId; 37 import org.onlab.onos.cluster.NodeId;
37 import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService; 38 import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
39 +import org.onlab.onos.store.cluster.messaging.ClusterMessage;
40 +import org.onlab.onos.store.cluster.messaging.MessageSubject;
38 import org.onlab.onos.store.service.BatchReadRequest; 41 import org.onlab.onos.store.service.BatchReadRequest;
39 import org.onlab.onos.store.service.BatchReadResult; 42 import org.onlab.onos.store.service.BatchReadResult;
40 import org.onlab.onos.store.service.BatchWriteRequest; 43 import org.onlab.onos.store.service.BatchWriteRequest;
...@@ -86,6 +89,9 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService { ...@@ -86,6 +89,9 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService {
86 // initial member configuration file path 89 // initial member configuration file path
87 private String initialMemberConfig = DEFAULT_MEMBER_FILE; 90 private String initialMemberConfig = DEFAULT_MEMBER_FILE;
88 91
92 + public static final MessageSubject RAFT_LEADER_ELECTION_EVENT =
93 + new MessageSubject("raft-leader-election-event");
94 +
89 private Copycat copycat; 95 private Copycat copycat;
90 private DatabaseClient client; 96 private DatabaseClient client;
91 97
...@@ -102,8 +108,6 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService { ...@@ -102,8 +108,6 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService {
102 @Activate 108 @Activate
103 public void activate() throws InterruptedException, ExecutionException { 109 public void activate() throws InterruptedException, ExecutionException {
104 110
105 - // TODO: Not every node should be part of the consensus ring.
106 -
107 // load tablet configuration 111 // load tablet configuration
108 File file = new File(CONFIG_DIR, initialMemberConfig); 112 File file = new File(CONFIG_DIR, initialMemberConfig);
109 log.info("Loading config: {}", file.getAbsolutePath()); 113 log.info("Loading config: {}", file.getAbsolutePath());
...@@ -117,16 +121,16 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService { ...@@ -117,16 +121,16 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService {
117 121
118 // load default tablet configuration and start copycat 122 // load default tablet configuration and start copycat
119 clusterConfig = new TcpClusterConfig(); 123 clusterConfig = new TcpClusterConfig();
120 - Set<DefaultControllerNode> defaultMember = tabletMembers.get(DEFAULT_TABLET); 124 + Set<DefaultControllerNode> defaultMembers = tabletMembers.get(DEFAULT_TABLET);
121 - if (defaultMember == null || defaultMember.isEmpty()) { 125 + if (defaultMembers == null || defaultMembers.isEmpty()) {
122 - log.error("No member found in [{}] tablet configuration.", 126 + log.error("No members found in [{}] tablet configuration.",
123 DEFAULT_TABLET); 127 DEFAULT_TABLET);
124 throw new IllegalStateException("No member found in tablet configuration"); 128 throw new IllegalStateException("No member found in tablet configuration");
125 129
126 } 130 }
127 131
128 final ControllerNode localNode = clusterService.getLocalNode(); 132 final ControllerNode localNode = clusterService.getLocalNode();
129 - for (ControllerNode member : defaultMember) { 133 + for (ControllerNode member : defaultMembers) {
130 final TcpMember tcpMember = new TcpMember(member.ip().toString(), 134 final TcpMember tcpMember = new TcpMember(member.ip().toString(),
131 member.tcpPort()); 135 member.tcpPort());
132 if (localNode.equals(member)) { 136 if (localNode.equals(member)) {
...@@ -136,23 +140,10 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService { ...@@ -136,23 +140,10 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService {
136 } 140 }
137 } 141 }
138 142
139 - // note: from this point beyond, clusterConfig requires synchronization 143 + if (clusterConfig.getLocalMember() != null) {
140 - clusterEventLatch = new CountDownLatch(1);
141 - clusterEventListener = new InternalClusterEventListener();
142 - clusterService.addListener(clusterEventListener);
143 144
144 - if (clusterService.getNodes().size() < clusterConfig.getMembers().size()) { 145 + // Wait for a minimum viable Raft cluster to boot up.
145 - // current cluster size smaller then expected 146 + waitForClusterQuorum();
146 - try {
147 - if (!clusterEventLatch.await(120, TimeUnit.SECONDS)) {
148 - log.info("Starting with {}/{} nodes cluster",
149 - clusterService.getNodes().size(),
150 - clusterConfig.getMembers().size());
151 - }
152 - } catch (InterruptedException e) {
153 - log.info("Interrupted waiting for others", e);
154 - }
155 - }
156 147
157 final TcpCluster cluster; 148 final TcpCluster cluster;
158 synchronized (clusterConfig) { 149 synchronized (clusterConfig) {
...@@ -174,27 +165,55 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService { ...@@ -174,27 +165,55 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService {
174 ClusterMessagingProtocol.SERIALIZER); 165 ClusterMessagingProtocol.SERIALIZER);
175 166
176 copycat = new Copycat(stateMachine, consensusLog, cluster, copycatMessagingProtocol); 167 copycat = new Copycat(stateMachine, consensusLog, cluster, copycatMessagingProtocol);
168 + copycat.event(LeaderElectEvent.class).registerHandler(new RaftLeaderElectionMonitor());
169 + copycat.event(LeaderElectEvent.class).registerHandler(expirationTracker);
170 + }
177 171
178 client = new DatabaseClient(copycatMessagingProtocol); 172 client = new DatabaseClient(copycatMessagingProtocol);
173 + clusterCommunicator.addSubscriber(RAFT_LEADER_ELECTION_EVENT, client);
179 174
180 - 175 + // Starts copycat if this node is a participant
181 - copycat.event(LeaderElectEvent.class).registerHandler(client); 176 + // of the Raft cluster.
182 - copycat.event(LeaderElectEvent.class).registerHandler(expirationTracker); 177 + if (copycat != null) {
183 -
184 copycat.start().get(); 178 copycat.start().get();
179 + }
185 180
186 client.waitForLeader(); 181 client.waitForLeader();
187 -
188 log.info("Started."); 182 log.info("Started.");
189 } 183 }
190 184
191 @Deactivate 185 @Deactivate
192 public void deactivate() { 186 public void deactivate() {
193 clusterService.removeListener(clusterEventListener); 187 clusterService.removeListener(clusterEventListener);
188 + // TODO: ClusterCommunicationService must support more than one
189 + // handler per message subject.
190 + clusterCommunicator.removeSubscriber(RAFT_LEADER_ELECTION_EVENT);
191 + if (copycat != null) {
194 copycat.stop(); 192 copycat.stop();
193 + }
195 log.info("Stopped."); 194 log.info("Stopped.");
196 } 195 }
197 196
197 + private void waitForClusterQuorum() {
198 + // note: from this point beyond, clusterConfig requires synchronization
199 + clusterEventLatch = new CountDownLatch(1);
200 + clusterEventListener = new InternalClusterEventListener();
201 + clusterService.addListener(clusterEventListener);
202 +
203 + if (clusterService.getNodes().size() < clusterConfig.getMembers().size()) {
204 + // current cluster size smaller than expected
205 + try {
206 + if (!clusterEventLatch.await(120, TimeUnit.SECONDS)) {
207 + log.info("Starting with {}/{} nodes cluster",
208 + clusterService.getNodes().size(),
209 + clusterConfig.getMembers().size());
210 + }
211 + } catch (InterruptedException e) {
212 + log.info("Interrupted waiting for others", e);
213 + }
214 + }
215 + }
216 +
198 @Override 217 @Override
199 public boolean createTable(String name) { 218 public boolean createTable(String name) {
200 return client.createTable(name); 219 return client.createTable(name);
...@@ -353,6 +372,24 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService { ...@@ -353,6 +372,24 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService {
353 } 372 }
354 } 373 }
355 374
375 + private final class RaftLeaderElectionMonitor implements EventHandler<LeaderElectEvent> {
376 + @Override
377 + public void handle(LeaderElectEvent event) {
378 + try {
379 + if (clusterConfig.getLocalMember() != null && event.leader().equals(clusterConfig.getLocalMember())) {
380 + // This node just became the leader.
381 + clusterCommunicator.broadcastIncludeSelf(
382 + new ClusterMessage(
383 + clusterService.getLocalNode().id(),
384 + RAFT_LEADER_ELECTION_EVENT,
385 + ClusterMessagingProtocol.SERIALIZER.encode(event)));
386 + }
387 + } catch (IOException e) {
388 + log.error("Failed to broadcast raft leadership change event", e);
389 + }
390 + }
391 + }
392 +
356 private final class InternalClusterEventListener 393 private final class InternalClusterEventListener
357 implements ClusterEventListener { 394 implements ClusterEventListener {
358 395
......