Yuta Higuchi
Committed by Gerrit Code Review

Merge "Support for expiring Database entries"

......@@ -20,6 +20,16 @@ public interface DatabaseAdminService {
public boolean createTable(String name);
/**
 * Creates a new table whose entries are tracked by last update time so
 * that old entries can be expired.
 * <p>
 * Table creation is idempotent: attempting to create a table that
 * already exists is a no-op.
 *
 * @param name table name.
 * @param ttlMillis time-to-live in milliseconds, measured from an entry's
 *        last update time, after which the entry will be expired.
 * @return true if the table was created by this call, false otherwise.
 */
// NOTE(review): DatabaseEntryExpirationTracker.tableCreated hands ttlMillis to
// ExpiringMap with TimeUnit.SECONDS while snapshotInstalled uses
// TimeUnit.MILLISECONDS; confirm which unit is actually honoured.
public boolean createTable(String name, int ttlMillis);
/**
* Lists all the tables in the database.
* @return list of table names.
*/
......
......@@ -35,6 +35,16 @@ public class DatabaseClient {
}
}
/**
 * Submits a "createTable" command with an entry TTL and blocks until the
 * result of the command is available.
 *
 * @param tableName name of the table to create.
 * @param ttlMillis TTL value forwarded unchanged to the "createTable" command.
 * @return the boolean result produced by the submitted command.
 * @throws DatabaseException if the calling thread is interrupted while
 *         waiting, or if the submitted command completes exceptionally.
 */
