Madan Jampani

Using net.jodah.expiringmap.ExpiringMap for tracking ttl expiration of database entries.

Minor javadoc updates.
......@@ -22,6 +22,7 @@ public class ReadResult {
/**
* Returns the status of the read operation.
* @return read operation status
*/
public ReadStatus status() {
return status;
......
......@@ -20,11 +20,12 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
//import net.jodah.expiringmap.ExpiringMap;
//import net.jodah.expiringmap.ExpiringMap.ExpirationListener;
//import net.jodah.expiringmap.ExpiringMap.ExpirationPolicy;
import net.jodah.expiringmap.ExpiringMap;
import net.jodah.expiringmap.ExpiringMap.ExpirationListener;
import net.jodah.expiringmap.ExpiringMap.ExpirationPolicy;
import net.kuujo.copycat.cluster.Member;
import net.kuujo.copycat.event.EventHandler;
import net.kuujo.copycat.event.LeaderElectEvent;
......@@ -34,19 +35,22 @@ import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
import org.onlab.onos.store.service.DatabaseService;
import org.onlab.onos.store.service.VersionedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Database update event handler.
* Plugs into the database update stream and track the TTL of entries added to
* the database. For tables with pre-configured finite TTL, this class has
* mechanisms for expiring (deleting) old, expired entries from the database.
*/
public class DatabaseUpdateEventHandler implements
DatabaseUpdateEventListener, EventHandler<LeaderElectEvent> {
public class DatabaseEntryExpirationTracker implements
DatabaseUpdateEventListener, EventHandler<LeaderElectEvent> {
private final Logger log = LoggerFactory.getLogger(getClass());
public static final MessageSubject DATABASE_UPDATES =
new MessageSubject("database-update-event");
public static final MessageSubject DATABASE_UPDATES = new MessageSubject(
"database-update-event");
private DatabaseService databaseService;
private ClusterService cluster;
......@@ -54,29 +58,32 @@ public class DatabaseUpdateEventHandler implements
private final Member localMember;
private final AtomicBoolean isLocalMemberLeader = new AtomicBoolean(false);
private final Map<String, Map<DatabaseRow, Void>> tableEntryExpirationMap = new HashMap<>();
//private final ExpirationListener<DatabaseRow, Void> expirationObserver = new ExpirationObserver();
DatabaseUpdateEventHandler(Member localMember) {
private final Map<String, Map<DatabaseRow, VersionedValue>> tableEntryExpirationMap = new HashMap<>();
private final ExpirationListener<DatabaseRow, VersionedValue> expirationObserver = new ExpirationObserver();
DatabaseEntryExpirationTracker(Member localMember) {
this.localMember = localMember;
}
@Override
public void tableModified(TableModificationEvent event) {
DatabaseRow row = new DatabaseRow(event.tableName(), event.key());
Map<DatabaseRow, Void> map = tableEntryExpirationMap.get(event.tableName());
Map<DatabaseRow, VersionedValue> map = tableEntryExpirationMap
.get(event.tableName());
switch (event.type()) {
case ROW_DELETED:
if (isLocalMemberLeader.get()) {
try {
clusterCommunicator.broadcast(
new ClusterMessage(
cluster.getLocalNode().id(),
DATABASE_UPDATES,
DatabaseStateMachine.SERIALIZER.encode(event)));
clusterCommunicator.broadcast(new ClusterMessage(cluster
.getLocalNode().id(), DATABASE_UPDATES,
DatabaseStateMachine.SERIALIZER.encode(event)));
} catch (IOException e) {
log.error("Failed to broadcast a database table modification event.", e);
log.error(
"Failed to broadcast a database table modification event.",
e);
}
}
break;
......@@ -94,15 +101,11 @@ public class DatabaseUpdateEventHandler implements
// make this explicit instead of relying on a negative value
// to indicate no expiration.
if (expirationTimeMillis > 0) {
tableEntryExpirationMap.put(tableName, null);
/*
ExpiringMap.builder()
tableEntryExpirationMap.put(tableName, ExpiringMap.builder()
.expiration(expirationTimeMillis, TimeUnit.SECONDS)
.expirationListener(expirationObserver)
// FIXME: make the expiration policy configurable.
.expirationPolicy(ExpirationPolicy.CREATED)
.build());
*/
.expirationPolicy(ExpirationPolicy.CREATED).build());
}
}
......@@ -111,27 +114,40 @@ public class DatabaseUpdateEventHandler implements
tableEntryExpirationMap.remove(tableName);
}
/*
private class ExpirationObserver implements ExpirationListener<DatabaseRow, Void> {
private class ExpirationObserver implements
ExpirationListener<DatabaseRow, VersionedValue> {
@Override
public void expired(DatabaseRow key, Void value) {
public void expired(DatabaseRow key, VersionedValue value) {
try {
// TODO: The safety of this check needs to be verified.
// Couple of issues:
// 1. It is very likely that only one member should attempt deletion of the entry from database.
// 2. A potential race condition exists where the entry expires, but before its can be deleted
// from the database, a new entry is added or existing entry is updated.
// That means ttl and expiration should be for a given version.
if (isLocalMemberLeader.get()) {
databaseService.remove(key.tableName, key.key);
if (!databaseService.removeIfVersionMatches(key.tableName,
key.key, value.version())) {
log.info("Entry in the database changed before right its TTL expiration.");
}
} else {
// If this node is not the current leader, we should never
// let the expiring entries drop off
// Under stable conditions (i.e no leadership switch) the
// current leader will initiate
// a database remove and this instance will get notified
// of a tableModification event causing it to remove from
// the map.
Map<DatabaseRow, VersionedValue> map = tableEntryExpirationMap
.get(key.tableName);
if (map != null) {
map.put(key, value);
}
}
} catch (Exception e) {
log.warn("Failed to delete entry from the database after ttl expiration. Will retry eviction", e);
tableEntryExpirationMap.get(key.tableName).put(new DatabaseRow(key.tableName, key.key), null);
log.warn(
"Failed to delete entry from the database after ttl expiration. Will retry eviction",
e);
tableEntryExpirationMap.get(key.tableName).put(
new DatabaseRow(key.tableName, key.key), value);
}
}
}
*/
@Override
public void handle(LeaderElectEvent event) {
......@@ -140,6 +156,9 @@ public class DatabaseUpdateEventHandler implements
}
}
/**
* Wrapper class for a database row identifier.
*/
private class DatabaseRow {
String tableName;
......@@ -160,8 +179,8 @@ public class DatabaseUpdateEventHandler implements
}
DatabaseRow that = (DatabaseRow) obj;
return Objects.equals(this.tableName, that.tableName) &&
Objects.equals(this.key, that.key);
return Objects.equals(this.tableName, that.tableName)
&& Objects.equals(this.key, that.key);
}
@Override
......@@ -169,4 +188,4 @@ public class DatabaseUpdateEventHandler implements
return Objects.hash(tableName, key);
}
}
}
\ No newline at end of file
}
......
......@@ -237,8 +237,8 @@ public class DatabaseStateMachine implements StateMachine {
WriteResult putResult = new WriteResult(WriteStatus.OK, previousValue);
results.add(putResult);
tableModificationEvent = (previousValue == null) ?
TableModificationEvent.rowAdded(request.tableName(), request.key()) :
TableModificationEvent.rowUpdated(request.tableName(), request.key());
TableModificationEvent.rowAdded(request.tableName(), request.key(), newValue) :
TableModificationEvent.rowUpdated(request.tableName(), request.key(), newValue);
break;
case REMOVE:
......@@ -249,7 +249,7 @@ public class DatabaseStateMachine implements StateMachine {
results.add(removeResult);
if (removedValue != null) {
tableModificationEvent =
TableModificationEvent.rowDeleted(request.tableName(), request.key());
TableModificationEvent.rowDeleted(request.tableName(), request.key(), removedValue);
}
break;
......
......@@ -29,15 +29,14 @@ public interface DatabaseUpdateEventListener {
/**
* Notifies listeners of a table created event.
* @param tableName
* @param expirationTimeMillis
* @param tableName name of the table created
* @param expirationTimeMillis TTL for entries added to the table (measured since last update time)
*/
public void tableCreated(String tableName, int expirationTimeMillis);
/**
* Notifies listeners of a table deleted event.
* @param tableName
* @param tableName name of the table deleted
*/
public void tableDeleted(String tableName);
}
\ No newline at end of file
......
......@@ -33,7 +33,8 @@ public class DistributedLockManager implements LockService {
public static final String ONOS_LOCK_TABLE_NAME = "onos-locks";
private final ArrayListMultimap<String, LockRequest> locksToAcquire = ArrayListMultimap.create();
private final ArrayListMultimap<String, LockRequest> locksToAcquire = ArrayListMultimap
.create();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private ClusterCommunicationService clusterCommunicator;
......@@ -61,11 +62,7 @@ public class DistributedLockManager implements LockService {
@Override
public Lock create(String path) {
return new DistributedLock(
path,
databaseService,
clusterService,
this);
return new DistributedLock(path, databaseService, clusterService, this);
}
@Override
......@@ -80,21 +77,19 @@ public class DistributedLockManager implements LockService {
throw new UnsupportedOperationException();
}
protected CompletableFuture<Void> lockIfAvailable(
Lock lock,
long waitTimeMillis,
int leaseDurationMillis) {
protected CompletableFuture<Void> lockIfAvailable(Lock lock,
long waitTimeMillis, int leaseDurationMillis) {
CompletableFuture<Void> future = new CompletableFuture<>();
locksToAcquire.put(
lock.path(),
new LockRequest(lock, waitTimeMillis, leaseDurationMillis, future));
locksToAcquire.put(lock.path(), new LockRequest(lock, waitTimeMillis,
leaseDurationMillis, future));
return future;
}
private class LockEventMessageListener implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
TableModificationEvent event = DatabaseStateMachine.SERIALIZER.decode(message.payload());
TableModificationEvent event = DatabaseStateMachine.SERIALIZER
.decode(message.payload());
if (!event.tableName().equals(ONOS_LOCK_TABLE_NAME)) {
return;
}
......@@ -110,15 +105,20 @@ public class DistributedLockManager implements LockService {
return;
}
Iterator<LockRequest> existingRequestIterator = existingRequests.iterator();
while (existingRequestIterator.hasNext()) {
LockRequest request = existingRequestIterator.next();
if (request.expirationTime().isAfter(DateTime.now())) {
existingRequestIterator.remove();
} else {
if (request.lock().tryLock(request.leaseDurationMillis())) {
request.future().complete(null);
existingRequests.remove(0);
synchronized (existingRequests) {
Iterator<LockRequest> existingRequestIterator = existingRequests
.iterator();
while (existingRequestIterator.hasNext()) {
LockRequest request = existingRequestIterator.next();
if (request.expirationTime().isAfter(DateTime.now())) {
existingRequestIterator.remove();
} else {
if (request.lock().tryLock(
request.leaseDurationMillis())) {
request.future().complete(null);
existingRequestIterator.remove();
}
}
}
}
......@@ -133,14 +133,12 @@ public class DistributedLockManager implements LockService {
private final int leaseDurationMillis;
private final CompletableFuture<Void> future;
public LockRequest(
Lock lock,
long waitTimeMillis,
int leaseDurationMillis,
CompletableFuture<Void> future) {
public LockRequest(Lock lock, long waitTimeMillis,
int leaseDurationMillis, CompletableFuture<Void> future) {
this.lock = lock;
this.expirationTime = DateTime.now().plusMillis((int) waitTimeMillis);
this.expirationTime = DateTime.now().plusMillis(
(int) waitTimeMillis);
this.leaseDurationMillis = leaseDurationMillis;
this.future = future;
}
......
package org.onlab.onos.store.service.impl;
import org.onlab.onos.store.service.VersionedValue;
/**
* A table modification event.
*/
......@@ -17,41 +19,46 @@ public final class TableModificationEvent {
private final String tableName;
private final String key;
private final VersionedValue value;
private final Type type;
/**
* Creates a new row deleted table modification event.
* @param tableName table name.
* @param key row key
* @param value value associated with the key when it was deleted.
* @return table modification event.
*/
public static TableModificationEvent rowDeleted(String tableName, String key) {
return new TableModificationEvent(tableName, key, Type.ROW_DELETED);
public static TableModificationEvent rowDeleted(String tableName, String key, VersionedValue value) {
return new TableModificationEvent(tableName, key, value, Type.ROW_DELETED);
}
/**
* Creates a new row added table modification event.
* @param tableName table name.
* @param key row key
* @param value value associated with the key
* @return table modification event.
*/
public static TableModificationEvent rowAdded(String tableName, String key) {
return new TableModificationEvent(tableName, key, Type.ROW_ADDED);
public static TableModificationEvent rowAdded(String tableName, String key, VersionedValue value) {
return new TableModificationEvent(tableName, key, value, Type.ROW_ADDED);
}
/**
* Creates a new row updated table modification event.
* @param tableName table name.
* @param key row key
* @param newValue value
* @return table modification event.
*/
public static TableModificationEvent rowUpdated(String tableName, String key) {
return new TableModificationEvent(tableName, key, Type.ROW_UPDATED);
public static TableModificationEvent rowUpdated(String tableName, String key, VersionedValue newValue) {
return new TableModificationEvent(tableName, key, newValue, Type.ROW_UPDATED);
}
private TableModificationEvent(String tableName, String key, Type type) {
private TableModificationEvent(String tableName, String key, VersionedValue value, Type type) {
this.tableName = tableName;
this.key = key;
this.value = value;
this.type = type;
}
......@@ -72,6 +79,15 @@ public final class TableModificationEvent {
}
/**
* Returns the value associated with the key. If the event for a deletion, this
* method returns value that was deleted.
* @return row value
*/
public VersionedValue value() {
return value;
}
/**
* Returns the type of table modification event.
* @return event type.
*/
......
......@@ -39,30 +39,23 @@
</dependency>
<dependency>
<groupId>net.kuujo.copycat</groupId>
<artifactId>copycat</artifactId>
<version>${copycat.version}</version>
<groupId>net.jodah</groupId>
<artifactId>expiringmap</artifactId>
<version>0.3.1</version>
</dependency>
<!-- Commented out due to Chronicle + OSGi issue
<dependency>
<groupId>net.kuujo.copycat</groupId>
<artifactId>copycat-chronicle</artifactId>
<artifactId>copycat</artifactId>
<version>${copycat.version}</version>
</dependency>
-->
<dependency>
<groupId>net.kuujo.copycat</groupId>
<artifactId>copycat-tcp</artifactId>
<version>${copycat.version}</version>
</dependency>
<!-- chronicle transitive dependency
<dependency>
<groupId>net.java.dev.jna</groupId>
<artifactId>jna</artifactId>
<version>4.1.0</version>
</dependency>
-->
</dependencies>
<build>
......@@ -89,20 +82,19 @@
</filter>
<filter>
<artifact>net.kuujo.copycat:*</artifact>
<artifact>net.jodah.expiringmap:*</artifact>
<includes>
<include>net/kuujo/copycat/**</include>
<include>net/jodah/expiringmap/**</include>
</includes>
</filter>
<!-- chronicle transitive dependency
<filter>
<artifact>net.java.dev.jna:*</artifact>
<artifact>net.kuujo.copycat:*</artifact>
<includes>
<include>com/sun/jna/**</include>
<include>net/kuujo/copycat/**</include>
</includes>
</filter>
-->
</filters>
</configuration>
<executions>
......@@ -120,7 +112,7 @@
<configuration>
<instructions>
<Export-Package>
com.googlecode.concurrenttrees.*;net.kuujo.copycat.*
com.googlecode.concurrenttrees.*;net.kuujo.copycat.*;net.jodah.expiringmap.*
</Export-Package>
</instructions>
</configuration>
......