Madan Jampani
Committed by Yuta HIGUCHI

1. Fixed a synchronization issue with database update processing and expiry tracking.

2. Fixed a synchronization issue with MapDBLog appendEntries method.
3. DatabaseClient now uses ProtocolClient to interact with Raft cluster.
4. Misc javdoc and logging improvements

Change-Id: I147eb5bf859cf9827df452d62ab415d643a00aa4
......@@ -19,4 +19,18 @@ public class DatabaseException extends RuntimeException {
public DatabaseException() {
};
public static class Timeout extends DatabaseException {
public Timeout(String message, Throwable t) {
super(message, t);
}
public Timeout(String message) {
super(message);
}
public Timeout(Throwable t) {
super(t);
}
}
}
\ No newline at end of file
......
package org.onlab.onos.store.service.impl;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.Collections;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import net.kuujo.copycat.Copycat;
import net.kuujo.copycat.cluster.Member;
import net.kuujo.copycat.cluster.TcpMember;
import net.kuujo.copycat.event.EventHandler;
import net.kuujo.copycat.event.LeaderElectEvent;
import net.kuujo.copycat.protocol.SubmitRequest;
import net.kuujo.copycat.protocol.SubmitResponse;
import net.kuujo.copycat.spi.protocol.ProtocolClient;
import org.onlab.onos.store.service.BatchReadRequest;
import org.onlab.onos.store.service.BatchWriteRequest;
......@@ -28,7 +31,7 @@ import org.slf4j.Logger;
/**
* Client for interacting with the Copycat Raft cluster.
*/
public class DatabaseClient {
public class DatabaseClient implements EventHandler<LeaderElectEvent> {
private static final int RETRIES = 5;
......@@ -36,137 +39,101 @@ public class DatabaseClient {
private final Logger log = getLogger(getClass());
private final Copycat copycat;
private final DatabaseProtocolService protocol;
private volatile ProtocolClient copycat = null;
private volatile Member currentLeader = null;
public DatabaseClient(Copycat copycat) {
this.copycat = checkNotNull(copycat);
public DatabaseClient(DatabaseProtocolService protocol) {
this.protocol = protocol;
}
@Override
public void handle(LeaderElectEvent event) {
Member newLeader = event.leader();
if (newLeader != null && !newLeader.equals(currentLeader)) {
currentLeader = newLeader;
if (copycat != null) {
copycat.close();
}
copycat = protocol.createClient((TcpMember) currentLeader);
copycat.connect();
}
}
private String nextRequestId() {
return UUID.randomUUID().toString();
}
public void waitForLeader() {
if (copycat.leader() != null) {
if (currentLeader != null) {
return;
}
log.info("No leader in cluster, waiting for election.");
final CountDownLatch latch = new CountDownLatch(1);
final EventHandler<LeaderElectEvent> leaderLsnr = new EventHandler<LeaderElectEvent>() {
@Override
public void handle(LeaderElectEvent event) {
log.info("Leader chosen: {}", event);
latch.countDown();
}
};
copycat.event(LeaderElectEvent.class).registerHandler(leaderLsnr);
try {
while (copycat.leader() == null) {
latch.await(200, TimeUnit.MILLISECONDS);
while (currentLeader == null) {
Thread.sleep(200);
}
log.info("Leader appeared: {}", copycat.leader());
log.info("Leader appeared: {}", currentLeader);
return;
} catch (InterruptedException e) {
log.error("Interrupted while waiting for Leader", e);
Thread.currentThread().interrupt();
} finally {
copycat.event(LeaderElectEvent.class).unregisterHandler(leaderLsnr);
}
}
public boolean createTable(String tableName) {
private <T> T submit(String operationName, Object... args) {
waitForLeader();
CompletableFuture<Boolean> future = copycat.submit("createTable", tableName);
if (currentLeader == null) {
throw new DatabaseException("Raft cluster does not have a leader.");
}
SubmitRequest request =
new SubmitRequest(nextRequestId(), operationName, Arrays.asList(args));
CompletableFuture<SubmitResponse> submitResponse = copycat.submit(request);
log.debug("Sent {} to {}", request, currentLeader);
try {
return future.get();
} catch (InterruptedException | ExecutionException e) {
return (T) submitResponse.get(TIMEOUT_MS, TimeUnit.MILLISECONDS).result();
} catch (ExecutionException | InterruptedException e) {
throw new DatabaseException(e);
} catch (TimeoutException e) {
throw new DatabaseException.Timeout(e);
}
}
public boolean createTable(String tableName) {
return submit("createTable", tableName);
}
public boolean createTable(String tableName, int ttlMillis) {
waitForLeader();
CompletableFuture<Boolean> future = copycat.submit("createTableWithExpiration", tableName);
try {
return future.get();
} catch (InterruptedException | ExecutionException e) {
throw new DatabaseException(e);
}
return submit("createTable", tableName, ttlMillis);
}
public void dropTable(String tableName) {
waitForLeader();
CompletableFuture<Void> future = copycat.submit("dropTable", tableName);
try {
future.get();
} catch (InterruptedException | ExecutionException e) {
throw new DatabaseException(e);
}
submit("dropTable", tableName);
}
public void dropAllTables() {
waitForLeader();
CompletableFuture<Void> future = copycat.submit("dropAllTables");
try {
future.get();
} catch (InterruptedException | ExecutionException e) {
throw new DatabaseException(e);
}
submit("dropAllTables");
}
public Set<String> listTables() {
waitForLeader();
try {
for (int i = 0; i < RETRIES; ++i) {
CompletableFuture<Set<String>> future = copycat.submit("listTables");
try {
return future.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
log.debug("Timed out retrying {}", i);
future.cancel(true);
waitForLeader();
}
}
// TODO: proper timeout handling
log.error("Timed out");
return Collections.emptySet();
} catch (InterruptedException | ExecutionException e) {
throw new DatabaseException(e);
}
return submit("listTables");
}
public List<ReadResult> batchRead(BatchReadRequest batchRequest) {
waitForLeader();
CompletableFuture<List<ReadResult>> future = copycat.submit("read", batchRequest);
try {
return future.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException e) {
throw new DatabaseException(e);
} catch (TimeoutException e) {
throw new DatabaseException(e);
}
return submit("read", batchRequest);
}
public List<WriteResult> batchWrite(BatchWriteRequest batchRequest) {
waitForLeader();
CompletableFuture<List<WriteResult>> future = copycat.submit("write", batchRequest);
try {
return future.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException e) {
throw new DatabaseException(e);
} catch (TimeoutException e) {
throw new DatabaseException(e);
}
return submit("write", batchRequest);
}
public Map<String, VersionedValue> getAll(String tableName) {
waitForLeader();
CompletableFuture<Map<String, VersionedValue>> future = copycat.submit("getAll", tableName);
try {
return future.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException e) {
throw new DatabaseException(e);
} catch (TimeoutException e) {
throw new DatabaseException(e);
}
return submit("getAll", tableName);
}
}
......
......@@ -16,10 +16,14 @@
package org.onlab.onos.store.service.impl;
import static org.onlab.util.Tools.namedThreads;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
......@@ -40,6 +44,8 @@ import org.onlab.onos.store.service.impl.DatabaseStateMachine.TableMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.MoreObjects;
/**
* Plugs into the database update stream and track the TTL of entries added to
* the database. For tables with pre-configured finite TTL, this class has
......@@ -48,6 +54,9 @@ import org.slf4j.LoggerFactory;
public class DatabaseEntryExpirationTracker implements
DatabaseUpdateEventListener, EventHandler<LeaderElectEvent> {
private static final ExecutorService THREAD_POOL =
Executors.newCachedThreadPool(namedThreads("database-stale-entry-expirer-%d"));
private final Logger log = LoggerFactory.getLogger(getClass());
private final DatabaseService databaseService;
......@@ -74,7 +83,7 @@ public class DatabaseEntryExpirationTracker implements
@Override
public void tableModified(TableModificationEvent event) {
log.debug("Received a table modification event {}", event);
log.debug("{}: Received {}", localNode.id(), event);
if (!tableEntryExpirationMap.containsKey(event.tableName())) {
return;
......@@ -89,8 +98,8 @@ public class DatabaseEntryExpirationTracker implements
map.remove(row, eventVersion);
if (isLocalMemberLeader.get()) {
try {
// FIXME: The broadcast message should be sent to self.
clusterCommunicator.broadcast(new ClusterMessage(
log.debug("Broadcasting {} to the entire cluster", event);
clusterCommunicator.broadcastIncludeSelf(new ClusterMessage(
localNode.id(), DatabaseStateMachine.DATABASE_UPDATE_EVENTS,
DatabaseStateMachine.SERIALIZER.encode(event)));
} catch (IOException e) {
......@@ -119,8 +128,6 @@ public class DatabaseEntryExpirationTracker implements
tableEntryExpirationMap.put(metadata.tableName(), ExpiringMap.builder()
.expiration(metadata.ttlMillis(), TimeUnit.MILLISECONDS)
.expirationListener(expirationObserver)
// TODO: make the expiration policy configurable.
// Do we need to support expiration based on last access time?
.expirationPolicy(ExpirationPolicy.CREATED).build());
}
}
......@@ -135,6 +142,23 @@ public class DatabaseEntryExpirationTracker implements
ExpirationListener<DatabaseRow, Long> {
@Override
public void expired(DatabaseRow row, Long version) {
THREAD_POOL.submit(new ExpirationTask(row, version));
}
}
private class ExpirationTask implements Runnable {
private final DatabaseRow row;
private final Long version;
public ExpirationTask(DatabaseRow row, Long version) {
this.row = row;
this.version = version;
}
@Override
public void run() {
log.debug("Received an expiration event for {}, version: {}", row, version);
Map<DatabaseRow, Long> map = tableEntryExpirationMap.get(row.tableName);
try {
if (isLocalMemberLeader.get()) {
......@@ -142,7 +166,7 @@ public class DatabaseEntryExpirationTracker implements
row.key, version)) {
log.info("Entry in database was updated right before its expiration.");
} else {
log.info("Successfully expired old entry with key ({}) from table ({})",
log.debug("Successfully expired old entry with key ({}) from table ({})",
row.key, row.tableName);
}
} else {
......@@ -164,6 +188,9 @@ public class DatabaseEntryExpirationTracker implements
@Override
public void handle(LeaderElectEvent event) {
isLocalMemberLeader.set(localMember.equals(event.leader()));
if (isLocalMemberLeader.get()) {
log.info("{} is now the leader of Raft cluster", localNode.id());
}
}
/**
......@@ -180,6 +207,14 @@ public class DatabaseEntryExpirationTracker implements
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("tableName", tableName)
.add("key", key)
.toString();
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
......@@ -204,6 +239,7 @@ public class DatabaseEntryExpirationTracker implements
if (!tableEntryExpirationMap.isEmpty()) {
return;
}
log.debug("Received a snapshot installed notification");
for (String tableName : state.getTableNames()) {
TableMetadata metadata = state.getTableMetadata(tableName);
......
......@@ -173,13 +173,16 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService {
Log consensusLog = new MapDBLog(LOG_FILE_PREFIX + localNode.id(),
ClusterMessagingProtocol.SERIALIZER);
client = new DatabaseClient(copycatMessagingProtocol);
copycat = new Copycat(stateMachine, consensusLog, cluster, copycatMessagingProtocol);
copycat.event(LeaderElectEvent.class).registerHandler(client);
copycat.event(LeaderElectEvent.class).registerHandler(expirationTracker);
copycat.start().get();
client = new DatabaseClient(copycat);
client = new DatabaseClient(copycatMessagingProtocol);
client.waitForLeader();
log.info("Started.");
......
package org.onlab.onos.store.service.impl;
import static org.onlab.util.Tools.namedThreads;
import static org.slf4j.LoggerFactory.getLogger;
import java.io.ByteArrayInputStream;
......@@ -9,6 +10,8 @@ import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;
......@@ -49,6 +52,9 @@ public class DatabaseStateMachine implements StateMachine {
private final Logger log = getLogger(getClass());
private final ExecutorService updatesExecutor =
Executors.newSingleThreadExecutor(namedThreads("database-statemachine-updates"));
// message subject for database update notifications.
public static final MessageSubject DATABASE_UPDATE_EVENTS =
new MessageSubject("database-update-events");
......@@ -88,8 +94,7 @@ public class DatabaseStateMachine implements StateMachine {
}
@Command
public boolean createTableWithExpiration(String tableName) {
int ttlMillis = 10000;
public boolean createTable(String tableName, Integer ttlMillis) {
TableMetadata metadata = new TableMetadata(tableName, ttlMillis);
return createTable(metadata);
}
......@@ -100,18 +105,32 @@ public class DatabaseStateMachine implements StateMachine {
return false;
}
state.createTable(metadata);
for (DatabaseUpdateEventListener listener : listeners) {
listener.tableCreated(metadata);
}
updatesExecutor.submit(new Runnable() {
@Override
public void run() {
for (DatabaseUpdateEventListener listener : listeners) {
listener.tableCreated(metadata);
}
}
});
return true;
}
@Command
public boolean dropTable(String tableName) {
if (state.removeTable(tableName)) {
for (DatabaseUpdateEventListener listener : listeners) {
listener.tableDeleted(tableName);
}
updatesExecutor.submit(new Runnable() {
@Override
public void run() {
for (DatabaseUpdateEventListener listener : listeners) {
listener.tableDeleted(tableName);
}
}
});
return true;
}
return false;
......@@ -121,11 +140,18 @@ public class DatabaseStateMachine implements StateMachine {
public boolean dropAllTables() {
Set<String> tableNames = state.getTableNames();
state.removeAllTables();
for (DatabaseUpdateEventListener listener : listeners) {
for (String tableName : tableNames) {
listener.tableDeleted(tableName);
updatesExecutor.submit(new Runnable() {
@Override
public void run() {
for (DatabaseUpdateEventListener listener : listeners) {
for (String tableName : tableNames) {
listener.tableDeleted(tableName);
}
}
}
}
});
return true;
}
......@@ -273,12 +299,18 @@ public class DatabaseStateMachine implements StateMachine {
}
// notify listeners of table mod events.
for (DatabaseUpdateEventListener listener : listeners) {
for (TableModificationEvent tableModificationEvent : tableModificationEvents) {
log.trace("Publishing table modification event: {}", tableModificationEvent);
listener.tableModified(tableModificationEvent);
updatesExecutor.submit(new Runnable() {
@Override
public void run() {
for (DatabaseUpdateEventListener listener : listeners) {
for (TableModificationEvent tableModificationEvent : tableModificationEvents) {
log.trace("Publishing table modification event: {}", tableModificationEvent);
listener.tableModified(tableModificationEvent);
}
}
}
}
});
return results;
}
......@@ -397,10 +429,15 @@ public class DatabaseStateMachine implements StateMachine {
this.state = SERIALIZER.decode(data);
}
// FIXME: synchronize.
for (DatabaseUpdateEventListener listener : listeners) {
listener.snapshotInstalled(state);
}
updatesExecutor.submit(new Runnable() {
@Override
public void run() {
for (DatabaseUpdateEventListener listener : listeners) {
listener.snapshotInstalled(state);
}
}
});
} catch (Exception e) {
log.error("Failed to install from snapshot", e);
throw new SnapshotException(e);
......
package org.onlab.onos.store.service.impl;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
......@@ -11,12 +13,17 @@ import org.joda.time.DateTime;
import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.store.service.DatabaseService;
import org.onlab.onos.store.service.Lock;
import org.slf4j.Logger;
/**
* A distributed lock implementation.
*/
public class DistributedLock implements Lock {
private final Logger log = getLogger(getClass());
private static final long MAX_WAIT_TIME_MS = 100000000L;
private final DistributedLockManager lockManager;
private final DatabaseService databaseService;
private final String path;
......@@ -44,54 +51,60 @@ public class DistributedLock implements Lock {
@Override
public void lock(int leaseDurationMillis) {
if (isLocked() && lockExpirationTime.isAfter(DateTime.now().plusMillis(leaseDurationMillis))) {
// Nothing to do.
// Current expiration time is beyond what is requested.
return;
} else {
tryLock(Long.MAX_VALUE, leaseDurationMillis);
tryLock(MAX_WAIT_TIME_MS, leaseDurationMillis);
}
}
@Override
public boolean tryLock(int leaseDurationMillis) {
return databaseService.putIfAbsent(
if (databaseService.putIfAbsent(
DistributedLockManager.ONOS_LOCK_TABLE_NAME,
path,
lockId);
lockId)) {
isLocked.set(true);
lockExpirationTime = DateTime.now().plusMillis(leaseDurationMillis);
return true;
}
return false;
}
@Override
public boolean tryLock(
long waitTimeMillis,
int leaseDurationMillis) {
if (!tryLock(leaseDurationMillis)) {
CompletableFuture<Void> future =
lockManager.lockIfAvailable(this, waitTimeMillis, leaseDurationMillis);
try {
future.get(waitTimeMillis, TimeUnit.MILLISECONDS);
} catch (ExecutionException | InterruptedException e) {
// TODO: ExecutionException could indicate something
// wrong with the backing database.
// Throw an exception?
return false;
} catch (TimeoutException e) {
return false;
}
if (tryLock(leaseDurationMillis)) {
return true;
}
CompletableFuture<DateTime> future =
lockManager.lockIfAvailable(this, waitTimeMillis, leaseDurationMillis);
try {
lockExpirationTime = future.get(waitTimeMillis, TimeUnit.MILLISECONDS);
return true;
} catch (ExecutionException | InterruptedException e) {
log.error("Encountered an exception trying to acquire lock for " + path, e);
// TODO: ExecutionException could indicate something
// wrong with the backing database.
// Throw an exception?
return false;
} catch (TimeoutException e) {
log.debug("Timed out waiting to acquire lock for {}", path);
return false;
}
isLocked.set(true);
lockExpirationTime = DateTime.now().plusMillis(leaseDurationMillis);
return true;
}
@Override
public boolean isLocked() {
if (isLocked.get()) {
// We rely on local information to check
// if the expired.
// if the lock expired.
// This should should make this call
// light weight, which still retaining the same
// light weight, while still retaining the
// safety guarantees.
if (DateTime.now().isAfter(lockExpirationTime)) {
isLocked.set(false);
......@@ -108,17 +121,30 @@ public class DistributedLock implements Lock {
if (!isLocked()) {
return;
} else {
isLocked.set(false);
databaseService.removeIfValueMatches(DistributedLockManager.ONOS_LOCK_TABLE_NAME, path, lockId);
if (databaseService.removeIfValueMatches(DistributedLockManager.ONOS_LOCK_TABLE_NAME, path, lockId)) {
isLocked.set(false);
}
}
}
@Override
public boolean extendExpiration(int leaseDurationMillis) {
if (isLocked() && lockExpirationTime.isAfter(DateTime.now().plusMillis(leaseDurationMillis))) {
if (!isLocked()) {
log.warn("Ignoring request to extend expiration for lock {}."
+ " ExtendExpiration must be called for locks that are already acquired.", path);
}
if (databaseService.putIfValueMatches(
DistributedLockManager.ONOS_LOCK_TABLE_NAME,
path,
lockId,
lockId)) {
lockExpirationTime = DateTime.now().plusMillis(leaseDurationMillis);
log.debug("Succeeded in extending lock {} expiration time to {}", lockExpirationTime);
return true;
} else {
return tryLock(leaseDurationMillis);
log.info("Failed to extend expiration for {}", path);
return false;
}
}
}
\ No newline at end of file
}
......
package org.onlab.onos.store.service.impl;
import static org.onlab.util.Tools.namedThreads;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
......@@ -23,18 +26,23 @@ import org.onlab.onos.store.service.LockEventListener;
import org.onlab.onos.store.service.LockService;
import org.slf4j.Logger;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.LinkedListMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Multimaps;
@Component(immediate = true)
@Service
public class DistributedLockManager implements LockService {
private static final ExecutorService THREAD_POOL =
Executors.newCachedThreadPool(namedThreads("lock-manager-%d"));
private final Logger log = getLogger(getClass());
public static final String ONOS_LOCK_TABLE_NAME = "onos-locks";
private final ArrayListMultimap<String, LockRequest> locksToAcquire = ArrayListMultimap
.create();
private final ListMultimap<String, LockRequest> locksToAcquire =
Multimaps.synchronizedListMultimap(LinkedListMultimap.<String, LockRequest>create());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private ClusterCommunicationService clusterCommunicator;
......@@ -56,8 +64,9 @@ public class DistributedLockManager implements LockService {
@Deactivate
public void deactivate() {
clusterCommunicator.removeSubscriber(DatabaseStateMachine.DATABASE_UPDATE_EVENTS);
locksToAcquire.clear();
log.info("Started.");
log.info("Stopped.");
}
@Override
......@@ -77,11 +86,19 @@ public class DistributedLockManager implements LockService {
throw new UnsupportedOperationException();
}
protected CompletableFuture<Void> lockIfAvailable(Lock lock,
long waitTimeMillis, int leaseDurationMillis) {
CompletableFuture<Void> future = new CompletableFuture<>();
locksToAcquire.put(lock.path(), new LockRequest(lock, waitTimeMillis,
leaseDurationMillis, future));
/**
* Attempts to acquire the lock as soon as it becomes available.
* @param lock lock to acquire.
* @param waitTimeMillis maximum time to wait before giving up.
* @param leaseDurationMillis the duration for which to acquire the lock initially.
* @return Future lease expiration date.
*/
protected CompletableFuture<DateTime> lockIfAvailable(
Lock lock,
long waitTimeMillis,
int leaseDurationMillis) {
CompletableFuture<DateTime> future = new CompletableFuture<>();
locksToAcquire.put(lock.path(), new LockRequest(lock, waitTimeMillis, leaseDurationMillis, future));
return future;
}
......@@ -90,37 +107,46 @@ public class DistributedLockManager implements LockService {
public void handle(ClusterMessage message) {
TableModificationEvent event = DatabaseStateMachine.SERIALIZER
.decode(message.payload());
if (!event.tableName().equals(ONOS_LOCK_TABLE_NAME)) {
return;
if (event.tableName().equals(ONOS_LOCK_TABLE_NAME) &&
event.type().equals(TableModificationEvent.Type.ROW_DELETED)) {
THREAD_POOL.submit(new RetryLockTask(event.key()));
}
}
}
private class RetryLockTask implements Runnable {
private final String path;
log.info("Received a lock available event for path: {}", event.key());
public RetryLockTask(String path) {
this.path = path;
}
String path = event.key();
@Override
public void run() {
if (!locksToAcquire.containsKey(path)) {
return;
}
if (event.type() == TableModificationEvent.Type.ROW_DELETED) {
List<LockRequest> existingRequests = locksToAcquire.get(path);
if (existingRequests == null) {
return;
}
synchronized (existingRequests) {
Iterator<LockRequest> existingRequestIterator = existingRequests
.iterator();
while (existingRequestIterator.hasNext()) {
LockRequest request = existingRequestIterator.next();
if (request.expirationTime().isAfter(DateTime.now())) {
List<LockRequest> existingRequests = locksToAcquire.get(path);
if (existingRequests == null || existingRequests.isEmpty()) {
return;
}
log.info("Path {} is now available for locking. There are {} outstanding "
+ "requests for it.",
path, existingRequests.size());
synchronized (existingRequests) {
Iterator<LockRequest> existingRequestIterator = existingRequests.iterator();
while (existingRequestIterator.hasNext()) {
LockRequest request = existingRequestIterator.next();
if (DateTime.now().isAfter(request.requestExpirationTime())) {
// request expired.
existingRequestIterator.remove();
} else {
if (request.lock().tryLock(request.leaseDurationMillis())) {
request.future().complete(DateTime.now().plusMillis(request.leaseDurationMillis()));
existingRequestIterator.remove();
} else {
if (request.lock().tryLock(
request.leaseDurationMillis())) {
request.future().complete(null);
existingRequestIterator.remove();
}
}
}
}
......@@ -131,16 +157,15 @@ public class DistributedLockManager implements LockService {
private class LockRequest {
private final Lock lock;
private final DateTime expirationTime;
private final DateTime requestExpirationTime;
private final int leaseDurationMillis;
private final CompletableFuture<Void> future;
private final CompletableFuture<DateTime> future;
public LockRequest(Lock lock, long waitTimeMillis,
int leaseDurationMillis, CompletableFuture<Void> future) {
int leaseDurationMillis, CompletableFuture<DateTime> future) {
this.lock = lock;
this.expirationTime = DateTime.now().plusMillis(
(int) waitTimeMillis);
this.requestExpirationTime = DateTime.now().plusMillis((int) waitTimeMillis);
this.leaseDurationMillis = leaseDurationMillis;
this.future = future;
}
......@@ -149,15 +174,15 @@ public class DistributedLockManager implements LockService {
return lock;
}
public DateTime expirationTime() {
return expirationTime;
public DateTime requestExpirationTime() {
return requestExpirationTime;
}
public int leaseDurationMillis() {
return leaseDurationMillis;
}
public CompletableFuture<Void> future() {
public CompletableFuture<DateTime> future() {
return future;
}
}
......
......@@ -86,7 +86,7 @@ public class MapDBLog implements Log {
}
@Override
public List<Long> appendEntries(List<Entry> entries) {
public synchronized List<Long> appendEntries(List<Entry> entries) {
assertIsOpen();
checkArgument(entries != null, "expecting non-null entries");
final List<Long> indices = new ArrayList<>(entries.size());
......