public boolean createTable(String tableName, int ttlMillis) {
    CompletableFuture<Boolean> future = copycat.submit("createTable", tableName, ttlMillis);
    try {
        return future.get();
    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers further up the stack can
        // still observe that this thread was asked to stop.
        Thread.currentThread().interrupt();
        throw new DatabaseException(e);
    } catch (ExecutionException e) {
        throw new DatabaseException(e);
    }
}
public void dropTable(String tableName) {
CompletableFuture<Void> future = copycat.submit("dropTable", tableName);
......
......@@ -30,12 +30,14 @@ import net.kuujo.copycat.cluster.Member;
import net.kuujo.copycat.event.EventHandler;
import net.kuujo.copycat.event.LeaderElectEvent;
import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
import org.onlab.onos.store.service.DatabaseService;
import org.onlab.onos.store.service.VersionedValue;
import org.onlab.onos.store.service.impl.DatabaseStateMachine.State;
import org.onlab.onos.store.service.impl.DatabaseStateMachine.TableMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -52,23 +54,34 @@ public class DatabaseEntryExpirationTracker implements
public static final MessageSubject DATABASE_UPDATES = new MessageSubject(
"database-update-event");
private DatabaseService databaseService;
private ClusterService cluster;
private ClusterCommunicationService clusterCommunicator;
private final DatabaseService databaseService;
private final ClusterCommunicationService clusterCommunicator;
private final Member localMember;
private final ControllerNode localNode;
private final AtomicBoolean isLocalMemberLeader = new AtomicBoolean(false);
private final Map<String, Map<DatabaseRow, VersionedValue>> tableEntryExpirationMap = new HashMap<>();
private final ExpirationListener<DatabaseRow, VersionedValue> expirationObserver = new ExpirationObserver();
DatabaseEntryExpirationTracker(Member localMember) {
DatabaseEntryExpirationTracker(
Member localMember,
ControllerNode localNode,
ClusterCommunicationService clusterCommunicator,
DatabaseService databaseService) {
this.localMember = localMember;
this.localNode = localNode;
this.clusterCommunicator = clusterCommunicator;
this.databaseService = databaseService;
}
@Override
public void tableModified(TableModificationEvent event) {
if (!tableEntryExpirationMap.containsKey(event.tableName())) {
return;
}
DatabaseRow row = new DatabaseRow(event.tableName(), event.key());
Map<DatabaseRow, VersionedValue> map = tableEntryExpirationMap
.get(event.tableName());
......@@ -77,8 +90,8 @@ public class DatabaseEntryExpirationTracker implements
case ROW_DELETED:
if (isLocalMemberLeader.get()) {
try {
clusterCommunicator.broadcast(new ClusterMessage(cluster
.getLocalNode().id(), DATABASE_UPDATES,
clusterCommunicator.broadcast(new ClusterMessage(
localNode.id(), DATABASE_UPDATES,
DatabaseStateMachine.SERIALIZER.encode(event)));
} catch (IOException e) {
log.error(
......@@ -97,12 +110,10 @@ public class DatabaseEntryExpirationTracker implements
}
@Override
public void tableCreated(String tableName, int expirationTimeMillis) {
// make this explicit instead of relying on a negative value
// to indicate no expiration.
if (expirationTimeMillis > 0) {
tableEntryExpirationMap.put(tableName, ExpiringMap.builder()
.expiration(expirationTimeMillis, TimeUnit.SECONDS)
public void tableCreated(TableMetadata metadata) {
if (metadata.expireOldEntries()) {
tableEntryExpirationMap.put(metadata.tableName(), ExpiringMap.builder()
.expiration(metadata.ttlMillis(), TimeUnit.SECONDS)
.expirationListener(expirationObserver)
// FIXME: make the expiration policy configurable.
.expirationPolicy(ExpirationPolicy.CREATED).build());
......@@ -188,4 +199,28 @@ public class DatabaseEntryExpirationTracker implements
return Objects.hash(tableName, key);
}
}
/**
 * Seeds the per-table expiration maps from a freshly installed snapshot.
 * Nothing is done when at least one table is already being tracked.
 *
 * @param state snapshot state that was installed.
 */
@Override
public void snapshotInstalled(State state) {
    if (tableEntryExpirationMap.isEmpty()) {
        for (String name : state.getTableNames()) {
            TableMetadata tableInfo = state.getTableMetadata(name);
            // Only tables flagged for expiration get an expiring map.
            if (tableInfo.expireOldEntries()) {
                Map<DatabaseRow, VersionedValue> expiringRows = ExpiringMap.builder()
                        .expiration(tableInfo.ttlMillis(), TimeUnit.MILLISECONDS)
                        .expirationListener(expirationObserver)
                        .expirationPolicy(ExpirationPolicy.CREATED).build();
                // Copy every row of the snapshot's table into the expiring map.
                Map<String, VersionedValue> snapshotRows = state.getTable(name);
                snapshotRows.forEach(
                        (key, value) -> expiringRows.put(new DatabaseRow(name, key), value));
                tableEntryExpirationMap.put(name, expiringRows);
            }
        }
    }
}
}
......
......@@ -14,7 +14,6 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import net.kuujo.copycat.Copycat;
import net.kuujo.copycat.StateMachine;
import net.kuujo.copycat.cluster.ClusterConfig;
import net.kuujo.copycat.cluster.Member;
import net.kuujo.copycat.cluster.TcpCluster;
......@@ -34,6 +33,7 @@ import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.service.BatchReadRequest;
import org.onlab.onos.store.service.BatchReadResult;
import org.onlab.onos.store.service.BatchWriteRequest;
......@@ -65,6 +65,9 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService {
protected ClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterCommunicationService clusterCommunicator;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DatabaseProtocolService copycatMessagingProtocol;
// FIXME: point to appropriate path
......@@ -158,7 +161,13 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService {
log.info("Starting cluster: {}", cluster);
StateMachine stateMachine = new DatabaseStateMachine();
DatabaseStateMachine stateMachine = new DatabaseStateMachine();
stateMachine.addEventListener(
new DatabaseEntryExpirationTracker(
clusterConfig.getLocalMember(),
clusterService.getLocalNode(),
clusterCommunicator,
this));
Log consensusLog = new MapDBLog(LOG_FILE_PREFIX + localNode.id(),
ClusterMessagingProtocol.SERIALIZER);
......@@ -183,6 +192,11 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService {
}
/**
 * Creates a table with an entry TTL by delegating to the database client.
 *
 * @param name table name.
 * @param ttlMillis TTL value passed through to the client.
 * @return whatever the client reports for the creation attempt.
 */
@Override
public boolean createTable(String name, int ttlMillis) {
    final boolean created = client.createTable(name, ttlMillis);
    return created;
}
/**
 * Drops the named table by delegating to the database client.
 *
 * @param name name of the table to drop.
 */
@Override
public void dropTable(String name) {
    this.client.dropTable(name);
}
......@@ -418,4 +432,4 @@ public class DatabaseManager implements DatabaseService, DatabaseAdminService {
}
return null;
}
}
\ No newline at end of file
}
......
......@@ -30,9 +30,10 @@ import org.onlab.onos.store.service.WriteStatus;
import org.onlab.util.KryoNamespace;
import org.slf4j.Logger;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.io.ByteStreams;
/**
......@@ -57,6 +58,7 @@ public class DatabaseStateMachine implements StateMachine {
serializerPool = KryoNamespace.newBuilder()
.register(VersionedValue.class)
.register(State.class)
.register(TableMetadata.class)
.register(BatchReadRequest.class)
.register(BatchWriteRequest.class)
.register(ReadStatus.class)
......@@ -69,7 +71,7 @@ public class DatabaseStateMachine implements StateMachine {
}
};
private final List<DatabaseUpdateEventListener> listeners = Lists.newLinkedList();
private final Set<DatabaseUpdateEventListener> listeners = Sets.newIdentityHashSet();
// durable internal state of the database.
private State state = new State();
......@@ -78,34 +80,31 @@ public class DatabaseStateMachine implements StateMachine {
@Command
public boolean createTable(String tableName) {
Map<String, VersionedValue> existingTable =
state.getTables().putIfAbsent(tableName, Maps.newHashMap());
if (existingTable == null) {
for (DatabaseUpdateEventListener listener : listeners) {
listener.tableCreated(tableName, Integer.MAX_VALUE);
}
return true;
}
return false;
TableMetadata metadata = new TableMetadata(tableName);
return createTable(metadata);
}
@Command
public boolean createTable(String tableName, int expirationTimeMillis) {
Map<String, VersionedValue> existingTable =
state.getTables().putIfAbsent(tableName, Maps.newHashMap());
if (existingTable == null) {
for (DatabaseUpdateEventListener listener : listeners) {
listener.tableCreated(tableName, expirationTimeMillis);
}
return true;
public boolean createTable(String tableName, int ttlMillis) {
TableMetadata metadata = new TableMetadata(tableName, ttlMillis);
return createTable(metadata);
}
private boolean createTable(TableMetadata metadata) {
Map<String, VersionedValue> existingTable = state.getTable(metadata.tableName());
if (existingTable != null) {
return false;
}
return false;
state.createTable(metadata);
for (DatabaseUpdateEventListener listener : listeners) {
listener.tableCreated(metadata);
}
return true;
}
@Command
public boolean dropTable(String tableName) {
Map<String, VersionedValue> table = state.getTables().remove(tableName);
if (table != null) {
if (state.removeTable(tableName)) {
for (DatabaseUpdateEventListener listener : listeners) {
listener.tableDeleted(tableName);
}
......@@ -116,8 +115,8 @@ public class DatabaseStateMachine implements StateMachine {
@Command
public boolean dropAllTables() {
Set<String> tableNames = state.getTables().keySet();
state.getTables().clear();
Set<String> tableNames = state.getTableNames();
state.removeAllTables();
for (DatabaseUpdateEventListener listener : listeners) {
for (String tableName : tableNames) {
listener.tableDeleted(tableName);
......@@ -127,15 +126,15 @@ public class DatabaseStateMachine implements StateMachine {
}
@Query
public List<String> listTables() {
return ImmutableList.copyOf(state.getTables().keySet());
public Set<String> listTables() {
return ImmutableSet.copyOf(state.getTableNames());
}
@Query
public List<ReadResult> read(BatchReadRequest batchRequest) {
List<ReadResult> results = new ArrayList<>(batchRequest.batchSize());
for (ReadRequest request : batchRequest.getAsList()) {
Map<String, VersionedValue> table = state.getTables().get(request.tableName());
Map<String, VersionedValue> table = state.getTable(request.tableName());
if (table == null) {
results.add(new ReadResult(ReadStatus.NO_SUCH_TABLE, request.tableName(), request.key(), null));
continue;
......@@ -186,7 +185,7 @@ public class DatabaseStateMachine implements StateMachine {
boolean abort = false;
List<WriteStatus> validationResults = new ArrayList<>(batchRequest.batchSize());
for (WriteRequest request : batchRequest.getAsList()) {
Map<String, VersionedValue> table = state.getTables().get(request.tableName());
Map<String, VersionedValue> table = state.getTable(request.tableName());
if (table == null) {
validationResults.add(WriteStatus.NO_SUCH_TABLE);
abort = true;
......@@ -218,7 +217,7 @@ public class DatabaseStateMachine implements StateMachine {
// apply changes
for (WriteRequest request : batchRequest.getAsList()) {
Map<String, VersionedValue> table = state.getTables().get(request.tableName());
Map<String, VersionedValue> table = state.getTable(request.tableName());
TableModificationEvent tableModificationEvent = null;
// FIXME: If this method could be called by multiple thread,
......@@ -274,19 +273,78 @@ public class DatabaseStateMachine implements StateMachine {
return results;
}
public class State {
public static class State {
private final Map<String, Map<String, VersionedValue>> tables =
Maps.newHashMap();
private final Map<String, TableMetadata> tableMetadata = Maps.newHashMap();
private final Map<String, Map<String, VersionedValue>> tableData = Maps.newHashMap();
private long versionCounter = 1;
Map<String, Map<String, VersionedValue>> getTables() {
return tables;
public Map<String, VersionedValue> getTable(String tableName) {
return tableData.get(tableName);
}
void createTable(TableMetadata metadata) {
tableMetadata.put(metadata.tableName, metadata);
tableData.put(metadata.tableName, Maps.newHashMap());
}
TableMetadata getTableMetadata(String tableName) {
return tableMetadata.get(tableName);
}
long nextVersion() {
return versionCounter++;
}
Set<String> getTableNames() {
return ImmutableSet.copyOf(tableMetadata.keySet());
}
boolean removeTable(String tableName) {
if (!tableMetadata.containsKey(tableName)) {
return false;
}
tableMetadata.remove(tableName);
tableData.remove(tableName);
return true;
}
void removeAllTables() {
tableMetadata.clear();
tableData.clear();
}
}
/**
 * Immutable description of a table: its name, whether old entries are to
 * be expired, and the TTL value recorded for it.
 */
public static class TableMetadata {

    private final String tableName;
    private final boolean expireOldEntries;
    private final int ttlMillis;

    /**
     * Describes a table without entry expiration; the TTL is recorded as
     * {@link Integer#MAX_VALUE}.
     *
     * @param tableName table name.
     */
    public TableMetadata(String tableName) {
        this(tableName, false, Integer.MAX_VALUE);
    }

    /**
     * Describes a table with entry expiration enabled.
     *
     * @param tableName table name.
     * @param ttlMillis TTL value to record for the table.
     */
    public TableMetadata(String tableName, int ttlMillis) {
        this(tableName, true, ttlMillis);
    }

    // Single place where all three fields are assigned.
    private TableMetadata(String tableName, boolean expireOldEntries, int ttlMillis) {
        this.tableName = tableName;
        this.expireOldEntries = expireOldEntries;
        this.ttlMillis = ttlMillis;
    }

    /**
     * @return the table name.
     */
    public String tableName() {
        return this.tableName;
    }

    /**
     * @return true if this table was described with expiration enabled.
     */
    public boolean expireOldEntries() {
        return this.expireOldEntries;
    }

    /**
     * @return the TTL value recorded for the table.
     */
    public int ttlMillis() {
        return this.ttlMillis;
    }
}
@Override
......@@ -319,13 +377,30 @@ public class DatabaseStateMachine implements StateMachine {
} else {
this.state = SERIALIZER.decode(data);
}
// FIXME: synchronize.
for (DatabaseUpdateEventListener listener : listeners) {
listener.snapshotInstalled(state);
}
} catch (Exception e) {
log.error("Failed to install from snapshot", e);
throw new SnapshotException(e);
}
}
/**
 * Registers a listener in this state machine's listener set.
 *
 * @param listener listener to add
 */
public void addEventListener(DatabaseUpdateEventListener listener) {
    this.listeners.add(listener);
}
/**
 * Unregisters a listener from this state machine's listener set.
 *
 * @param listener listener to remove
 */
public void removeEventListener(DatabaseUpdateEventListener listener) {
    this.listeners.remove(listener);
}
}
......
......@@ -16,6 +16,8 @@
package org.onlab.onos.store.service.impl;
import org.onlab.onos.store.service.impl.DatabaseStateMachine.TableMetadata;
/**
* Interface of database update event listeners.
*/
......@@ -29,14 +31,19 @@ public interface DatabaseUpdateEventListener {
/**
* Notifies listeners of a table created event.
* @param tableName name of the table created
* @param expirationTimeMillis TTL for entries added to the table (measured since last update time)
* @param metadata metadata for the created table.
*/
public void tableCreated(String tableName, int expirationTimeMillis);
public void tableCreated(TableMetadata metadata);
/**
* Notifies listeners of a table deleted event.
* @param tableName name of the table deleted
*/
public void tableDeleted(String tableName);
}
\ No newline at end of file
/**
* Notifies listeners of a snapshot installation event.
* @param snapshotState installed snapshot state.
*/
public void snapshotInstalled(DatabaseStateMachine.State snapshotState);
}
......