Madan Jampani
Committed by Jonathan Hart

WIP: Partitioned Database based on Raft.

Removed the implementation based on previous Copycat API.

Change-Id: I6b9d67e943e17095f585ae2a2cb6304c248cd686
Showing 35 changed files with 1364 additions and 2122 deletions
......@@ -39,7 +39,6 @@ import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
import org.onosproject.store.service.Lock;
import org.onosproject.store.service.LockService;
import org.onosproject.store.service.impl.DistributedLockManager;
import org.slf4j.Logger;
import java.util.Map;
......@@ -62,9 +61,7 @@ public class LeadershipManager implements LeadershipService {
private final Logger log = getLogger(getClass());
// TODO: Remove this dependency
private static final int TERM_DURATION_MS =
DistributedLockManager.DEAD_LOCK_TIMEOUT_MS;
private static final int TERM_DURATION_MS = 2000;
// Time to wait before retrying leadership after
// a unexpected error.
......
package org.onosproject.store.consistent.impl;
import net.kuujo.copycat.cluster.ClusterConfig;
import net.kuujo.copycat.cluster.internal.coordinator.ClusterCoordinator;
import net.kuujo.copycat.cluster.internal.coordinator.CoordinatorConfig;
import net.kuujo.copycat.cluster.internal.coordinator.DefaultClusterCoordinator;
import net.kuujo.copycat.resource.Resource;
/**
* Database.
*/
public interface Database extends DatabaseProxy<String, byte[]>, Resource<Database> {
/**
* Creates a new database with the default cluster configuration.<p>
*
* The database will be constructed with the default cluster configuration. The default cluster configuration
* searches for two resources on the classpath - {@code cluster} and {cluster-defaults} - in that order. Configuration
* options specified in {@code cluster.conf} will override those in {cluster-defaults.conf}.<p>
*
* Additionally, the database will be constructed with an database configuration that searches the classpath for
* three configuration files - {@code {name}}, {@code database}, {@code database-defaults}, {@code resource}, and
* {@code resource-defaults} - in that order. The first resource is a configuration resource with the same name
* as the map resource. If the resource is namespaced - e.g. `databases.my-database.conf` - then resource
* configurations will be loaded according to namespaces as well; for example, `databases.conf`.
*
* @param name The database name.
* @return The database.
*/
static Database create(String name) {
return create(name, new ClusterConfig(), new DatabaseConfig());
}
/**
* Creates a new database.<p>
*
* The database will be constructed with an database configuration that searches the classpath for
* three configuration files - {@code {name}}, {@code database}, {@code database-defaults}, {@code resource}, and
* {@code resource-defaults} - in that order. The first resource is a configuration resource with the same name
* as the database resource. If the resource is namespaced - e.g. `databases.my-database.conf` - then resource
* configurations will be loaded according to namespaces as well; for example, `databases.conf`.
*
* @param name The database name.
* @param cluster The cluster configuration.
* @return The database.
*/
static Database create(String name, ClusterConfig cluster) {
return create(name, cluster, new DatabaseConfig());
}
/**
* Creates a new database.
*
* @param name The database name.
* @param cluster The cluster configuration.
* @param config The database configuration.
* @return The database.
*/
static Database create(String name, ClusterConfig cluster, DatabaseConfig config) {
ClusterCoordinator coordinator =
new DefaultClusterCoordinator(new CoordinatorConfig().withName(name).withClusterConfig(cluster));
return coordinator.<Database>getResource(name, config.resolve(cluster))
.addStartupTask(() -> coordinator.open().thenApply(v -> null))
.addShutdownTask(coordinator::close);
}
}
package org.onosproject.store.consistent.impl;
import com.typesafe.config.ConfigValueFactory;
import net.kuujo.copycat.cluster.ClusterConfig;
import net.kuujo.copycat.cluster.internal.coordinator.CoordinatedResourceConfig;
import net.kuujo.copycat.protocol.Consistency;
import net.kuujo.copycat.resource.ResourceConfig;
import net.kuujo.copycat.state.StateLogConfig;
import net.kuujo.copycat.util.internal.Assert;
import java.util.Map;
/**
* Database configuration.
*
*/
public class DatabaseConfig extends ResourceConfig<DatabaseConfig> {
private static final String DATABASE_CONSISTENCY = "consistency";
private static final String DEFAULT_CONFIGURATION = "database-defaults";
private static final String CONFIGURATION = "database";
public DatabaseConfig() {
super(CONFIGURATION, DEFAULT_CONFIGURATION);
}
public DatabaseConfig(Map<String, Object> config) {
super(config, CONFIGURATION, DEFAULT_CONFIGURATION);
}
public DatabaseConfig(String resource) {
super(resource, CONFIGURATION, DEFAULT_CONFIGURATION);
}
protected DatabaseConfig(DatabaseConfig config) {
super(config);
}
@Override
public DatabaseConfig copy() {
return new DatabaseConfig(this);
}
/**
* Sets the database read consistency.
*
* @param consistency The database read consistency.
* @throws java.lang.NullPointerException If the consistency is {@code null}
*/
public void setConsistency(String consistency) {
this.config = config.withValue(DATABASE_CONSISTENCY,
ConfigValueFactory.fromAnyRef(
Consistency.parse(Assert.isNotNull(consistency, "consistency")).toString()));
}
/**
* Sets the database read consistency.
*
* @param consistency The database read consistency.
* @throws java.lang.NullPointerException If the consistency is {@code null}
*/
public void setConsistency(Consistency consistency) {
this.config = config.withValue(DATABASE_CONSISTENCY,
ConfigValueFactory.fromAnyRef(
Assert.isNotNull(consistency, "consistency").toString()));
}
/**
* Returns the database read consistency.
*
* @return The database read consistency.
*/
public Consistency getConsistency() {
return Consistency.parse(config.getString(DATABASE_CONSISTENCY));
}
/**
* Sets the database read consistency, returning the configuration for method chaining.
*
* @param consistency The database read consistency.
* @return The database configuration.
* @throws java.lang.NullPointerException If the consistency is {@code null}
*/
public DatabaseConfig withConsistency(String consistency) {
setConsistency(consistency);
return this;
}
/**
* Sets the database read consistency, returning the configuration for method chaining.
*
* @param consistency The database read consistency.
* @return The database configuration.
* @throws java.lang.NullPointerException If the consistency is {@code null}
*/
public DatabaseConfig withConsistency(Consistency consistency) {
setConsistency(consistency);
return this;
}
@Override
public CoordinatedResourceConfig resolve(ClusterConfig cluster) {
return new StateLogConfig(toMap())
.resolve(cluster)
.withResourceType(DefaultDatabase.class);
}
}
package org.onosproject.store.consistent.impl;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
/**
* Database proxy.
*/
public interface DatabaseProxy<K, V> {
/**
* Gets the table size.
*
* @param tableName table name
* @return A completable future to be completed with the result once complete.
*/
CompletableFuture<Integer> size(String tableName);
/**
* Checks whether the table is empty.
*
* @param tableName table name
* @return A completable future to be completed with the result once complete.
*/
CompletableFuture<Boolean> isEmpty(String tableName);
/**
* Checks whether the table contains a key.
*
* @param tableName table name
* @param key The key to check.
* @return A completable future to be completed with the result once complete.
*/
CompletableFuture<Boolean> containsKey(String tableName, K key);
/**
* Checks whether the table contains a value.
*
* @param tableName table name
* @param value The value to check.
* @return A completable future to be completed with the result once complete.
*/
CompletableFuture<Boolean> containsValue(String tableName, V value);
/**
* Gets a value from the table.
*
* @param tableName table name
* @param key The key to get.
* @return A completable future to be completed with the result once complete.
*/
CompletableFuture<Versioned<V>> get(String tableName, K key);
/**
* Puts a value in the table.
*
* @param tableName table name
* @param key The key to set.
* @param value The value to set.
* @return A completable future to be completed with the result once complete.
*/
CompletableFuture<Versioned<V>> put(String tableName, K key, V value);
/**
* Removes a value from the table.
*
* @param tableName table name
* @param key The key to remove.
* @return A completable future to be completed with the result once complete.
*/
CompletableFuture<Versioned<V>> remove(String tableName, K key);
/**
* Clears the table.
*
* @param tableName table name
* @return A completable future to be completed with the result once complete.
*/
CompletableFuture<Void> clear(String tableName);
/**
* Gets a set of keys in the table.
*
* @param tableName table name
* @return A completable future to be completed with the result once complete.
*/
CompletableFuture<Set<K>> keySet(String tableName);
/**
* Gets a collection of values in the table.
*
* @param tableName table name
* @return A completable future to be completed with the result once complete.
*/
CompletableFuture<Collection<Versioned<V>>> values(String tableName);
/**
* Gets a set of entries in the table.
*
* @param tableName table name
* @return A completable future to be completed with the result once complete.
*/
CompletableFuture<Set<Map.Entry<K, Versioned<V>>>> entrySet(String tableName);
/**
* Puts a value in the table if the given key does not exist.
*
* @param tableName table name
* @param key The key to set.
* @param value The value to set if the given key does not exist.
* @return A completable future to be completed with the result once complete.
*/
CompletableFuture<Versioned<V>> putIfAbsent(String tableName, K key, V value);
/**
* Removes a key and if the existing value for that key matches the specified value.
*
* @param tableName table name
* @param key The key to remove.
* @param value The value to remove.
* @return A completable future to be completed with the result once complete.
*/
CompletableFuture<Boolean> remove(String tableName, K key, V value);
/**
* Removes a key and if the existing version for that key matches the specified version.
*
* @param tableName table name
* @param key The key to remove.
* @param version The expected version.
* @return A completable future to be completed with the result once complete.
*/
CompletableFuture<Boolean> remove(String tableName, K key, long version);
/**
* Replaces the entry for the specified key only if currently mapped to the specified value.
*
* @param tableName table name
* @param key The key to replace.
* @param oldValue The value to replace.
* @param newValue The value with which to replace the given key and value.
* @return A completable future to be completed with the result once complete.
*/
CompletableFuture<Boolean> replace(String tableName, K key, V oldValue, V newValue);
/**
* Replaces the entry for the specified key only if currently mapped to the specified version.
*
* @param tableName table name
* @param key The key to update
* @param oldVersion existing version in the map for this replace to succeed.
* @param newValue The value with which to replace the given key and version.
* @return A completable future to be completed with the result once complete.
*/
CompletableFuture<Boolean> replace(String tableName, K key, long oldVersion, V newValue);
/**
* Perform a atomic batch update operation i.e. either all operations in batch succeed or
* none do and no state changes are made.
*
* @param updates list of updates to apply atomically.
* @return A completable future to be completed with the result once complete.
*/
CompletableFuture<Boolean> atomicBatchUpdate(List<UpdateOperation<K, V>> updates);
}
package org.onosproject.store.consistent.impl;
import java.util.Collection;
import java.util.List;
import java.util.Map.Entry;
import java.util.Set;
import net.kuujo.copycat.state.Command;
import net.kuujo.copycat.state.Initializer;
import net.kuujo.copycat.state.Query;
import net.kuujo.copycat.state.StateContext;
/**
* Database state.
*
*/
public interface DatabaseState<K, V> {
/**
* Initializes the database state.
*
* @param context The map state context.
*/
@Initializer
public void init(StateContext<DatabaseState<K, V>> context);
@Query
int size(String tableName);
@Query
boolean isEmpty(String tableName);
@Query
boolean containsKey(String tableName, K key);
@Query
boolean containsValue(String tableName, V value);
@Query
Versioned<V> get(String tableName, K key);
@Command
Versioned<V> put(String tableName, K key, V value);
@Command
Versioned<V> remove(String tableName, K key);
@Command
void clear(String tableName);
@Query
Set<K> keySet(String tableName);
@Query
Collection<Versioned<V>> values(String tableName);
@Query
Set<Entry<K, Versioned<V>>> entrySet(String tableName);
@Command
Versioned<V> putIfAbsent(String tableName, K key, V value);
@Command
boolean remove(String tableName, K key, V value);
@Command
boolean remove(String tableName, K key, long version);
@Command
boolean replace(String tableName, K key, V oldValue, V newValue);
@Command
boolean replace(String tableName, K key, long oldVersion, V newValue);
@Command
boolean batchUpdate(List<UpdateOperation<K, V>> updates);
}
package org.onosproject.store.consistent.impl;
import net.kuujo.copycat.resource.internal.ResourceContext;
import net.kuujo.copycat.state.StateMachine;
import net.kuujo.copycat.resource.internal.AbstractResource;
import net.kuujo.copycat.state.internal.DefaultStateMachine;
import net.kuujo.copycat.util.concurrent.Futures;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
/**
* Default database.
*/
public class DefaultDatabase extends AbstractResource<Database> implements Database {
private final StateMachine<DatabaseState<String, byte[]>> stateMachine;
private DatabaseProxy<String, byte[]> proxy;
@SuppressWarnings("unchecked")
public DefaultDatabase(ResourceContext context) {
super(context);
this.stateMachine = new DefaultStateMachine(context, DatabaseState.class, DefaultDatabaseState.class);
}
/**
* If the database is closed, returning a failed CompletableFuture. Otherwise, calls the given supplier to
* return the completed future result.
*
* @param supplier The supplier to call if the database is open.
* @param <T> The future result type.
* @return A completable future that if this database is closed is immediately failed.
*/
protected <T> CompletableFuture<T> checkOpen(Supplier<CompletableFuture<T>> supplier) {
if (proxy == null) {
return Futures.exceptionalFuture(new IllegalStateException("Database closed"));
}
return supplier.get();
}
@Override
public CompletableFuture<Integer> size(String tableName) {
return checkOpen(() -> proxy.size(tableName));
}
@Override
public CompletableFuture<Boolean> isEmpty(String tableName) {
return checkOpen(() -> proxy.isEmpty(tableName));
}
@Override
public CompletableFuture<Boolean> containsKey(String tableName, String key) {
return checkOpen(() -> proxy.containsKey(tableName, key));
}
@Override
public CompletableFuture<Boolean> containsValue(String tableName, byte[] value) {
return checkOpen(() -> proxy.containsValue(tableName, value));
}
@Override
public CompletableFuture<Versioned<byte[]>> get(String tableName, String key) {
return checkOpen(() -> proxy.get(tableName, key));
}
@Override
public CompletableFuture<Versioned<byte[]>> put(String tableName, String key, byte[] value) {
return checkOpen(() -> proxy.put(tableName, key, value));
}
@Override
public CompletableFuture<Versioned<byte[]>> remove(String tableName, String key) {
return checkOpen(() -> proxy.remove(tableName, key));
}
@Override
public CompletableFuture<Void> clear(String tableName) {
return checkOpen(() -> proxy.clear(tableName));
}
@Override
public CompletableFuture<Set<String>> keySet(String tableName) {
return checkOpen(() -> proxy.keySet(tableName));
}
@Override
public CompletableFuture<Collection<Versioned<byte[]>>> values(String tableName) {
return checkOpen(() -> proxy.values(tableName));
}
@Override
public CompletableFuture<Set<Map.Entry<String, Versioned<byte[]>>>> entrySet(String tableName) {
return checkOpen(() -> proxy.entrySet(tableName));
}
@Override
public CompletableFuture<Versioned<byte[]>> putIfAbsent(String tableName, String key, byte[] value) {
return checkOpen(() -> proxy.putIfAbsent(tableName, key, value));
}
@Override
public CompletableFuture<Boolean> remove(String tableName, String key, byte[] value) {
return checkOpen(() -> proxy.remove(tableName, key, value));
}
@Override
public CompletableFuture<Boolean> remove(String tableName, String key, long version) {
return checkOpen(() -> proxy.remove(tableName, key, version));
}
@Override
public CompletableFuture<Boolean> replace(String tableName, String key, byte[] oldValue, byte[] newValue) {
return checkOpen(() -> proxy.replace(tableName, key, oldValue, newValue));
}
@Override
public CompletableFuture<Boolean> replace(String tableName, String key, long oldVersion, byte[] newValue) {
return checkOpen(() -> proxy.replace(tableName, key, oldVersion, newValue));
}
@Override
public CompletableFuture<Boolean> atomicBatchUpdate(List<UpdateOperation<String, byte[]>> updates) {
return checkOpen(() -> proxy.atomicBatchUpdate(updates));
}
@Override
@SuppressWarnings("unchecked")
public synchronized CompletableFuture<Database> open() {
return runStartupTasks()
.thenCompose(v -> stateMachine.open())
.thenRun(() -> {
this.proxy = stateMachine.createProxy(DatabaseProxy.class);
})
.thenApply(v -> null);
}
@Override
public synchronized CompletableFuture<Void> close() {
proxy = null;
return stateMachine.close()
.thenCompose(v -> runShutdownTasks());
}
}
package org.onosproject.store.consistent.impl;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import net.kuujo.copycat.state.Initializer;
import net.kuujo.copycat.state.StateContext;
/**
* Default database state.
*
* @param <K> key type
* @param <V> value type
*/
public class DefaultDatabaseState<K, V> implements DatabaseState<K, V> {
private Long nextVersion;
private Map<String, Map<K, Versioned<V>>> tables;
@Initializer
@Override
public void init(StateContext<DatabaseState<K, V>> context) {
tables = context.get("tables");
if (tables == null) {
tables = new HashMap<>();
context.put("tables", tables);
}
nextVersion = context.get("nextVersion");
if (nextVersion == null) {
nextVersion = new Long(0);
context.put("nextVersion", nextVersion);
}
}
private Map<K, Versioned<V>> getTableMap(String tableName) {
Map<K, Versioned<V>> table = tables.get(tableName);
if (table == null) {
table = new HashMap<>();
tables.put(tableName, table);
}
return table;
}
@Override
public int size(String tableName) {
return getTableMap(tableName).size();
}
@Override
public boolean isEmpty(String tableName) {
return getTableMap(tableName).isEmpty();
}
@Override
public boolean containsKey(String tableName, K key) {
return getTableMap(tableName).containsKey(key);
}
@Override
public boolean containsValue(String tableName, V value) {
return getTableMap(tableName).values().stream().anyMatch(v -> checkEquality(v.value(), value));
}
@Override
public Versioned<V> get(String tableName, K key) {
return getTableMap(tableName).get(key);
}
@Override
public Versioned<V> put(String tableName, K key, V value) {
return getTableMap(tableName).put(key, new Versioned<>(value, ++nextVersion));
}
@Override
public Versioned<V> remove(String tableName, K key) {
return getTableMap(tableName).remove(key);
}
@Override
public void clear(String tableName) {
getTableMap(tableName).clear();
}
@Override
public Set<K> keySet(String tableName) {
return getTableMap(tableName).keySet();
}
@Override
public Collection<Versioned<V>> values(String tableName) {
return getTableMap(tableName).values();
}
@Override
public Set<Entry<K, Versioned<V>>> entrySet(String tableName) {
return getTableMap(tableName).entrySet();
}
@Override
public Versioned<V> putIfAbsent(String tableName, K key, V value) {
Versioned<V> existingValue = getTableMap(tableName).get(key);
return existingValue != null ? existingValue : put(tableName, key, value);
}
@Override
public boolean remove(String tableName, K key, V value) {
Versioned<V> existing = getTableMap(tableName).get(key);
if (existing != null && existing.value().equals(value)) {
getTableMap(tableName).remove(key);
return true;
}
return false;
}
@Override
public boolean remove(String tableName, K key, long version) {
Versioned<V> existing = getTableMap(tableName).get(key);
if (existing != null && existing.version() == version) {
remove(tableName, key);
return true;
}
return false;
}
@Override
public boolean replace(String tableName, K key, V oldValue, V newValue) {
Versioned<V> existing = getTableMap(tableName).get(key);
if (existing != null && existing.value().equals(oldValue)) {
put(tableName, key, newValue);
return true;
}
return false;
}
@Override
public boolean replace(String tableName, K key, long oldVersion, V newValue) {
Versioned<V> existing = getTableMap(tableName).get(key);
if (existing != null && existing.version() == oldVersion) {
put(tableName, key, newValue);
return true;
}
return false;
}
@Override
public boolean batchUpdate(List<UpdateOperation<K, V>> updates) {
if (updates.stream().anyMatch(update -> !checkIfUpdateIsPossible(update))) {
return false;
} else {
updates.stream().forEach(this::doUpdate);
return true;
}
}
private void doUpdate(UpdateOperation<K, V> update) {
String tableName = update.tableName();
K key = update.key();
switch (update.type()) {
case PUT:
put(tableName, key, update.value());
return;
case REMOVE:
remove(tableName, key);
return;
case PUT_IF_ABSENT:
putIfAbsent(tableName, key, update.value());
return;
case PUT_IF_VERSION_MATCH:
replace(tableName, key, update.currentValue(), update.value());
return;
case PUT_IF_VALUE_MATCH:
replace(tableName, key, update.currentVersion(), update.value());
return;
case REMOVE_IF_VERSION_MATCH:
remove(tableName, key, update.currentVersion());
return;
case REMOVE_IF_VALUE_MATCH:
remove(tableName, key, update.currentValue());
return;
default:
throw new IllegalStateException("Unsupported type: " + update.type());
}
}
private boolean checkIfUpdateIsPossible(UpdateOperation<K, V> update) {
Versioned<V> existingEntry = get(update.tableName(), update.key());
switch (update.type()) {
case PUT:
case REMOVE:
return true;
case PUT_IF_ABSENT:
return existingEntry == null;
case PUT_IF_VERSION_MATCH:
return existingEntry != null && existingEntry.version() == update.currentVersion();
case PUT_IF_VALUE_MATCH:
return existingEntry != null && existingEntry.value().equals(update.currentValue());
case REMOVE_IF_VERSION_MATCH:
return existingEntry == null || existingEntry.version() == update.currentVersion();
case REMOVE_IF_VALUE_MATCH:
return existingEntry == null || existingEntry.value().equals(update.currentValue());
default:
throw new IllegalStateException("Unsupported type: " + update.type());
}
}
private boolean checkEquality(V value1, V value2) {
if (value1 instanceof byte[]) {
return Arrays.equals((byte[]) value1, (byte[]) value2);
}
return value1.equals(value2);
}
}
package org.onosproject.store.consistent.impl;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import net.kuujo.copycat.cluster.internal.coordinator.ClusterCoordinator;
/**
* A database that partitions the keys across one or more database partitions.
*/
public class PartitionedDatabase implements DatabaseProxy<String, byte[]>, PartitionedDatabaseManager {
private Partitioner<String> partitioner;
private final ClusterCoordinator coordinator;
private final Map<String, Database> partitions = Maps.newConcurrentMap();
protected PartitionedDatabase(ClusterCoordinator coordinator) {
this.coordinator = coordinator;
}
@Override
public void registerPartition(String name, Database partition) {
partitions.put(name, partition);
}
@Override
public Map<String, Database> getRegisteredPartitions() {
return ImmutableMap.copyOf(partitions);
}
@Override
public CompletableFuture<Integer> size(String tableName) {
AtomicInteger totalSize = new AtomicInteger(0);
return CompletableFuture.allOf(partitions
.values()
.stream()
.map(p -> p.size(tableName).thenApply(totalSize::addAndGet))
.toArray(CompletableFuture[]::new))
.thenApply(v -> totalSize.get());
}
@Override
public CompletableFuture<Boolean> isEmpty(String tableName) {
return size(tableName).thenApply(size -> size == 0);
}
@Override
public CompletableFuture<Boolean> containsKey(String tableName, String key) {
return partitioner.getPartition(tableName, key).containsKey(tableName, key);
}
@Override
public CompletableFuture<Boolean> containsValue(String tableName, byte[] value) {
AtomicBoolean containsValue = new AtomicBoolean(false);
return CompletableFuture.allOf(partitions
.values()
.stream()
.map(p -> p.containsValue(tableName, value).thenApply(v -> containsValue.compareAndSet(false, v)))
.toArray(CompletableFuture[]::new))
.thenApply(v -> containsValue.get());
}
@Override
public CompletableFuture<Versioned<byte[]>> get(String tableName, String key) {
return partitioner.getPartition(tableName, key).get(tableName, key);
}
@Override
public CompletableFuture<Versioned<byte[]>> put(String tableName, String key, byte[] value) {
return partitioner.getPartition(tableName, key).put(tableName, key, value);
}
@Override
public CompletableFuture<Versioned<byte[]>> remove(String tableName, String key) {
return partitioner.getPartition(tableName, key).remove(tableName, key);
}
@Override
public CompletableFuture<Void> clear(String tableName) {
return CompletableFuture.allOf(partitions
.values()
.stream()
.map(p -> p.clear(tableName))
.toArray(CompletableFuture[]::new));
}
@Override
public CompletableFuture<Set<String>> keySet(String tableName) {
Set<String> keySet = Sets.newConcurrentHashSet();
return CompletableFuture.allOf(partitions
.values()
.stream()
.map(p -> p.keySet(tableName).thenApply(keySet::addAll))
.toArray(CompletableFuture[]::new))
.thenApply(v -> keySet);
}
@Override
public CompletableFuture<Collection<Versioned<byte[]>>> values(String tableName) {
List<Versioned<byte[]>> values = new CopyOnWriteArrayList<>();
return CompletableFuture.allOf(partitions
.values()
.stream()
.map(p -> p.values(tableName))
.toArray(CompletableFuture[]::new))
.thenApply(v -> values);
}
@Override
public CompletableFuture<Set<Entry<String, Versioned<byte[]>>>> entrySet(String tableName) {
Set<Entry<String, Versioned<byte[]>>> entrySet = Sets.newConcurrentHashSet();
return CompletableFuture.allOf(partitions
.values()
.stream()
.map(p -> p.entrySet(tableName).thenApply(entrySet::addAll))
.toArray(CompletableFuture[]::new))
.thenApply(v -> entrySet);
}
@Override
public CompletableFuture<Versioned<byte[]>> putIfAbsent(String tableName, String key, byte[] value) {
return partitioner.getPartition(tableName, key).putIfAbsent(tableName, key, value);
}
@Override
public CompletableFuture<Boolean> remove(String tableName, String key, byte[] value) {
return partitioner.getPartition(tableName, key).remove(tableName, key, value);
}
@Override
public CompletableFuture<Boolean> remove(String tableName, String key, long version) {
return partitioner.getPartition(tableName, key).remove(tableName, key, version);
}
@Override
public CompletableFuture<Boolean> replace(String tableName, String key, byte[] oldValue, byte[] newValue) {
return partitioner.getPartition(tableName, key).replace(tableName, key, oldValue, newValue);
}
@Override
public CompletableFuture<Boolean> replace(String tableName, String key, long oldVersion, byte[] newValue) {
return partitioner.getPartition(tableName, key).replace(tableName, key, oldVersion, newValue);
}
@Override
public CompletableFuture<Boolean> atomicBatchUpdate(List<UpdateOperation<String, byte[]>> updates) {
Map<Database, List<UpdateOperation<String, byte[]>>> perPartitionUpdates = Maps.newHashMap();
for (UpdateOperation<String, byte[]> update : updates) {
Database partition = partitioner.getPartition(update.tableName(), update.key());
List<UpdateOperation<String, byte[]>> partitionUpdates = perPartitionUpdates.get(partition);
if (partitionUpdates == null) {
partitionUpdates = Lists.newArrayList();
perPartitionUpdates.put(partition, partitionUpdates);
}
partitionUpdates.add(update);
}
if (perPartitionUpdates.size() > 1) {
// TODO
throw new UnsupportedOperationException("Cross partition transactional updates are not supported.");
} else {
Entry<Database, List<UpdateOperation<String, byte[]>>> only =
perPartitionUpdates.entrySet().iterator().next();
return only.getKey().atomicBatchUpdate(only.getValue());
}
}
@Override
public void setPartitioner(Partitioner<String> partitioner) {
this.partitioner = partitioner;
}
@Override
public CompletableFuture<PartitionedDatabase> open() {
return coordinator.open().thenCompose(c -> CompletableFuture.allOf(partitions
.values()
.stream()
.map(Database::open)
.collect(Collectors.toList())
.toArray(new CompletableFuture[partitions.size()]))
.thenApply(v -> this));
}
@Override
public CompletableFuture<Void> close() {
CompletableFuture<Void> closePartitions = CompletableFuture.allOf(partitions
.values()
.stream()
.map(database -> database.close())
.collect(Collectors.toList())
.toArray(new CompletableFuture[partitions.size()]));
CompletableFuture<Void> closeCoordinator = coordinator.close();
return closePartitions.thenCompose(v -> closeCoordinator);
}
}
package org.onosproject.store.consistent.impl;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
/**
* Partitioned database configuration.
*/
public class PartitionedDatabaseConfig {
private final Map<String, DatabaseConfig> partitions = new HashMap<>();
/**
* Returns the configuration for all partitions.
* @return partition map to configuartion mapping.
*/
public Map<String, DatabaseConfig> partitions() {
return Collections.unmodifiableMap(partitions);
}
/**
* Adds the specified partition name and configuration.
* @param name partition name.
* @param config partition config
* @return this instance
*/
public PartitionedDatabaseConfig withPartition(String name, DatabaseConfig config) {
partitions.put(name, config);
return this;
}
}
package org.onosproject.store.consistent.impl;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import net.kuujo.copycat.CopycatConfig;
import net.kuujo.copycat.cluster.ClusterConfig;
import net.kuujo.copycat.cluster.internal.coordinator.ClusterCoordinator;
import net.kuujo.copycat.cluster.internal.coordinator.DefaultClusterCoordinator;
import net.kuujo.copycat.util.concurrent.NamedThreadFactory;
public interface PartitionedDatabaseManager {
/**
* Opens the database.
*
* @return A completable future to be completed with the result once complete.
*/
CompletableFuture<PartitionedDatabase> open();
/**
* Closes the database.
*
* @return A completable future to be completed with the result once complete.
*/
CompletableFuture<Void> close();
/**
* Sets the partitioner to use for mapping keys to partitions.
*
* @param partitioner partitioner
*/
void setPartitioner(Partitioner<String> partitioner);
/**
* Registers a new partition.
*
* @param partitionName partition name.
* @param partition partition.
*/
void registerPartition(String partitionName, Database partition);
/**
* Returns all the registered database partitions.
*
* @return mapping of all registered database partitions.
*/
Map<String, Database> getRegisteredPartitions();
/**
* Creates a new partitioned database.
*
* @param name The database name.
* @param clusterConfig The cluster configuration.
* @param partitionedDatabaseConfig The database configuration.
* @return The database.
*/
public static PartitionedDatabase create(
String name,
ClusterConfig clusterConfig,
PartitionedDatabaseConfig partitionedDatabaseConfig) {
CopycatConfig copycatConfig = new CopycatConfig()
.withName(name)
.withClusterConfig(clusterConfig)
.withDefaultExecutor(Executors.newSingleThreadExecutor(new NamedThreadFactory("copycat-coordinator-%d")));
ClusterCoordinator coordinator = new DefaultClusterCoordinator(copycatConfig.resolve());
PartitionedDatabase partitionedDatabase = new PartitionedDatabase(coordinator);
partitionedDatabaseConfig.partitions().forEach((partitionName, partitionConfig) ->
partitionedDatabase.registerPartition(partitionName ,
coordinator.getResource(partitionName, partitionConfig.resolve(clusterConfig)
.withDefaultSerializer(copycatConfig.getDefaultSerializer().copy())
.withDefaultExecutor(copycatConfig.getDefaultExecutor()))));
partitionedDatabase.setPartitioner(
new SimpleKeyHashPartitioner<>(partitionedDatabase.getRegisteredPartitions()));
return partitionedDatabase;
}
}
package org.onosproject.store.consistent.impl;
/**
* Partitioner is responsible for mapping keys to individual database partitions.
*
* @param <K> key type.
*/
public interface Partitioner<K> {
/**
* Returns the database partition.
* @param tableName table name
* @param key key
* @return Database partition
*/
Database getPartition(String tableName, K key);
}
package org.onosproject.store.consistent.impl;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
/**
* A simple Partitioner that uses the key hashCode to map
* key to a partition.
*
* @param <K> key type.
*/
public class SimpleKeyHashPartitioner<K> implements Partitioner<K> {
private final Map<String, Database> partitionMap;
private final List<String> sortedPartitionNames;
public SimpleKeyHashPartitioner(Map<String, Database> partitionMap) {
this.partitionMap = ImmutableMap.copyOf(partitionMap);
sortedPartitionNames = Lists.newArrayList(this.partitionMap.keySet());
Collections.sort(sortedPartitionNames);
}
@Override
public Database getPartition(String tableName, K key) {
return partitionMap.get(sortedPartitionNames.get(Math.abs(key.hashCode()) % partitionMap.size()));
}
}
package org.onosproject.store.consistent.impl;
import com.google.common.base.MoreObjects;
/**
* Database update operation.
*
* @param <K> key type.
* @param <V> value type.
*/
public class UpdateOperation<K, V> {
/**
* Type of database update operation.
*/
public static enum Type {
PUT,
PUT_IF_ABSENT,
PUT_IF_VERSION_MATCH,
PUT_IF_VALUE_MATCH,
REMOVE,
REMOVE_IF_VERSION_MATCH,
REMOVE_IF_VALUE_MATCH,
}
private Type type;
private String tableName;
private K key;
private V value;
private V currentValue;
private long currentVersion;
/**
* Returns the type of update operation.
* @return type of update.
*/
public Type type() {
return type;
}
/**
* Returns the tableName being updated.
* @return table name.
*/
public String tableName() {
return tableName;
}
/**
* Returns the item key being updated.
* @return item key
*/
public K key() {
return key;
}
/**
* Returns the new value.
* @return item's target value.
*/
public V value() {
return value;
}
/**
* Returns the expected current value in the database value for the key.
* @return current value in database.
*/
public V currentValue() {
return currentValue;
}
/**
* Returns the expected current version in the database for the key.
* @return expected version.
*/
public long currentVersion() {
return currentVersion;
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("type", type)
.add("tableName", tableName)
.add("key", key)
.add("value", value)
.add("currentValue", currentValue)
.add("currentVersion", currentVersion)
.toString();
}
/**
* UpdatOperation builder.
*
* @param <K> key type.
* @param <V> value type.
*/
public static final class Builder<K, V> {
private UpdateOperation<K, V> operation = new UpdateOperation<>();
/**
* Creates a new builder instance.
* @param <K> key type.
* @param <V> value type.
*
* @return builder.
*/
public static <K, V> Builder<K, V> builder() {
return new Builder<>();
}
private Builder() {
}
public UpdateOperation<K, V> build() {
return operation;
}
public Builder<K, V> withType(Type type) {
operation.type = type;
return this;
}
public Builder<K, V> withTableName(String tableName) {
operation.tableName = tableName;
return this;
}
public Builder<K, V> withKey(K key) {
operation.key = key;
return this;
}
public Builder<K, V> withCurrentValue(V value) {
operation.currentValue = value;
return this;
}
public Builder<K, V> withValue(V value) {
operation.value = value;
return this;
}
public Builder<K, V> withCurrentVersion(long version) {
operation.currentVersion = version;
return this;
}
}
}
package org.onosproject.store.consistent.impl;
import com.google.common.base.MoreObjects;
/**
* Versioned value.
*
* @param <V> value type.
*/
public class Versioned<V> {
private final V value;
private final long version;
/**
* Constructs a new versioned value.
* @param value value
* @param version version
*/
public Versioned(V value, long version) {
this.value = value;
this.version = version;
}
/**
* Returns the value.
*
* @return value.
*/
public V value() {
return value;
}
/**
* Returns the version.
*
* @return version
*/
public long version() {
return version;
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("value", value)
.add("version", version)
.toString();
}
}
/*
* Copyright 2014 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.service.impl;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.Vector;
import net.kuujo.copycat.cluster.TcpClusterConfig;
import net.kuujo.copycat.cluster.TcpMember;
import net.kuujo.copycat.event.LeaderElectEvent;
import net.kuujo.copycat.internal.log.ConfigurationEntry;
import net.kuujo.copycat.internal.log.CopycatEntry;
import net.kuujo.copycat.internal.log.OperationEntry;
import net.kuujo.copycat.internal.log.SnapshotEntry;
import net.kuujo.copycat.protocol.PingRequest;
import net.kuujo.copycat.protocol.PingResponse;
import net.kuujo.copycat.protocol.PollRequest;
import net.kuujo.copycat.protocol.PollResponse;
import net.kuujo.copycat.protocol.Response.Status;
import net.kuujo.copycat.protocol.SubmitRequest;
import net.kuujo.copycat.protocol.SubmitResponse;
import net.kuujo.copycat.protocol.SyncRequest;
import net.kuujo.copycat.protocol.SyncResponse;
import net.kuujo.copycat.spi.protocol.Protocol;
import net.kuujo.copycat.spi.protocol.ProtocolClient;
import net.kuujo.copycat.spi.protocol.ProtocolServer;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onosproject.cluster.ClusterService;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
import org.onosproject.store.serializers.StoreSerializer;
import org.onosproject.store.service.impl.DatabaseStateMachine.State;
import org.onosproject.store.service.impl.DatabaseStateMachine.TableMetadata;
import org.onlab.util.KryoNamespace;
import org.slf4j.Logger;
/**
* ONOS Cluster messaging based Copycat protocol.
*/
@Component(immediate = false)
@Service
public class ClusterMessagingProtocol
implements DatabaseProtocolService, Protocol<TcpMember> {
private final Logger log = getLogger(getClass());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterCommunicationService clusterCommunicator;
public static final MessageSubject COPYCAT_PING =
new MessageSubject("copycat-raft-consensus-ping");
public static final MessageSubject COPYCAT_SYNC =
new MessageSubject("copycat-raft-consensus-sync");
public static final MessageSubject COPYCAT_POLL =
new MessageSubject("copycat-raft-consensus-poll");
public static final MessageSubject COPYCAT_SUBMIT =
new MessageSubject("copycat-raft-consensus-submit");
static final int AFTER_COPYCAT = KryoNamespaces.BEGIN_USER_CUSTOM_ID + 50;
static final KryoNamespace COPYCAT = KryoNamespace.newBuilder()
.register(KryoNamespaces.API)
.nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
.register(PingRequest.class)
.register(PingResponse.class)
.register(PollRequest.class)
.register(PollResponse.class)
.register(SyncRequest.class)
.register(SyncResponse.class)
.register(SubmitRequest.class)
.register(SubmitResponse.class)
.register(Status.class)
.register(ConfigurationEntry.class)
.register(SnapshotEntry.class)
.register(CopycatEntry.class)
.register(OperationEntry.class)
.register(TcpClusterConfig.class)
.register(TcpMember.class)
.register(LeaderElectEvent.class)
.register(Vector.class)
.build();
// serializer used for CopyCat Protocol
public static final StoreSerializer DB_SERIALIZER = new KryoSerializer() {
@Override
protected void setupKryoPool() {
serializerPool = KryoNamespace.newBuilder()
.register(COPYCAT)
.nextId(AFTER_COPYCAT)
// for snapshot
.register(State.class)
.register(TableMetadata.class)
// TODO: Move this out to API?
.register(TableModificationEvent.class)
.register(TableModificationEvent.Type.class)
.build();
}
};
@Activate
public void activate() {
log.info("Started");
}
@Deactivate
public void deactivate() {
log.info("Stopped");
}
@Override
public ProtocolServer createServer(TcpMember member) {
return new ClusterMessagingProtocolServer(clusterCommunicator);
}
@Override
public ProtocolClient createClient(TcpMember member) {
return new ClusterMessagingProtocolClient(clusterService,
clusterCommunicator,
clusterService.getLocalNode(),
member);
}
}
/*
* Copyright 2014 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.service.impl;
import static com.google.common.base.Verify.verifyNotNull;
import static org.onosproject.store.service.impl.ClusterMessagingProtocol.DB_SERIALIZER;
import static org.onlab.util.Tools.namedThreads;
import static org.slf4j.LoggerFactory.getLogger;
import static java.util.concurrent.Executors.newCachedThreadPool;
import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import net.kuujo.copycat.cluster.TcpMember;
import net.kuujo.copycat.protocol.PingRequest;
import net.kuujo.copycat.protocol.PingResponse;
import net.kuujo.copycat.protocol.PollRequest;
import net.kuujo.copycat.protocol.PollResponse;
import net.kuujo.copycat.protocol.SubmitRequest;
import net.kuujo.copycat.protocol.SubmitResponse;
import net.kuujo.copycat.protocol.SyncRequest;
import net.kuujo.copycat.protocol.SyncResponse;
import net.kuujo.copycat.spi.protocol.ProtocolClient;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.slf4j.Logger;
/**
* ONOS Cluster messaging based Copycat protocol client.
*/
public class ClusterMessagingProtocolClient implements ProtocolClient {
private final Logger log = getLogger(getClass());
public static final Duration RETRY_INTERVAL = Duration.ofMillis(2000);
private final ClusterService clusterService;
private final ClusterCommunicationService clusterCommunicator;
private final ControllerNode localNode;
private final TcpMember remoteMember;
private ControllerNode remoteNode;
private final AtomicBoolean connectionOK = new AtomicBoolean(true);
private ExecutorService pool;
public ClusterMessagingProtocolClient(
ClusterService clusterService,
ClusterCommunicationService clusterCommunicator,
ControllerNode localNode,
TcpMember remoteMember) {
this.clusterService = clusterService;
this.clusterCommunicator = clusterCommunicator;
this.localNode = localNode;
this.remoteMember = remoteMember;
}
@Override
public CompletableFuture<PingResponse> ping(PingRequest request) {
return requestReply(request);
}
@Override
public CompletableFuture<SyncResponse> sync(SyncRequest request) {
return requestReply(request);
}
@Override
public CompletableFuture<PollResponse> poll(PollRequest request) {
return requestReply(request);
}
@Override
public CompletableFuture<SubmitResponse> submit(SubmitRequest request) {
return requestReply(request);
}
@Override
public synchronized CompletableFuture<Void> connect() {
if (pool == null || pool.isShutdown()) {
// TODO include remote name?
pool = newCachedThreadPool(namedThreads("onos-copycat-netty-messaging-client-%d"));
}
return CompletableFuture.completedFuture(null);
}
@Override
public synchronized CompletableFuture<Void> close() {
if (pool != null) {
pool.shutdownNow();
pool = null;
}
return CompletableFuture.completedFuture(null);
}
private <I> MessageSubject messageType(I input) {
Class<?> clazz = input.getClass();
if (clazz.equals(PollRequest.class)) {
return ClusterMessagingProtocol.COPYCAT_POLL;
} else if (clazz.equals(SyncRequest.class)) {
return ClusterMessagingProtocol.COPYCAT_SYNC;
} else if (clazz.equals(SubmitRequest.class)) {
return ClusterMessagingProtocol.COPYCAT_SUBMIT;
} else if (clazz.equals(PingRequest.class)) {
return ClusterMessagingProtocol.COPYCAT_PING;
} else {
throw new IllegalArgumentException("Unknown class " + clazz.getName());
}
}
private <I, O> CompletableFuture<O> requestReply(I request) {
CompletableFuture<O> future = new CompletableFuture<>();
if (pool == null) {
log.info("Attempted to use closed client, connecting now. {}", request);
connect();
}
pool.submit(new RPCTask<I, O>(request, future));
return future;
}
private ControllerNode getControllerNode(TcpMember remoteMember) {
final String host = remoteMember.host();
final int port = remoteMember.port();
for (ControllerNode node : clusterService.getNodes()) {
if (node.ip().toString().equals(host) && node.tcpPort() == port) {
return node;
}
}
return null;
}
private class RPCTask<I, O> implements Runnable {
private final I request;
private final ClusterMessage message;
private final CompletableFuture<O> future;
public RPCTask(I request, CompletableFuture<O> future) {
this.request = request;
this.message =
new ClusterMessage(
localNode.id(),
messageType(request),
verifyNotNull(DB_SERIALIZER.encode(request)));
this.future = future;
}
@Override
public void run() {
try {
if (remoteNode == null) {
remoteNode = getControllerNode(remoteMember);
if (remoteNode == null) {
throw new IOException("Remote node is offline!");
}
}
byte[] response = clusterCommunicator
.sendAndReceive(message, remoteNode.id())
.get(RETRY_INTERVAL.toMillis(), TimeUnit.MILLISECONDS);
if (!connectionOK.getAndSet(true)) {
log.info("Connectivity to {} restored", remoteNode);
}
future.complete(verifyNotNull(DB_SERIALIZER.decode(response)));
} catch (IOException | TimeoutException e) {
if (connectionOK.getAndSet(false)) {
log.warn("Detected connectivity issues with {}. Reason: {}", remoteNode, e.getMessage());
}
log.debug("RPCTask for {} failed.", request, e);
future.completeExceptionally(e);
} catch (ExecutionException e) {
log.warn("RPCTask execution for {} failed: {}", request, e.getMessage());
log.debug("RPCTask execution for {} failed.", request, e);
future.completeExceptionally(e);
} catch (InterruptedException e) {
log.warn("RPCTask for {} was interrupted: {}", request, e.getMessage());
log.debug("RPCTask for {} was interrupted.", request, e);
future.completeExceptionally(e);
Thread.currentThread().interrupt();
} catch (Exception e) {
log.warn("RPCTask for {} terribly failed.", request, e);
future.completeExceptionally(e);
}
}
}
}
/*
* Copyright 2014 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.service.impl;
import static java.util.concurrent.Executors.newCachedThreadPool;
import static org.onlab.util.Tools.namedThreads;
import static org.slf4j.LoggerFactory.getLogger;
import static org.onosproject.store.service.impl.ClusterMessagingProtocol.*;
import static org.onosproject.store.service.impl.ClusterMessagingProtocol.DB_SERIALIZER;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.BiConsumer;
import net.kuujo.copycat.protocol.PingRequest;
import net.kuujo.copycat.protocol.PollRequest;
import net.kuujo.copycat.protocol.RequestHandler;
import net.kuujo.copycat.protocol.SubmitRequest;
import net.kuujo.copycat.protocol.SyncRequest;
import net.kuujo.copycat.spi.protocol.ProtocolServer;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.slf4j.Logger;
/**
* ONOS Cluster messaging based Copycat protocol server.
*/
public class ClusterMessagingProtocolServer implements ProtocolServer {
private final Logger log = getLogger(getClass());
private final ClusterCommunicationService clusterCommunicator;
private volatile RequestHandler handler;
private ExecutorService pool;
public ClusterMessagingProtocolServer(ClusterCommunicationService clusterCommunicator) {
this.clusterCommunicator = clusterCommunicator;
}
@Override
public void requestHandler(RequestHandler handler) {
this.handler = handler;
}
@Override
public CompletableFuture<Void> listen() {
if (pool == null || pool.isShutdown()) {
pool = newCachedThreadPool(namedThreads("onos-copycat-netty-messaging-server-%d"));
}
clusterCommunicator.addSubscriber(COPYCAT_PING, new PingHandler());
clusterCommunicator.addSubscriber(COPYCAT_SYNC, new SyncHandler());
clusterCommunicator.addSubscriber(COPYCAT_POLL, new PollHandler());
clusterCommunicator.addSubscriber(COPYCAT_SUBMIT, new SubmitHandler());
return CompletableFuture.completedFuture(null);
}
@Override
public CompletableFuture<Void> close() {
clusterCommunicator.removeSubscriber(COPYCAT_PING);
clusterCommunicator.removeSubscriber(COPYCAT_SYNC);
clusterCommunicator.removeSubscriber(COPYCAT_POLL);
clusterCommunicator.removeSubscriber(COPYCAT_SUBMIT);
if (pool != null) {
pool.shutdownNow();
pool = null;
}
return CompletableFuture.completedFuture(null);
}
private final class PingHandler extends CopycatMessageHandler<PingRequest> {
@Override
public void raftHandle(PingRequest request, ClusterMessage message) {
pool.submit(new Runnable() {
@Override
public void run() {
currentHandler().ping(request)
.whenComplete(new PostExecutionTask<>(message));
}
});
}
}
private final class SyncHandler extends CopycatMessageHandler<SyncRequest> {
@Override
public void raftHandle(SyncRequest request, ClusterMessage message) {
pool.submit(new Runnable() {
@Override
public void run() {
currentHandler().sync(request)
.whenComplete(new PostExecutionTask<>(message));
}
});
}
}
private final class PollHandler extends CopycatMessageHandler<PollRequest> {
@Override
public void raftHandle(PollRequest request, ClusterMessage message) {
pool.submit(new Runnable() {
@Override
public void run() {
currentHandler().poll(request)
.whenComplete(new PostExecutionTask<>(message));
}
});
}
}
private final class SubmitHandler extends CopycatMessageHandler<SubmitRequest> {
@Override
public void raftHandle(SubmitRequest request, ClusterMessage message) {
pool.submit(new Runnable() {
@Override
public void run() {
currentHandler().submit(request)
.whenComplete(new PostExecutionTask<>(message));
}
});
}
}
private abstract class CopycatMessageHandler<T> implements ClusterMessageHandler {
public abstract void raftHandle(T request, ClusterMessage message);
@Override
public void handle(ClusterMessage message) {
T request = DB_SERIALIZER.decode(message.payload());
raftHandle(request, message);
}
RequestHandler currentHandler() {
RequestHandler currentHandler = handler;
if (currentHandler == null) {
// there is a slight window of time during state transition,
// where handler becomes null
long sleepMs = 1;
for (int i = 0; i < 10; ++i) {
currentHandler = handler;
if (currentHandler != null) {
break;
}
try {
sleepMs <<= 1;
Thread.sleep(sleepMs);
} catch (InterruptedException e) {
log.error("Interrupted", e);
return handler;
}
}
if (currentHandler == null) {
log.error("There was no handler registered!");
return handler;
}
}
return currentHandler;
}
final class PostExecutionTask<R> implements BiConsumer<R, Throwable> {
private final ClusterMessage message;
public PostExecutionTask(ClusterMessage message) {
this.message = message;
}
@Override
public void accept(R response, Throwable error) {
if (error != null) {
log.error("Processing {} failed.", message.subject(), error);
} else {
try {
log.trace("responding to {}", message.subject());
message.respond(DB_SERIALIZER.encode(response));
} catch (Exception e) {
log.error("Failed responding with {}", response.getClass().getName(), e);
}
}
}
}
}
}
/*
* Copyright 2014 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.service.impl;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import net.kuujo.copycat.cluster.Member;
import net.kuujo.copycat.cluster.TcpMember;
import net.kuujo.copycat.event.LeaderElectEvent;
import net.kuujo.copycat.protocol.Response.Status;
import net.kuujo.copycat.protocol.SubmitRequest;
import net.kuujo.copycat.protocol.SubmitResponse;
import net.kuujo.copycat.spi.protocol.ProtocolClient;
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.service.BatchReadRequest;
import org.onosproject.store.service.BatchWriteRequest;
import org.onosproject.store.service.DatabaseException;
import org.onosproject.store.service.ReadResult;
import org.onosproject.store.service.VersionedValue;
import org.onosproject.store.service.WriteResult;
import org.slf4j.Logger;
/**
* Client for interacting with the Copycat Raft cluster.
*/
public class DatabaseClient implements ClusterMessageHandler {
private static final int RETRIES = 5;
private static final int TIMEOUT_MS = 2000;
private final Logger log = getLogger(getClass());
private final DatabaseProtocolService protocol;
private volatile ProtocolClient client = null;
private volatile Member currentLeader = null;
private volatile long currentLeaderTerm = 0;
public DatabaseClient(DatabaseProtocolService protocol) {
this.protocol = checkNotNull(protocol);
}
@Override
public void handle(ClusterMessage message) {
LeaderElectEvent event =
ClusterMessagingProtocol.DB_SERIALIZER.decode(message.payload());
TcpMember newLeader = event.leader();
long newLeaderTerm = event.term();
if (newLeader != null && !newLeader.equals(currentLeader) && newLeaderTerm > currentLeaderTerm) {
log.info("New leader detected. Leader: {}, term: {}", newLeader, newLeaderTerm);
ProtocolClient prevClient = client;
ProtocolClient newClient = protocol.createClient(newLeader);
newClient.connect();
client = newClient;
currentLeader = newLeader;
currentLeaderTerm = newLeaderTerm;
if (prevClient != null) {
prevClient.close();
}
}
}
private String nextRequestId() {
return UUID.randomUUID().toString();
}
public void waitForLeader() {
if (currentLeader != null) {
return;
}
log.info("No leader in cluster, waiting for election.");
try {
while (currentLeader == null) {
Thread.sleep(200);
}
return;
} catch (InterruptedException e) {
log.error("Interrupted while waiting for Leader", e);
Thread.currentThread().interrupt();
}
}
private <T> T submit(String operationName, Object... args) {
waitForLeader();
if (currentLeader == null) {
throw new DatabaseException("Raft cluster does not have a leader.");
}
SubmitRequest request =
new SubmitRequest(nextRequestId(), operationName, Arrays.asList(args));
CompletableFuture<SubmitResponse> submitResponse = client.submit(request);
log.debug("Sent {} to {}", request, currentLeader);
try {
final SubmitResponse response = submitResponse.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
if (response.status() != Status.OK) {
throw new DatabaseException(response.error());
}
return (T) response.result();
} catch (ExecutionException | InterruptedException e) {
throw new DatabaseException(e);
} catch (TimeoutException e) {
throw new DatabaseException.Timeout(e);
}
}
public boolean createTable(String tableName) {
return submit("createTable", tableName);
}
public boolean createTable(String tableName, int ttlMillis) {
return submit("createTable", tableName, ttlMillis);
}
public void dropTable(String tableName) {
submit("dropTable", tableName);
}
public void dropAllTables() {
submit("dropAllTables");
}
public Set<String> listTables() {
return submit("listTables");
}
public List<ReadResult> batchRead(BatchReadRequest batchRequest) {
return submit("read", batchRequest);
}
public List<WriteResult> batchWrite(BatchWriteRequest batchRequest) {
return submit("write", batchRequest);
}
public Map<String, VersionedValue> getAll(String tableName) {
return submit("getAll", tableName);
}
Member getCurrentLeader() {
return currentLeader;
}
}
/*
* Copyright 2014 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.service.impl;
import com.google.common.base.MoreObjects;
import net.jodah.expiringmap.ExpiringMap;
import net.jodah.expiringmap.ExpiringMap.ExpirationListener;
import net.jodah.expiringmap.ExpiringMap.ExpirationPolicy;
import net.kuujo.copycat.cluster.Member;
import net.kuujo.copycat.event.EventHandler;
import net.kuujo.copycat.event.LeaderElectEvent;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.service.DatabaseService;
import org.onosproject.store.service.VersionedValue;
import org.onosproject.store.service.impl.DatabaseStateMachine.State;
import org.onosproject.store.service.impl.DatabaseStateMachine.TableMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.onlab.util.Tools.namedThreads;
/**
* Plugs into the database update stream and track the TTL of entries added to
* the database. For tables with pre-configured finite TTL, this class has
* mechanisms for expiring (deleting) old, expired entries from the database.
*/
public class DatabaseEntryExpirationTracker implements
DatabaseUpdateEventListener, EventHandler<LeaderElectEvent> {
private static final ExecutorService THREAD_POOL =
Executors.newCachedThreadPool(namedThreads("onos-db-stale-entry-expirer-%d"));
private final Logger log = LoggerFactory.getLogger(getClass());
private final DatabaseService databaseService;
private final ClusterCommunicationService clusterCommunicator;
private final Member localMember;
private final ControllerNode localNode;
private final AtomicBoolean isLocalMemberLeader = new AtomicBoolean(false);
private final Map<String, Map<DatabaseRow, Long>> tableEntryExpirationMap = new HashMap<>();
private final ExpirationListener<DatabaseRow, Long> expirationObserver = new ExpirationObserver();
DatabaseEntryExpirationTracker(
Member localMember,
ControllerNode localNode,
ClusterCommunicationService clusterCommunicator,
DatabaseService databaseService) {
this.localMember = localMember;
this.localNode = localNode;
this.clusterCommunicator = clusterCommunicator;
this.databaseService = databaseService;
}
@Override
public void tableModified(TableModificationEvent event) {
log.debug("{}: Received {}", localNode.id(), event);
if (!tableEntryExpirationMap.containsKey(event.tableName())) {
return;
}
Map<DatabaseRow, Long> map = tableEntryExpirationMap.get(event.tableName());
DatabaseRow row = new DatabaseRow(event.tableName(), event.key());
Long eventVersion = event.value().version();
switch (event.type()) {
case ROW_DELETED:
map.remove(row, eventVersion);
if (isLocalMemberLeader.get()) {
log.debug("Broadcasting {} to the entire cluster", event);
clusterCommunicator.broadcastIncludeSelf(new ClusterMessage(
localNode.id(), DatabaseStateMachine.DATABASE_UPDATE_EVENTS,
ClusterMessagingProtocol.DB_SERIALIZER.encode(event)));
}
break;
case ROW_ADDED:
case ROW_UPDATED:
// To account for potential reordering of notifications,
// check to make sure we are replacing an old version with a new version
Long currentVersion = map.get(row);
if (currentVersion == null || currentVersion < eventVersion) {
map.put(row, eventVersion);
}
break;
default:
break;
}
}
@Override
public void tableCreated(TableMetadata metadata) {
log.debug("Received a table created event {}", metadata);
if (metadata.expireOldEntries()) {
tableEntryExpirationMap.put(metadata.tableName(), ExpiringMap.builder()
.expiration(metadata.ttlMillis(), TimeUnit.MILLISECONDS)
.expirationListener(expirationObserver)
.expirationPolicy(ExpirationPolicy.CREATED).build());
}
}
@Override
public void tableDeleted(String tableName) {
log.debug("Received a table deleted event for table ({})", tableName);
tableEntryExpirationMap.remove(tableName);
}
private class ExpirationObserver implements
ExpirationListener<DatabaseRow, Long> {
@Override
public void expired(DatabaseRow row, Long version) {
THREAD_POOL.submit(new ExpirationTask(row, version));
}
}
private class ExpirationTask implements Runnable {
private final DatabaseRow row;
private final Long version;
public ExpirationTask(DatabaseRow row, Long version) {
this.row = row;
this.version = version;
}
@Override
public void run() {
log.trace("Received an expiration event for {}, version: {}", row, version);
Map<DatabaseRow, Long> map = tableEntryExpirationMap.get(row.tableName);
try {
if (isLocalMemberLeader.get()) {
if (!databaseService.removeIfVersionMatches(row.tableName,
row.key, version)) {
log.info("Entry in database was updated right before its expiration.");
} else {
log.debug("Successfully expired old entry with key ({}) from table ({})",
row.key, row.tableName);
}
} else {
// Only the current leader will expire keys from database.
// Everyone else function as standby just in case they need to take over
if (map != null) {
map.putIfAbsent(row, version);
}
}
} catch (Exception e) {
log.warn("Failed to delete entry from the database after ttl "
+ "expiration. Operation will be retried.", e);
map.putIfAbsent(row, version);
}
}
}
@Override
public void handle(LeaderElectEvent event) {
isLocalMemberLeader.set(localMember.equals(event.leader()));
if (isLocalMemberLeader.get()) {
log.info("{} is now the leader of Raft cluster", localNode.id());
}
}
/**
* Wrapper class for a database row identifier.
*/
private class DatabaseRow {
String tableName;
String key;
public DatabaseRow(String tableName, String key) {
this.tableName = tableName;
this.key = key;
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("tableName", tableName)
.add("key", key)
.toString();
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (!(obj instanceof DatabaseRow)) {
return false;
}
DatabaseRow that = (DatabaseRow) obj;
return Objects.equals(this.tableName, that.tableName)
&& Objects.equals(this.key, that.key);
}
@Override
public int hashCode() {
return Objects.hash(tableName, key);
}
}
@Override
public void snapshotInstalled(State state) {
if (!tableEntryExpirationMap.isEmpty()) {
return;
}
log.debug("Received a snapshot installed notification");
for (String tableName : state.getTableNames()) {
TableMetadata metadata = state.getTableMetadata(tableName);
if (!metadata.expireOldEntries()) {
continue;
}
Map<DatabaseRow, Long> tableExpirationMap = ExpiringMap.builder()
.expiration(metadata.ttlMillis(), TimeUnit.MILLISECONDS)
.expirationListener(expirationObserver)
.expirationPolicy(ExpirationPolicy.CREATED).build();
for (Map.Entry<String, VersionedValue> entry : state.getTable(tableName).entrySet()) {
tableExpirationMap.put(new DatabaseRow(tableName, entry.getKey()), entry.getValue().version());
}
tableEntryExpirationMap.put(tableName, tableExpirationMap);
}
}
}
/*
* Copyright 2014 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.service.impl;
import net.kuujo.copycat.cluster.TcpMember;
import net.kuujo.copycat.spi.protocol.Protocol;
// interface required for connecting DatabaseManager + ClusterMessagingProtocol
// TODO: Consider changing ClusterMessagingProtocol to non-Service class
public interface DatabaseProtocolService extends Protocol<TcpMember> {
}
/*
* Copyright 2014 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.service.impl;
import org.onosproject.store.service.impl.DatabaseStateMachine.TableMetadata;
/**
* Interface of database update event listeners.
*/
public interface DatabaseUpdateEventListener {
/**
* Notifies listeners of a table modified event.
* @param event table modification event.
*/
public void tableModified(TableModificationEvent event);
/**
* Notifies listeners of a table created event.
* @param metadata metadata for the created table.
*/
public void tableCreated(TableMetadata metadata);
/**
* Notifies listeners of a table deleted event.
* @param tableName name of the table deleted
*/
public void tableDeleted(String tableName);
/**
* Notifies listeners of a snapshot installation event.
* @param snapshotState installed snapshot state.
*/
public void snapshotInstalled(DatabaseStateMachine.State snapshotState);
}
/*
* Copyright 2014 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.service.impl;
import static com.google.common.base.Verify.verify;
import static org.slf4j.LoggerFactory.getLogger;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.joda.time.DateTime;
import org.onosproject.cluster.ClusterService;
import org.onosproject.store.service.DatabaseException;
import org.onosproject.store.service.DatabaseService;
import org.onosproject.store.service.Lock;
import org.onosproject.store.service.VersionedValue;
import org.slf4j.Logger;
/**
* A distributed lock implementation.
*/
public class DistributedLock implements Lock {
private final Logger log = getLogger(getClass());
private final DistributedLockManager lockManager;
private final DatabaseService databaseService;
private final String path;
private DateTime lockExpirationTime;
private AtomicBoolean isLocked = new AtomicBoolean(false);
private volatile long epoch = 0;
private byte[] lockId;
public DistributedLock(
String path,
DatabaseService databaseService,
ClusterService clusterService,
DistributedLockManager lockManager) {
this.path = path;
this.databaseService = databaseService;
this.lockManager = lockManager;
this.lockId =
(UUID.randomUUID().toString() + "::" +
clusterService.getLocalNode().id().toString()).
getBytes(StandardCharsets.UTF_8);
}
@Override
public String path() {
return path;
}
@Override
public void lock(int leaseDurationMillis) throws InterruptedException {
try {
lockAsync(leaseDurationMillis).get();
} catch (ExecutionException e) {
throw new DatabaseException(e);
}
}
@Override
public CompletableFuture<Void> lockAsync(int leaseDurationMillis) {
try {
if (isLocked() || tryLock(leaseDurationMillis)) {
return CompletableFuture.<Void>completedFuture(null);
}
return lockManager.lockIfAvailable(this, leaseDurationMillis);
} catch (DatabaseException e) {
CompletableFuture<Void> lockFuture = new CompletableFuture<>();
lockFuture.completeExceptionally(e);
return lockFuture;
}
}
@Override
public boolean tryLock(int leaseDurationMillis) {
if (databaseService.putIfAbsent(
DistributedLockManager.ONOS_LOCK_TABLE_NAME,
path,
lockId)) {
VersionedValue vv =
databaseService.get(DistributedLockManager.ONOS_LOCK_TABLE_NAME, path);
verify(Arrays.equals(vv.value(), lockId));
epoch = vv.version();
isLocked.set(true);
lockExpirationTime = DateTime.now().plusMillis(leaseDurationMillis);
return true;
}
return false;
}
@Override
public boolean tryLock(
int waitTimeMillis,
int leaseDurationMillis) throws InterruptedException {
if (isLocked() || tryLock(leaseDurationMillis)) {
return true;
}
CompletableFuture<Void> future =
lockManager.lockIfAvailable(this, waitTimeMillis, leaseDurationMillis);
try {
future.get(waitTimeMillis, TimeUnit.MILLISECONDS);
return true;
} catch (ExecutionException e) {
throw new DatabaseException(e);
} catch (TimeoutException e) {
log.debug("Timed out waiting to acquire lock for {}", path);
return false;
}
}
@Override
public boolean isLocked() {
if (isLocked.get()) {
// We rely on local information to check
// if the lock expired.
// This should should make this call
// light weight, while still retaining the
// safety guarantees.
if (DateTime.now().isAfter(lockExpirationTime)) {
isLocked.set(false);
return false;
} else {
return true;
}
}
return false;
}
@Override
public long epoch() {
return epoch;
}
@Override
public void unlock() {
if (!isLocked()) {
return;
} else {
if (databaseService.removeIfValueMatches(DistributedLockManager.ONOS_LOCK_TABLE_NAME, path, lockId)) {
isLocked.set(false);
}
}
}
@Override
public boolean extendExpiration(int leaseDurationMillis) {
if (!isLocked()) {
log.warn("Ignoring request to extend expiration for lock {}."
+ " ExtendExpiration must be called for locks that are already acquired.", path);
return false;
}
if (databaseService.putIfValueMatches(
DistributedLockManager.ONOS_LOCK_TABLE_NAME,
path,
lockId,
lockId)) {
lockExpirationTime = DateTime.now().plusMillis(leaseDurationMillis);
log.debug("Succeeded in extending lock {} expiration time to {}", lockExpirationTime);
return true;
} else {
log.info("Failed to extend expiration for {}", path);
return false;
}
}
}
/*
* Copyright 2014 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.service.impl;
import static org.onlab.util.Tools.namedThreads;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.joda.time.DateTime;
import org.onosproject.cluster.ClusterService;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.service.DatabaseAdminService;
import org.onosproject.store.service.DatabaseException;
import org.onosproject.store.service.DatabaseService;
import org.onosproject.store.service.Lock;
import org.onosproject.store.service.LockEventListener;
import org.onosproject.store.service.LockService;
import org.slf4j.Logger;
import com.google.common.collect.LinkedListMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Multimaps;
@Component(immediate = false)
@Service
public class DistributedLockManager implements LockService {
private static final ExecutorService THREAD_POOL =
Executors.newCachedThreadPool(namedThreads("onos-lock-manager-%d"));
private final Logger log = getLogger(getClass());
public static final String ONOS_LOCK_TABLE_NAME = "onos-locks";
public static final int DEAD_LOCK_TIMEOUT_MS = 5000;
private final ListMultimap<String, LockRequest> locksToAcquire =
Multimaps.synchronizedListMultimap(LinkedListMultimap.<String, LockRequest>create());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private ClusterCommunicationService clusterCommunicator;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private DatabaseAdminService databaseAdminService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private DatabaseService databaseService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private ClusterService clusterService;
@Activate
public void activate() {
try {
Set<String> tables = databaseAdminService.listTables();
if (!tables.contains(ONOS_LOCK_TABLE_NAME)) {
if (databaseAdminService.createTable(ONOS_LOCK_TABLE_NAME, DEAD_LOCK_TIMEOUT_MS)) {
log.info("Created {} table.", ONOS_LOCK_TABLE_NAME);
}
}
} catch (DatabaseException e) {
log.error("DistributedLockManager#activate failed.", e);
}
clusterCommunicator.addSubscriber(
DatabaseStateMachine.DATABASE_UPDATE_EVENTS,
new LockEventMessageListener());
log.info("Started");
}
@Deactivate
public void deactivate() {
clusterCommunicator.removeSubscriber(DatabaseStateMachine.DATABASE_UPDATE_EVENTS);
locksToAcquire.clear();
log.info("Stopped.");
}
@Override
public Lock create(String path) {
return new DistributedLock(path, databaseService, clusterService, this);
}
@Override
public void addListener(LockEventListener listener) {
throw new UnsupportedOperationException();
}
@Override
public void removeListener(LockEventListener listener) {
throw new UnsupportedOperationException();
}
/**
* Attempts to acquire the lock as soon as it becomes available.
* @param lock lock to acquire.
* @param waitTimeMillis maximum time to wait before giving up.
* @param leaseDurationMillis the duration for which to acquire the lock initially.
* @return Future that can be blocked on until lock becomes available.
*/
protected CompletableFuture<Void> lockIfAvailable(
Lock lock,
int waitTimeMillis,
int leaseDurationMillis) {
CompletableFuture<Void> future = new CompletableFuture<>();
LockRequest request = new LockRequest(
lock,
leaseDurationMillis,
DateTime.now().plusMillis(waitTimeMillis),
future);
locksToAcquire.put(lock.path(), request);
return future;
}
/**
* Attempts to acquire the lock as soon as it becomes available.
* @param lock lock to acquire.
* @param leaseDurationMillis the duration for which to acquire the lock initially.
* @return Future lease expiration date.
*/
protected CompletableFuture<Void> lockIfAvailable(
Lock lock,
int leaseDurationMillis) {
CompletableFuture<Void> future = new CompletableFuture<>();
LockRequest request = new LockRequest(
lock,
leaseDurationMillis,
DateTime.now().plusYears(100),
future);
locksToAcquire.put(lock.path(), request);
return future;
}
private class LockEventMessageListener implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
TableModificationEvent event = ClusterMessagingProtocol.DB_SERIALIZER
.decode(message.payload());
if (event.tableName().equals(ONOS_LOCK_TABLE_NAME) &&
event.type().equals(TableModificationEvent.Type.ROW_DELETED)) {
THREAD_POOL.submit(new RetryLockTask(event.key()));
}
}
}
private class RetryLockTask implements Runnable {
private final String path;
public RetryLockTask(String path) {
this.path = path;
}
@Override
public void run() {
if (!locksToAcquire.containsKey(path)) {
return;
}
List<LockRequest> existingRequests = locksToAcquire.get(path);
if (existingRequests == null || existingRequests.isEmpty()) {
return;
}
log.info("Path {} is now available for locking. There are {} outstanding "
+ "requests for it.",
path, existingRequests.size());
synchronized (existingRequests) {
Iterator<LockRequest> existingRequestIterator = existingRequests.iterator();
while (existingRequestIterator.hasNext()) {
LockRequest request = existingRequestIterator.next();
if (DateTime.now().isAfter(request.requestExpirationTime())) {
// request expired.
existingRequestIterator.remove();
} else {
if (request.lock().tryLock(request.leaseDurationMillis())) {
request.future().complete(null);
existingRequestIterator.remove();
}
}
}
}
}
}
private class LockRequest {
private final Lock lock;
private final DateTime requestExpirationTime;
private final int leaseDurationMillis;
private final CompletableFuture<Void> future;
public LockRequest(
Lock lock,
int leaseDurationMillis,
DateTime requestExpirationTime,
CompletableFuture<Void> future) {
this.lock = lock;
this.requestExpirationTime = requestExpirationTime;
this.leaseDurationMillis = leaseDurationMillis;
this.future = future;
}
public Lock lock() {
return lock;
}
public DateTime requestExpirationTime() {
return requestExpirationTime;
}
public int leaseDurationMillis() {
return leaseDurationMillis;
}
public CompletableFuture<Void> future() {
return future;
}
}
}
/*
* Copyright 2014 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.service.impl;
import org.onosproject.store.service.DatabaseException;
/**
* Exception that indicates a problem with the state machine snapshotting.
*/
@SuppressWarnings("serial")
public class SnapshotException extends DatabaseException {
public SnapshotException(Throwable t) {
super(t);
}
}
/*
* Copyright 2014 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.service.impl;
import org.onosproject.store.service.VersionedValue;
import com.google.common.base.MoreObjects;
/**
* A table modification event.
*/
public final class TableModificationEvent {
/**
* Type of table modification event.
*/
public enum Type {
ROW_ADDED,
ROW_DELETED,
ROW_UPDATED
}
private final String tableName;
private final String key;
private final VersionedValue value;
private final Type type;
/**
* Creates a new row deleted table modification event.
* @param tableName table name.
* @param key row key
* @param value value associated with the key when it was deleted.
* @return table modification event.
*/
public static TableModificationEvent rowDeleted(String tableName, String key, VersionedValue value) {
return new TableModificationEvent(tableName, key, value, Type.ROW_DELETED);
}
/**
* Creates a new row added table modification event.
* @param tableName table name.
* @param key row key
* @param value value associated with the key
* @return table modification event.
*/
public static TableModificationEvent rowAdded(String tableName, String key, VersionedValue value) {
return new TableModificationEvent(tableName, key, value, Type.ROW_ADDED);
}
/**
* Creates a new row updated table modification event.
* @param tableName table name.
* @param key row key
* @param newValue value
* @return table modification event.
*/
public static TableModificationEvent rowUpdated(String tableName, String key, VersionedValue newValue) {
return new TableModificationEvent(tableName, key, newValue, Type.ROW_UPDATED);
}
private TableModificationEvent(String tableName, String key, VersionedValue value, Type type) {
this.tableName = tableName;
this.key = key;
this.value = value;
this.type = type;
}
/**
* Returns name of table this event is for.
* @return table name
*/
public String tableName() {
return tableName;
}
/**
* Returns the row key this event is for.
* @return row key
*/
public String key() {
return key;
}
/**
* Returns the value associated with the key. If the event for a deletion, this
* method returns value that was deleted.
* @return row value
*/
public VersionedValue value() {
return value;
}
/**
* Returns the type of table modification event.
* @return event type.
*/
public Type type() {
return type;
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("type", type)
.add("tableName", tableName)
.add("key", key)
.add("version", value.version())
.toString();
}
}
/*
* Copyright 2014 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.service.impl;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.slf4j.LoggerFactory.getLogger;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.onosproject.cluster.DefaultControllerNode;
import org.onosproject.cluster.NodeId;
import org.onlab.packet.IpAddress;
import org.slf4j.Logger;
import com.fasterxml.jackson.core.JsonEncoding;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
/**
* Allows for reading and writing tablet definition as a JSON file.
*/
public class TabletDefinitionStore {
private final Logger log = getLogger(getClass());
private final File file;
/**
* Creates a reader/writer of the tablet definition file.
*
* @param filePath location of the definition file
*/
public TabletDefinitionStore(String filePath) {
file = new File(filePath);
}
/**
* Creates a reader/writer of the tablet definition file.
*
* @param filePath location of the definition file
*/
public TabletDefinitionStore(File filePath) {
file = checkNotNull(filePath);
}
/**
* Returns the Map from tablet name to set of initial member nodes.
*
* @return Map from tablet name to set of initial member nodes
* @throws IOException when I/O exception of some sort has occurred.
*/
public Map<String, Set<DefaultControllerNode>> read() throws IOException {
final Map<String, Set<DefaultControllerNode>> tablets = new HashMap<>();
final ObjectMapper mapper = new ObjectMapper();
final ObjectNode tabletNodes = (ObjectNode) mapper.readTree(file);
final Iterator<Entry<String, JsonNode>> fields = tabletNodes.fields();
while (fields.hasNext()) {
final Entry<String, JsonNode> next = fields.next();
final Set<DefaultControllerNode> nodes = new HashSet<>();
final Iterator<JsonNode> elements = next.getValue().elements();
while (elements.hasNext()) {
ObjectNode nodeDef = (ObjectNode) elements.next();
nodes.add(new DefaultControllerNode(new NodeId(nodeDef.get("id").asText()),
IpAddress.valueOf(nodeDef.get("ip").asText()),
nodeDef.get("tcpPort").asInt(9876)));
}
tablets.put(next.getKey(), nodes);
}
return tablets;
}
/**
* Updates the Map from tablet name to set of member nodes.
*
* @param tabletName name of the tablet to update
* @param nodes set of initial member nodes
* @throws IOException when I/O exception of some sort has occurred.
*/
public void write(String tabletName, Set<DefaultControllerNode> nodes) throws IOException {
checkNotNull(tabletName);
checkArgument(tabletName.isEmpty(), "Tablet name cannot be empty");
// TODO should validate if tabletName is allowed in JSON
// load current
Map<String, Set<DefaultControllerNode>> config;
try {
config = read();
} catch (IOException e) {
log.info("Reading tablet config failed, assuming empty definition.");
config = new HashMap<>();
}
// update with specified
config.put(tabletName, nodes);
// write back to file
final ObjectMapper mapper = new ObjectMapper();
final ObjectNode tabletNodes = mapper.createObjectNode();
for (Entry<String, Set<DefaultControllerNode>> tablet : config.entrySet()) {
ArrayNode nodeDefs = mapper.createArrayNode();
tabletNodes.set(tablet.getKey(), nodeDefs);
for (DefaultControllerNode node : tablet.getValue()) {
ObjectNode nodeDef = mapper.createObjectNode();
nodeDef.put("id", node.id().toString())
.put("ip", node.ip().toString())
.put("tcpPort", node.tcpPort());
nodeDefs.add(nodeDef);
}
}
mapper.writeTree(new JsonFactory().createGenerator(file, JsonEncoding.UTF8),
tabletNodes);
}
}
/*
* Copyright 2014 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.service.impl;
import java.util.Collection;
import net.kuujo.copycat.cluster.TcpClusterConfig;
import net.kuujo.copycat.cluster.TcpMember;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
public class TcpClusterConfigSerializer extends Serializer<TcpClusterConfig> {
@Override
public void write(Kryo kryo, Output output, TcpClusterConfig object) {
kryo.writeClassAndObject(output, object.getLocalMember());
kryo.writeClassAndObject(output, object.getRemoteMembers());
}
@Override
public TcpClusterConfig read(Kryo kryo, Input input,
Class<TcpClusterConfig> type) {
TcpMember localMember = (TcpMember) kryo.readClassAndObject(input);
@SuppressWarnings("unchecked")
Collection<TcpMember> remoteMembers = (Collection<TcpMember>) kryo.readClassAndObject(input);
return new TcpClusterConfig(localMember, remoteMembers);
}
}
/*
* Copyright 2014 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.service.impl;
import net.kuujo.copycat.cluster.TcpMember;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
public class TcpMemberSerializer extends Serializer<TcpMember> {
@Override
public void write(Kryo kryo, Output output, TcpMember object) {
output.writeString(object.host());
output.writeInt(object.port());
}
@Override
public TcpMember read(Kryo kryo, Input input, Class<TcpMember> type) {
String host = input.readString();
int port = input.readInt();
return new TcpMember(host, port);
}
}
/*
* Copyright 2014 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Strongly consistent, fault-tolerant and durable state management
* based on Raft consensus protocol.
*/
package org.onosproject.store.service.impl;
/*
* Copyright 2014 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.service.impl;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.List;
import net.kuujo.copycat.internal.log.OperationEntry;
import net.kuujo.copycat.log.Entry;
import net.kuujo.copycat.log.Log;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.onosproject.store.serializers.StoreSerializer;
import com.google.common.testing.EqualsTester;
/**
* Test the MapDBLog implementation.
*/
public class MapDBLogTest {
private static final StoreSerializer SERIALIZER = ClusterMessagingProtocol.DB_SERIALIZER;
private static final Entry TEST_ENTRY1 = new OperationEntry(1, "test1");
private static final Entry TEST_ENTRY2 = new OperationEntry(2, "test12");
private static final Entry TEST_ENTRY3 = new OperationEntry(3, "test123");
private static final Entry TEST_ENTRY4 = new OperationEntry(4, "test1234");
private static final Entry TEST_SNAPSHOT_ENTRY = new OperationEntry(5, "snapshot");
private static final long TEST_ENTRY1_SIZE = SERIALIZER.encode(TEST_ENTRY1).length;
private static final long TEST_ENTRY2_SIZE = SERIALIZER.encode(TEST_ENTRY2).length;
private static final long TEST_ENTRY3_SIZE = SERIALIZER.encode(TEST_ENTRY3).length;
private static final long TEST_ENTRY4_SIZE = SERIALIZER.encode(TEST_ENTRY4).length;
private static final long TEST_SNAPSHOT_ENTRY_SIZE = SERIALIZER.encode(TEST_SNAPSHOT_ENTRY).length;
private String dbFileName;
@Before
public void setUp() throws Exception {
File logFile = File.createTempFile("mapdbTest", null);
dbFileName = logFile.getAbsolutePath();
}
@After
public void tearDown() throws Exception {
Files.deleteIfExists(new File(dbFileName).toPath());
Files.deleteIfExists(new File(dbFileName + ".t").toPath());
Files.deleteIfExists(new File(dbFileName + ".p").toPath());
}
@Test(expected = IllegalStateException.class)
public void testAssertOpen() {
Log log = new MapDBLog(dbFileName, SERIALIZER);
log.size();
}
@Test
public void testAppendEntry() throws IOException {
Log log = new MapDBLog(dbFileName, SERIALIZER);
log.open();
log.appendEntry(TEST_ENTRY1);
OperationEntry first = log.firstEntry();
OperationEntry last = log.lastEntry();
new EqualsTester()
.addEqualityGroup(first, last, TEST_ENTRY1)
.testEquals();
Assert.assertEquals(TEST_ENTRY1_SIZE, log.size());
Assert.assertEquals(1, log.firstIndex());
Assert.assertEquals(1, log.lastIndex());
}
@Test
public void testAppendEntries() throws IOException {
Log log = new MapDBLog(dbFileName, SERIALIZER);
log.open();
log.appendEntries(TEST_ENTRY1, TEST_ENTRY2, TEST_ENTRY3);
OperationEntry first = log.firstEntry();
OperationEntry last = log.lastEntry();
new EqualsTester()
.addEqualityGroup(first, TEST_ENTRY1)
.addEqualityGroup(last, TEST_ENTRY3)
.testEquals();
Assert.assertEquals(TEST_ENTRY1_SIZE + TEST_ENTRY2_SIZE, TEST_ENTRY3_SIZE, log.size());
Assert.assertEquals(1, log.firstIndex());
Assert.assertEquals(3, log.lastIndex());
Assert.assertTrue(log.containsEntry(1));
Assert.assertTrue(log.containsEntry(2));
}
@Test
public void testDelete() throws IOException {
Log log = new MapDBLog(dbFileName, SERIALIZER);
log.open();
log.appendEntries(TEST_ENTRY1, TEST_ENTRY2);
log.delete();
Assert.assertEquals(0, log.size());
Assert.assertTrue(log.isEmpty());
Assert.assertEquals(0, log.firstIndex());
Assert.assertNull(log.firstEntry());
Assert.assertEquals(0, log.lastIndex());
Assert.assertNull(log.lastEntry());
}
@Test
public void testGetEntries() throws IOException {
Log log = new MapDBLog(dbFileName, SERIALIZER);
log.open();
log.appendEntries(TEST_ENTRY1, TEST_ENTRY2, TEST_ENTRY3, TEST_ENTRY4);
Assert.assertEquals(
TEST_ENTRY1_SIZE +
TEST_ENTRY2_SIZE +
TEST_ENTRY3_SIZE +
TEST_ENTRY4_SIZE, log.size());
List<Entry> entries = log.getEntries(2, 3);
new EqualsTester()
.addEqualityGroup(log.getEntry(4), TEST_ENTRY4)
.addEqualityGroup(entries.get(0), TEST_ENTRY2)
.addEqualityGroup(entries.get(1), TEST_ENTRY3)
.testEquals();
}
@Test
public void testRemoveAfter() throws IOException {
Log log = new MapDBLog(dbFileName, SERIALIZER);
log.open();
log.appendEntries(TEST_ENTRY1, TEST_ENTRY2, TEST_ENTRY3, TEST_ENTRY4);
log.removeAfter(1);
Assert.assertEquals(TEST_ENTRY1_SIZE, log.size());
new EqualsTester()
.addEqualityGroup(log.firstEntry(), log.lastEntry(), TEST_ENTRY1)
.testEquals();
}
@Test
public void testAddAfterRemove() throws IOException {
Log log = new MapDBLog(dbFileName, SERIALIZER);
log.open();
log.appendEntries(TEST_ENTRY1, TEST_ENTRY2, TEST_ENTRY3, TEST_ENTRY4);
log.removeAfter(1);
log.appendEntry(TEST_ENTRY4);
Assert.assertEquals(TEST_ENTRY1_SIZE + TEST_ENTRY4_SIZE, log.size());
new EqualsTester()
.addEqualityGroup(log.firstEntry(), TEST_ENTRY1)
.addEqualityGroup(log.lastEntry(), TEST_ENTRY4)
.addEqualityGroup(log.size(), TEST_ENTRY1_SIZE + TEST_ENTRY4_SIZE)
.testEquals();
}
@Test
public void testClose() throws IOException {
Log log = new MapDBLog(dbFileName, SERIALIZER);
Assert.assertFalse(log.isOpen());
log.open();
Assert.assertTrue(log.isOpen());
log.close();
Assert.assertFalse(log.isOpen());
}
@Test
public void testReopen() throws IOException {
Log log = new MapDBLog(dbFileName, SERIALIZER);
log.open();
log.appendEntries(TEST_ENTRY1, TEST_ENTRY2, TEST_ENTRY3, TEST_ENTRY4);
log.close();
log.open();
new EqualsTester()
.addEqualityGroup(log.firstEntry(), TEST_ENTRY1)
.addEqualityGroup(log.getEntry(2), TEST_ENTRY2)
.addEqualityGroup(log.lastEntry(), TEST_ENTRY4)
.addEqualityGroup(log.size(),
TEST_ENTRY1_SIZE +
TEST_ENTRY2_SIZE +
TEST_ENTRY3_SIZE +
TEST_ENTRY4_SIZE)
.testEquals();
}
@Test
public void testCompact() throws IOException {
Log log = new MapDBLog(dbFileName, SERIALIZER);
log.open();
log.appendEntries(TEST_ENTRY1, TEST_ENTRY2, TEST_ENTRY3, TEST_ENTRY4);
log.compact(3, TEST_SNAPSHOT_ENTRY);
new EqualsTester()
.addEqualityGroup(log.firstEntry(), TEST_SNAPSHOT_ENTRY)
.addEqualityGroup(log.lastEntry(), TEST_ENTRY4)
.addEqualityGroup(log.size(),
TEST_SNAPSHOT_ENTRY_SIZE +
TEST_ENTRY4_SIZE)
.testEquals();
}
}
......@@ -69,7 +69,7 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<netty4.version>4.0.23.Final</netty4.version>
<copycat.version>0.3.0.onos</copycat.version>
<copycat.version>0.5.0.onos-SNAPSHOT</copycat.version>
<openflowj.version>0.3.9.oe</openflowj.version>
</properties>
......
......@@ -31,6 +31,14 @@
<description>ONLab third-party dependencies</description>
<!-- TODO: Needed for copycat snapshot. Remove before official release -->
<repositories>
<repository>
<id>snapshots</id>
<url>https://oss.sonatype.org/content/repositories/snapshots</url>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>com.googlecode.concurrent-trees</groupId>
......@@ -47,14 +55,14 @@
<dependency>
<!-- FIXME once fixes get merged to upstream -->
<groupId>org.onosproject</groupId>
<artifactId>copycat</artifactId>
<artifactId>copycat-api</artifactId>
<version>${copycat.version}</version>
</dependency>
<dependency>
<!-- FIXME once fixes get merged to upstream -->
<groupId>org.onosproject</groupId>
<artifactId>copycat-tcp</artifactId>
<artifactId>copycat-netty</artifactId>
<version>${copycat.version}</version>
</dependency>
......