Yuta HIGUCHI

DatabaseClient: Add timeout

- timeout + retry to listTable
- timeout to service API

Change-Id: I8b54dd24d380dcc9e8d44baf3bbf5e379ccca53b
......@@ -3,6 +3,7 @@ package org.onlab.onos.store.service.impl;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
......@@ -10,6 +11,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import net.kuujo.copycat.Copycat;
import net.kuujo.copycat.event.EventHandler;
......@@ -28,6 +30,10 @@ import org.slf4j.Logger;
*/
public class DatabaseClient {
private static final int RETRIES = 5;
private static final int TIMEOUT_MS = 2000;
private final Logger log = getLogger(getClass());
private final Copycat copycat;
......@@ -109,9 +115,20 @@ public class DatabaseClient {
public Set<String> listTables() {
waitForLeader();
CompletableFuture<Set<String>> future = copycat.submit("listTables");
try {
return future.get();
for (int i = 0; i < RETRIES; ++i) {
CompletableFuture<Set<String>> future = copycat.submit("listTables");
try {
return future.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
log.debug("Timed out retrying {}", i);
future.cancel(true);
waitForLeader();
}
}
// TODO: proper timeout handling
log.error("Timed out");
return Collections.emptySet();
} catch (InterruptedException | ExecutionException e) {
throw new DatabaseException(e);
}
......@@ -121,9 +138,11 @@ public class DatabaseClient {
waitForLeader();
CompletableFuture<List<ReadResult>> future = copycat.submit("read", batchRequest);
try {
return future.get();
return future.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException e) {
throw new DatabaseException(e);
} catch (TimeoutException e) {
throw new DatabaseException(e);
}
}
......@@ -131,9 +150,11 @@ public class DatabaseClient {
waitForLeader();
CompletableFuture<List<WriteResult>> future = copycat.submit("write", batchRequest);
try {
return future.get();
return future.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException e) {
throw new DatabaseException(e);
} catch (TimeoutException e) {
throw new DatabaseException(e);
}
}
......@@ -141,9 +162,11 @@ public class DatabaseClient {
waitForLeader();
CompletableFuture<Map<String, VersionedValue>> future = copycat.submit("getAll", tableName);
try {
return future.get();
return future.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException e) {
throw new DatabaseException(e);
} catch (TimeoutException e) {
throw new DatabaseException(e);
}
}
}
......