Madan Jampani
Committed by Pavlin Radoslavov

1. DatabaseManager's activate now attempts a listTables call to verify that the store is in good shape.

2. lock and tryLock can now throw InterruptedExceptions.

Change-Id: Ifa766ad441f677a4071b68d8f6caa564cf320869

Change-Id: I318ff762a96b261737831f6bd7c200b384c638e9

Change-Id: I0f509703520b3187931fa3669cd8213a91e85c96
......@@ -160,7 +160,7 @@ public class FooComponent {
}
}
private void lockUnlock() {
private void lockUnlock() throws InterruptedException {
try {
final String locksTable = "onos-locks";
......
......@@ -105,11 +105,11 @@ public class SdnIp implements SdnIpService {
new ThreadFactoryBuilder()
.setNameFormat("sdnip-leader-election-%d").build());
leaderElectionExecutor.execute(new Runnable() {
@Override
public void run() {
doLeaderElectionThread();
}
});
@Override
public void run() {
doLeaderElectionThread();
}
});
// Manually set the instance as the leader to allow testing
// TODO change this when we get a leader election
......@@ -174,22 +174,22 @@ public class SdnIp implements SdnIpService {
log.debug("SDN-IP Leader Election begin");
// Block until it becomes the leader
leaderLock.lock(LEASE_DURATION_MS);
// This instance is the leader
log.info("SDN-IP Leader Elected");
intentSynchronizer.leaderChanged(true);
// Keep extending the expiration until shutdown
int extensionFailedCountdown = LEASE_EXTEND_RETRY_MAX - 1;
//
// Keep periodically extending the lock expiration.
// If there are multiple back-to-back failures to extend (with
// extra sleep time between retrials), then release the lock.
//
while (!isShutdown) {
try {
try {
leaderLock.lock(LEASE_DURATION_MS);
// This instance is the leader
log.info("SDN-IP Leader Elected");
intentSynchronizer.leaderChanged(true);
// Keep extending the expiration until shutdown
int extensionFailedCountdown = LEASE_EXTEND_RETRY_MAX - 1;
//
// Keep periodically extending the lock expiration.
// If there are multiple back-to-back failures to extend (with
// extra sleep time between retrials), then release the lock.
//
while (!isShutdown) {
Thread.sleep(LEASE_DURATION_MS / LEASE_EXTEND_RETRY_MAX);
if (leaderLock.extendExpiration(LEASE_DURATION_MS)) {
log.trace("SDN-IP Leader Extended");
......@@ -211,13 +211,12 @@ public class SdnIp implements SdnIpService {
break; // Try again to get the lock
}
}
} catch (InterruptedException e) {
// Thread interrupted. Time to shutdown
log.debug("SDN-IP Leader Interrupted");
}
} catch (InterruptedException e) {
// Thread interrupted. Time to shutdown
log.debug("SDN-IP Leader Interrupted");
}
}
// If we reach here, the instance was shutdown
intentSynchronizer.leaderChanged(false);
leaderLock.unlock();
......
......@@ -31,8 +31,9 @@ public interface Lock {
* lock after granting it, before automatically releasing it if it hasn't
* already been released by invoking unlock(). Must be in the range
* (0, LockManager.MAX_LEASE_MILLIS]
* @throws InterruptedException if the thread is interrupted while waiting
*/
void lock(int leaseDurationMillis);
void lock(int leaseDurationMillis) throws InterruptedException;
/**
* Acquires the lock only if it is free at the time of invocation.
......@@ -54,8 +55,9 @@ public interface Lock {
* (0, LockManager.MAX_LEASE_MILLIS]
* @return true if the lock was acquired and false if the waiting time
* elapsed before the lock was acquired
* @throws InterruptedException if the thread is interrupted while waiting
*/
boolean tryLock(long waitTimeMillis, int leaseDurationMillis);
boolean tryLock(long waitTimeMillis, int leaseDurationMillis) throws InterruptedException;
/**
* Returns true if this Lock instance currently holds the lock.
......
......@@ -179,6 +179,11 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService {
}
client.waitForLeader();
// Try and list the tables to verify database manager is
// in a state where it can serve requests.
tryTableListing();
log.info("Started.");
}
......@@ -214,6 +219,24 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService {
}
}
/**
 * Probes the store by calling {@code listTables()}.
 *
 * A {@code DatabaseException} from the call is logged at debug level and the
 * call is repeated. Once the initial attempt and ten further attempts have
 * all failed, the last exception is logged at error level and rethrown.
 * The method returns normally as soon as one call succeeds.
 */
private void tryTableListing() {
    final int maxRetries = 10;
    // failedSoFar counts the failures seen before the current attempt.
    for (int failedSoFar = 0; ; failedSoFar++) {
        try {
            listTables();
            return;
        } catch (DatabaseException e) {
            if (failedSoFar < maxRetries) {
                log.debug("Failed to listTables. Will retry...", e);
                continue;
            }
            // Retry budget exhausted: surface the failure to the caller.
            log.error("Failed to listTables after multiple attempts. Giving up.", e);
            throw e;
        }
    }
}
@Override
public boolean createTable(String name) {
return client.createTable(name);
......
......@@ -12,6 +12,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.joda.time.DateTime;
import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.store.service.DatabaseException;
import org.onlab.onos.store.service.DatabaseService;
import org.onlab.onos.store.service.Lock;
import org.slf4j.Logger;
......@@ -23,8 +24,6 @@ public class DistributedLock implements Lock {
private final Logger log = getLogger(getClass());
private static final long MAX_WAIT_TIME_MS = 100000000L;
private final DistributedLockManager lockManager;
private final DatabaseService databaseService;
private final String path;
......@@ -53,13 +52,17 @@ public class DistributedLock implements Lock {
}
@Override
public void lock(int leaseDurationMillis) {
public void lock(int leaseDurationMillis) throws InterruptedException {
if (isLocked() && lockExpirationTime.isAfter(DateTime.now().plusMillis(leaseDurationMillis))) {
// Nothing to do.
// Current expiration time is beyond what is requested.
return;
} else {
tryLock(MAX_WAIT_TIME_MS, leaseDurationMillis);
CompletableFuture<DateTime> future =
lockManager.lockIfAvailable(this, leaseDurationMillis);
try {
lockExpirationTime = future.get();
} catch (ExecutionException e) {
throw new DatabaseException(e);
}
}
}
......@@ -79,22 +82,17 @@ public class DistributedLock implements Lock {
@Override
public boolean tryLock(
long waitTimeMillis,
int leaseDurationMillis) {
int leaseDurationMillis) throws InterruptedException {
if (tryLock(leaseDurationMillis)) {
return true;
}
CompletableFuture<DateTime> future =
lockManager.lockIfAvailable(this, waitTimeMillis, leaseDurationMillis);
try {
lockExpirationTime = future.get(waitTimeMillis, TimeUnit.MILLISECONDS);
return true;
} catch (ExecutionException | InterruptedException e) {
log.error("Encountered an exception trying to acquire lock for " + path, e);
// TODO: ExecutionException could indicate something
// wrong with the backing database.
// Throw an exception?
return false;
} catch (ExecutionException e) {
throw new DatabaseException(e);
} catch (TimeoutException e) {
log.debug("Timed out waiting to acquire lock for {}", path);
return false;
......
......@@ -64,23 +64,22 @@ public class DistributedLockManager implements LockService {
@Activate
public void activate() {
try {
Set<String> tableNames = databaseAdminService.listTables();
if (!tableNames.contains(ONOS_LOCK_TABLE_NAME)) {
Set<String> tables = databaseAdminService.listTables();
if (!tables.contains(ONOS_LOCK_TABLE_NAME)) {
if (databaseAdminService.createTable(ONOS_LOCK_TABLE_NAME, DEAD_LOCK_TIMEOUT_MS)) {
log.info("Created {} table.", ONOS_LOCK_TABLE_NAME);
}
}
} catch (DatabaseException e) {
log.error("DistributedLockManager#activate failed.", e);
throw e;
}
clusterCommunicator.addSubscriber(
DatabaseStateMachine.DATABASE_UPDATE_EVENTS,
new LockEventMessageListener());
log.info("Started.");
clusterCommunicator.addSubscriber(
DatabaseStateMachine.DATABASE_UPDATE_EVENTS,
new LockEventMessageListener());
log.info("Started");
}
@Deactivate
......@@ -119,7 +118,31 @@ public class DistributedLockManager implements LockService {
long waitTimeMillis,
int leaseDurationMillis) {
CompletableFuture<DateTime> future = new CompletableFuture<>();
locksToAcquire.put(lock.path(), new LockRequest(lock, waitTimeMillis, leaseDurationMillis, future));
LockRequest request = new LockRequest(
lock,
leaseDurationMillis,
DateTime.now().plusMillis(leaseDurationMillis),
future);
locksToAcquire.put(lock.path(), request);
return future;
}
/**
 * Registers a request to acquire the lock as soon as it becomes available.
 * <p>
 * The request is stored in {@code locksToAcquire} under the lock's path,
 * replacing whatever entry that map held for the same path. The request's
 * own expiration time is set 100 years from now.
 *
 * @param lock lock to acquire.
 * @param leaseDurationMillis the duration for which to acquire the lock initially.
 * @return Future lease expiration date.
 */
protected CompletableFuture<DateTime> lockIfAvailable(
        Lock lock,
        int leaseDurationMillis) {
    final CompletableFuture<DateTime> leaseExpiry = new CompletableFuture<>();
    // This overload takes no wait-time bound, so the request expiry is pushed far out.
    final DateTime requestExpiry = DateTime.now().plusYears(100);
    locksToAcquire.put(
            lock.path(),
            new LockRequest(lock, leaseDurationMillis, requestExpiry, leaseExpiry));
    return leaseExpiry;
}
......@@ -182,11 +205,14 @@ public class DistributedLockManager implements LockService {
private final int leaseDurationMillis;
private final CompletableFuture<DateTime> future;
public LockRequest(Lock lock, long waitTimeMillis,
int leaseDurationMillis, CompletableFuture<DateTime> future) {
public LockRequest(
Lock lock,
int leaseDurationMillis,
DateTime requestExpirationTime,
CompletableFuture<DateTime> future) {
this.lock = lock;
this.requestExpirationTime = DateTime.now().plusMillis((int) waitTimeMillis);
this.requestExpirationTime = requestExpirationTime;
this.leaseDurationMillis = leaseDurationMillis;
this.future = future;
}
......