Yuta HIGUCHI

Wait for Leader to appear

- DatabaseClient: wait for Leader before DB access
- DatabaseManager: wait for Leader before activate

Change-Id: I5102e7cae1d33f49662bf452b1fba020173a51a0
package org.onlab.onos.store.service.impl;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import net.kuujo.copycat.Copycat;
import net.kuujo.copycat.event.EventHandler;
import net.kuujo.copycat.event.LeaderElectEvent;
import org.onlab.onos.store.service.BatchReadRequest;
import org.onlab.onos.store.service.BatchWriteRequest;
......@@ -16,20 +21,54 @@ import org.onlab.onos.store.service.DatabaseException;
import org.onlab.onos.store.service.ReadResult;
import org.onlab.onos.store.service.VersionedValue;
import org.onlab.onos.store.service.WriteResult;
import org.slf4j.Logger;
/**
* Client for interacting with the Copycat Raft cluster.
*/
public class DatabaseClient {
private final Logger log = getLogger(getClass());
private final Copycat copycat;
public DatabaseClient(Copycat copycat) {
this.copycat = checkNotNull(copycat);
}
public boolean createTable(String tableName) {
public void waitForLeader() {
if (copycat.leader() != null) {
return;
}
log.info("No leader in cluster, waiting for election.");
final CountDownLatch latch = new CountDownLatch(1);
final EventHandler<LeaderElectEvent> leaderLsnr = new EventHandler<LeaderElectEvent>() {
@Override
public void handle(LeaderElectEvent event) {
log.info("Leader chosen: {}", event);
latch.countDown();
}
};
copycat.event(LeaderElectEvent.class).registerHandler(leaderLsnr);
try {
while (copycat.leader() == null) {
latch.await(200, TimeUnit.MILLISECONDS);
}
log.info("Leader appeared: {}", copycat.leader());
return;
} catch (InterruptedException e) {
log.error("Interrupted while waiting for Leader", e);
Thread.currentThread().interrupt();
} finally {
copycat.event(LeaderElectEvent.class).unregisterHandler(leaderLsnr);
}
}
public boolean createTable(String tableName) {
waitForLeader();
CompletableFuture<Boolean> future = copycat.submit("createTable", tableName);
try {
return future.get();
......@@ -39,7 +78,7 @@ public class DatabaseClient {
}
public boolean createTable(String tableName, int ttlMillis) {
waitForLeader();
CompletableFuture<Boolean> future = copycat.submit("createTableWithExpiration", tableName);
try {
return future.get();
......@@ -49,7 +88,7 @@ public class DatabaseClient {
}
public void dropTable(String tableName) {
waitForLeader();
CompletableFuture<Void> future = copycat.submit("dropTable", tableName);
try {
future.get();
......@@ -59,7 +98,7 @@ public class DatabaseClient {
}
public void dropAllTables() {
waitForLeader();
CompletableFuture<Void> future = copycat.submit("dropAllTables");
try {
future.get();
......@@ -69,7 +108,7 @@ public class DatabaseClient {
}
public Set<String> listTables() {
waitForLeader();
CompletableFuture<Set<String>> future = copycat.submit("listTables");
try {
return future.get();
......@@ -79,7 +118,7 @@ public class DatabaseClient {
}
public List<ReadResult> batchRead(BatchReadRequest batchRequest) {
waitForLeader();
CompletableFuture<List<ReadResult>> future = copycat.submit("read", batchRequest);
try {
return future.get();
......@@ -89,7 +128,7 @@ public class DatabaseClient {
}
public List<WriteResult> batchWrite(BatchWriteRequest batchRequest) {
waitForLeader();
CompletableFuture<List<WriteResult>> future = copycat.submit("write", batchRequest);
try {
return future.get();
......@@ -99,6 +138,7 @@ public class DatabaseClient {
}
public Map<String, VersionedValue> getAll(String tableName) {
waitForLeader();
CompletableFuture<Map<String, VersionedValue>> future = copycat.submit("getAll", tableName);
try {
return future.get();
......
......@@ -10,6 +10,7 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import net.kuujo.copycat.Copycat;
......@@ -99,7 +100,7 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService {
private boolean autoAddMember = false;
@Activate
public void activate() {
public void activate() throws InterruptedException, ExecutionException {
// TODO: Not every node should be part of the consensus ring.
......@@ -176,9 +177,10 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService {
copycat.event(LeaderElectEvent.class).registerHandler(expirationTracker);
copycat.start();
copycat.start().get();
client = new DatabaseClient(copycat);
client.waitForLeader();
log.info("Started.");
}
......