Madan Jampani

Support for expiring Database entries

Registering database entry expiration tracker with DatabaseStateMachine

Support for publishing database state machine snapshot installation events.
Expiry tracker will listen to these events to bootstrap its local state.

Change-Id: I8bf22c8d7bab38624341350ccc083c5ca2fcb117
......@@ -20,6 +20,16 @@ public interface DatabaseAdminService {
public boolean createTable(String name);
/**
 * Creates a new table where last update time will be used to track and expire old entries.
 * Table creation is idempotent. Attempting to create a table
 * that already exists will be a noop.
 * @param name table name.
 * @param ttlMillis time-to-live in milliseconds, measured from an entry's last update time,
 *        after which the entry will be expired.
 * @return true if the table was created by this call, false otherwise.
 */
public boolean createTable(String name, int ttlMillis);
/**
* Lists all the tables in the database.
* @return list of table names.
*/
......
......@@ -35,6 +35,16 @@ public class DatabaseClient {
}
}
/**
 * Submits a "createTable" command carrying a per-entry TTL and blocks until
 * the submitted command completes.
 *
 * @param tableName name of the table to create
 * @param ttlMillis time-to-live, in milliseconds, forwarded with the command
 * @return the boolean result produced by the submitted command
 * @throws DatabaseException if the calling thread is interrupted while waiting
 *         or the command completes exceptionally
 */
public boolean createTable(String tableName, int ttlMillis) {
    CompletableFuture<Boolean> future = copycat.submit("createTable", tableName, ttlMillis);
    try {
        return future.get();
    } catch (InterruptedException e) {
        // Restore the interrupt status so callers further up the stack can
        // still observe that this thread was asked to stop.
        Thread.currentThread().interrupt();
        throw new DatabaseException(e);
    } catch (ExecutionException e) {
        throw new DatabaseException(e);
    }
}
public void dropTable(String tableName) {
CompletableFuture<Void> future = copycat.submit("dropTable", tableName);
......
......@@ -30,12 +30,14 @@ import net.kuujo.copycat.cluster.Member;
import net.kuujo.copycat.event.EventHandler;
import net.kuujo.copycat.event.LeaderElectEvent;
import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
import org.onlab.onos.store.service.DatabaseService;
import org.onlab.onos.store.service.VersionedValue;
import org.onlab.onos.store.service.impl.DatabaseStateMachine.State;
import org.onlab.onos.store.service.impl.DatabaseStateMachine.TableMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -52,23 +54,34 @@ public class DatabaseEntryExpirationTracker implements
public static final MessageSubject DATABASE_UPDATES = new MessageSubject(
"database-update-event");
private DatabaseService databaseService;
private ClusterService cluster;
private ClusterCommunicationService clusterCommunicator;
private final DatabaseService databaseService;
private final ClusterCommunicationService clusterCommunicator;
private final Member localMember;
private final ControllerNode localNode;
private final AtomicBoolean isLocalMemberLeader = new AtomicBoolean(false);
private final Map<String, Map<DatabaseRow, VersionedValue>> tableEntryExpirationMap = new HashMap<>();
private final ExpirationListener<DatabaseRow, VersionedValue> expirationObserver = new ExpirationObserver();
DatabaseEntryExpirationTracker(Member localMember) {
/**
 * Creates an expiration tracker wired to its collaborators.
 *
 * @param localMember local cluster member
 * @param localNode local controller node; its id is used as the sender of broadcast updates
 * @param clusterCommunicator service used to broadcast database update events
 * @param databaseService database service handle kept by this tracker
 */
DatabaseEntryExpirationTracker(
        Member localMember,
        ControllerNode localNode,
        ClusterCommunicationService clusterCommunicator,
        DatabaseService databaseService) {
    // Services first, then the identity of the local instance.
    this.databaseService = databaseService;
    this.clusterCommunicator = clusterCommunicator;
    this.localNode = localNode;
    this.localMember = localMember;
}
@Override
public void tableModified(TableModificationEvent event) {
if (!tableEntryExpirationMap.containsKey(event.tableName())) {
return;
}
DatabaseRow row = new DatabaseRow(event.tableName(), event.key());
Map<DatabaseRow, VersionedValue> map = tableEntryExpirationMap
.get(event.tableName());
......@@ -77,8 +90,8 @@ public class DatabaseEntryExpirationTracker implements
case ROW_DELETED:
if (isLocalMemberLeader.get()) {
try {
clusterCommunicator.broadcast(new ClusterMessage(cluster
.getLocalNode().id(), DATABASE_UPDATES,
clusterCommunicator.broadcast(new ClusterMessage(
localNode.id(), DATABASE_UPDATES,
DatabaseStateMachine.SERIALIZER.encode(event)));
} catch (IOException e) {
log.error(
......@@ -97,12 +110,10 @@ public class DatabaseEntryExpirationTracker implements
}
@Override
public void tableCreated(String tableName, int expirationTimeMillis) {
// make this explicit instead of relying on a negative value
// to indicate no expiration.
if (expirationTimeMillis > 0) {
tableEntryExpirationMap.put(tableName, ExpiringMap.builder()
.expiration(expirationTimeMillis, TimeUnit.SECONDS)
public void tableCreated(TableMetadata metadata) {
if (metadata.expireOldEntries()) {
tableEntryExpirationMap.put(metadata.tableName(), ExpiringMap.builder()
.expiration(metadata.ttlMillis(), TimeUnit.SECONDS)
.expirationListener(expirationObserver)
// FIXME: make the expiration policy configurable.
.expirationPolicy(ExpirationPolicy.CREATED).build());
......@@ -188,4 +199,28 @@ public class DatabaseEntryExpirationTracker implements
return Objects.hash(tableName, key);
}
}
/**
 * Bootstraps the local expiration-tracking state from an installed snapshot.
 * Nothing is done when this tracker already holds state for at least one table.
 * Otherwise, for every table in the snapshot that expires old entries, an
 * expiring map with the table's TTL (in milliseconds) is created, filled with
 * the table's current rows and registered under the table name.
 *
 * @param state snapshot state that was installed
 */
@Override
public void snapshotInstalled(State state) {
    // Only bootstrap when no table is being tracked yet.
    if (tableEntryExpirationMap.isEmpty()) {
        for (String name : state.getTableNames()) {
            TableMetadata tableInfo = state.getTableMetadata(name);
            if (tableInfo.expireOldEntries()) {
                Map<DatabaseRow, VersionedValue> expiringRows = ExpiringMap.builder()
                        .expiration(tableInfo.ttlMillis(), TimeUnit.MILLISECONDS)
                        .expirationListener(expirationObserver)
                        .expirationPolicy(ExpirationPolicy.CREATED)
                        .build();
                // Seed the expiring map with every row currently in the table.
                Map<String, VersionedValue> rows = state.getTable(name);
                for (Map.Entry<String, VersionedValue> row : rows.entrySet()) {
                    DatabaseRow rowId = new DatabaseRow(name, row.getKey());
                    expiringRows.put(rowId, row.getValue());
                }
                tableEntryExpirationMap.put(name, expiringRows);
            }
        }
    }
}
}
......
......@@ -14,7 +14,6 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import net.kuujo.copycat.Copycat;
import net.kuujo.copycat.StateMachine;
import net.kuujo.copycat.cluster.ClusterConfig;
import net.kuujo.copycat.cluster.Member;
import net.kuujo.copycat.cluster.TcpCluster;
......@@ -34,6 +33,7 @@ import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.service.BatchReadRequest;
import org.onlab.onos.store.service.BatchReadResult;
import org.onlab.onos.store.service.BatchWriteRequest;
......@@ -65,6 +65,9 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService {
protected ClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterCommunicationService clusterCommunicator;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DatabaseProtocolService copycatMessagingProtocol;
// FIXME: point to appropriate path
......@@ -158,7 +161,13 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService {
log.info("Starting cluster: {}", cluster);
StateMachine stateMachine = new DatabaseStateMachine();
DatabaseStateMachine stateMachine = new DatabaseStateMachine();
stateMachine.addEventListener(
new DatabaseEntryExpirationTracker(
clusterConfig.getLocalMember(),
clusterService.getLocalNode(),
clusterCommunicator,
this));
Log consensusLog = new MapDBLog(LOG_FILE_PREFIX + localNode.id(),
ClusterMessagingProtocol.SERIALIZER);
......@@ -183,6 +192,11 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService {
}
@Override
public boolean createTable(String name, int ttlMillis) {
    // Table creation with a TTL is delegated entirely to the database client.
    final boolean created = client.createTable(name, ttlMillis);
    return created;
}
@Override
public void dropTable(String name) {
// Dropping a table is delegated entirely to the database client.
client.dropTable(name);
}
......
......@@ -30,9 +30,10 @@ import org.onlab.onos.store.service.WriteStatus;
import org.onlab.util.KryoNamespace;
import org.slf4j.Logger;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.io.ByteStreams;
/**
......@@ -57,6 +58,7 @@ public class DatabaseStateMachine implements StateMachine {
serializerPool = KryoNamespace.newBuilder()
.register(VersionedValue.class)
.register(State.class)
.register(TableMetadata.class)
.register(BatchReadRequest.class)
.register(BatchWriteRequest.class)
.register(ReadStatus.class)
......@@ -69,7 +71,7 @@ public class DatabaseStateMachine implements StateMachine {
}
};
private final List<DatabaseUpdateEventListener> listeners = Lists.newLinkedList();
private final Set<DatabaseUpdateEventListener> listeners = Sets.newIdentityHashSet();
// durable internal state of the database.
private State state = new State();
......@@ -78,34 +80,31 @@ public class DatabaseStateMachine implements StateMachine {
@Command
public boolean createTable(String tableName) {
Map<String, VersionedValue> existingTable =
state.getTables().putIfAbsent(tableName, Maps.newHashMap());
if (existingTable == null) {
for (DatabaseUpdateEventListener listener : listeners) {
listener.tableCreated(tableName, Integer.MAX_VALUE);
TableMetadata metadata = new TableMetadata(tableName);
return createTable(metadata);
}
return true;
/**
 * Command that creates a table whose entries expire.
 *
 * @param tableName name of the table to create
 * @param ttlMillis time-to-live for entries, in milliseconds
 * @return result of creating the table from the resulting metadata
 */
@Command
public boolean createTable(String tableName, int ttlMillis) {
    return createTable(new TableMetadata(tableName, ttlMillis));
}
private boolean createTable(TableMetadata metadata) {
Map<String, VersionedValue> existingTable = state.getTable(metadata.tableName());
if (existingTable != null) {
return false;
}
@Command
public boolean createTable(String tableName, int expirationTimeMillis) {
Map<String, VersionedValue> existingTable =
state.getTables().putIfAbsent(tableName, Maps.newHashMap());
if (existingTable == null) {
state.createTable(metadata);
for (DatabaseUpdateEventListener listener : listeners) {
listener.tableCreated(tableName, expirationTimeMillis);
listener.tableCreated(metadata);
}
return true;
}
return false;
}
@Command
public boolean dropTable(String tableName) {
Map<String, VersionedValue> table = state.getTables().remove(tableName);
if (table != null) {
if (state.removeTable(tableName)) {
for (DatabaseUpdateEventListener listener : listeners) {
listener.tableDeleted(tableName);
}
......@@ -116,8 +115,8 @@ public class DatabaseStateMachine implements StateMachine {
@Command
public boolean dropAllTables() {
Set<String> tableNames = state.getTables().keySet();
state.getTables().clear();
Set<String> tableNames = state.getTableNames();
state.removeAllTables();
for (DatabaseUpdateEventListener listener : listeners) {
for (String tableName : tableNames) {
listener.tableDeleted(tableName);
......@@ -127,15 +126,15 @@ public class DatabaseStateMachine implements StateMachine {
}
@Query
public List<String> listTables() {
return ImmutableList.copyOf(state.getTables().keySet());
public Set<String> listTables() {
return ImmutableSet.copyOf(state.getTableNames());
}
@Query
public List<ReadResult> read(BatchReadRequest batchRequest) {
List<ReadResult> results = new ArrayList<>(batchRequest.batchSize());
for (ReadRequest request : batchRequest.getAsList()) {
Map<String, VersionedValue> table = state.getTables().get(request.tableName());
Map<String, VersionedValue> table = state.getTable(request.tableName());
if (table == null) {
results.add(new ReadResult(ReadStatus.NO_SUCH_TABLE, request.tableName(), request.key(), null));
continue;
......@@ -186,7 +185,7 @@ public class DatabaseStateMachine implements StateMachine {
boolean abort = false;
List<WriteStatus> validationResults = new ArrayList<>(batchRequest.batchSize());
for (WriteRequest request : batchRequest.getAsList()) {
Map<String, VersionedValue> table = state.getTables().get(request.tableName());
Map<String, VersionedValue> table = state.getTable(request.tableName());
if (table == null) {
validationResults.add(WriteStatus.NO_SUCH_TABLE);
abort = true;
......@@ -218,7 +217,7 @@ public class DatabaseStateMachine implements StateMachine {
// apply changes
for (WriteRequest request : batchRequest.getAsList()) {
Map<String, VersionedValue> table = state.getTables().get(request.tableName());
Map<String, VersionedValue> table = state.getTable(request.tableName());
TableModificationEvent tableModificationEvent = null;
// FIXME: If this method could be called by multiple thread,
......@@ -274,19 +273,78 @@ public class DatabaseStateMachine implements StateMachine {
return results;
}
public class State {
public static class State {
private final Map<String, Map<String, VersionedValue>> tables =
Maps.newHashMap();
private final Map<String, TableMetadata> tableMetadata = Maps.newHashMap();
private final Map<String, Map<String, VersionedValue>> tableData = Maps.newHashMap();
private long versionCounter = 1;
Map<String, Map<String, VersionedValue>> getTables() {
return tables;
/**
 * Looks up the data of a table.
 * The map held by this state is returned directly, not a copy.
 *
 * @param tableName table name
 * @return the table's key to versioned-value map, or null if no data is held for that name
 */
public Map<String, VersionedValue> getTable(String tableName) {
return tableData.get(tableName);
}
/**
 * Registers a table: stores its metadata and an empty data map under the table's name.
 * Any metadata or data already stored under that name is replaced.
 *
 * @param metadata metadata of the table to register
 */
void createTable(TableMetadata metadata) {
    final String name = metadata.tableName;
    tableMetadata.put(name, metadata);
    tableData.put(name, Maps.newHashMap());
}
/**
 * Looks up the metadata of a table.
 *
 * @param tableName table name
 * @return the table's metadata, or null if no metadata is held for that name
 */
TableMetadata getTableMetadata(String tableName) {
return tableMetadata.get(tableName);
}
/**
 * Hands out the current version number and advances the counter by one.
 *
 * @return the version number that was current before this call
 */
long nextVersion() {
    final long issued = versionCounter;
    versionCounter = issued + 1;
    return issued;
}
/**
 * Returns the names of all tables that have metadata registered.
 * The result is an immutable copy taken at call time, so it is not
 * affected by tables created or removed afterwards.
 *
 * @return immutable set of table names
 */
Set<String> getTableNames() {
return ImmutableSet.copyOf(tableMetadata.keySet());
}
/**
 * Removes a table's metadata and data.
 *
 * @param tableName name of the table to remove
 * @return true if metadata was registered under that name (and the table was removed),
 *         false otherwise
 */
boolean removeTable(String tableName) {
    final boolean known = tableMetadata.containsKey(tableName);
    if (known) {
        tableMetadata.remove(tableName);
        tableData.remove(tableName);
    }
    return known;
}
/**
 * Discards the data and the metadata of every table.
 */
void removeAllTables() {
    tableData.clear();
    tableMetadata.clear();
}
}
/**
 * Immutable description of a table: its name, whether its entries expire,
 * and the time-to-live applied to them.
 */
public static class TableMetadata {
    private final String tableName;
    private final boolean expireOldEntries;
    private final int ttlMillis;

    /**
     * Describes a table whose entries are not expired.
     * The TTL is recorded as {@link Integer#MAX_VALUE}.
     *
     * @param tableName table name
     */
    public TableMetadata(String tableName) {
        this(tableName, false, Integer.MAX_VALUE);
    }

    /**
     * Describes a table whose entries are expired.
     *
     * @param tableName table name
     * @param ttlMillis time-to-live for entries, in milliseconds
     */
    public TableMetadata(String tableName, int ttlMillis) {
        this(tableName, true, ttlMillis);
    }

    // Canonical constructor shared by the public ones.
    private TableMetadata(String tableName, boolean expireOldEntries, int ttlMillis) {
        this.tableName = tableName;
        this.expireOldEntries = expireOldEntries;
        this.ttlMillis = ttlMillis;
    }

    /**
     * @return table name
     */
    public String tableName() {
        return tableName;
    }

    /**
     * @return true if entries of this table are expired, false otherwise
     */
    public boolean expireOldEntries() {
        return expireOldEntries;
    }

    /**
     * @return time-to-live for entries, in milliseconds
     */
    public int ttlMillis() {
        return ttlMillis;
    }
}
@Override
......@@ -319,13 +377,30 @@ public class DatabaseStateMachine implements StateMachine {
} else {
this.state = SERIALIZER.decode(data);
}
// FIXME: synchronize.
for (DatabaseUpdateEventListener listener : listeners) {
listener.snapshotInstalled(state);
}
} catch (Exception e) {
log.error("Failed to install from snapshot", e);
throw new SnapshotException(e);
}
}
/**
 * Adds specified DatabaseUpdateEventListener.
 * The listener is added to this state machine's listener collection;
 * no synchronization is performed here.
 * @param listener listener to add
 */
public void addEventListener(DatabaseUpdateEventListener listener) {
listeners.add(listener);
}
/**
 * Removes specified DatabaseUpdateEventListener.
 * The listener is removed from this state machine's listener collection;
 * no synchronization is performed here.
 * @param listener listener to remove
 */
public void removeEventListener(DatabaseUpdateEventListener listener) {
listeners.remove(listener);
}
}
......
......@@ -16,6 +16,8 @@
package org.onlab.onos.store.service.impl;
import org.onlab.onos.store.service.impl.DatabaseStateMachine.TableMetadata;
/**
* Interface of database update event listeners.
*/
......@@ -29,14 +31,19 @@ public interface DatabaseUpdateEventListener {
/**
* Notifies listeners of a table created event.
* @param tableName name of the table created
* @param expirationTimeMillis TTL for entries added to the table (measured since last update time)
* @param metadata metadata for the created table.
*/
public void tableCreated(String tableName, int expirationTimeMillis);
public void tableCreated(TableMetadata metadata);
/**
 * Notifies listeners of a table deleted event.
 * The state machine issues this notification once per removed table,
 * both when a single table is dropped and when all tables are dropped.
 * @param tableName name of the table deleted
 */
public void tableDeleted(String tableName);
/**
 * Notifies listeners of a snapshot installation event.
 * The state machine issues this notification after it has replaced its
 * state with the state decoded from a snapshot.
 * @param snapshotState installed snapshot state.
 */
public void snapshotInstalled(DatabaseStateMachine.State snapshotState);
}
......