Madan Jampani
Committed by Ray Milkey

Added distributed transaction support through a two phase commit protocol

Change-Id: I85d64234a24823fee8b3c2ea830abbb6867dad38
Showing 25 changed files with 959 additions and 384 deletions
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.cli.net;
import java.util.Collection;
import org.apache.karaf.shell.commands.Command;
import org.apache.karaf.shell.commands.Option;
import org.onlab.util.Tools;
import org.onosproject.cli.AbstractShellCommand;
import org.onosproject.store.service.StorageAdminService;
import org.onosproject.store.service.Transaction;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
/**
* CLI to work with database transactions in the system.
*/
@Command(scope = "onos", name = "transactions",
description = "Utility for viewing and redriving database transactions")
public class TransactionsCommand extends AbstractShellCommand {
@Option(name = "-r", aliases = "--redrive",
description = "Redrive stuck transactions while removing those that are done",
required = false, multiValued = false)
private boolean redrive = false;
private static final String FMT = "%-20s %-15s %-10s";
/**
* Displays transactions as text.
*
* @param transactions transactions
*/
private void displayTransactions(Collection<Transaction> transactions) {
print("---------------------------------------------");
print(FMT, "Id", "State", "Updated");
print("---------------------------------------------");
transactions.forEach(txn -> print(FMT, txn.id(), txn.state(), Tools.timeAgo(txn.lastUpdated())));
if (transactions.size() > 0) {
print("---------------------------------------------");
}
}
/**
* Converts collection of transactions into a JSON object.
*
* @param transactions transactions
*/
private JsonNode json(Collection<Transaction> transactions) {
ObjectMapper mapper = new ObjectMapper();
ArrayNode txns = mapper.createArrayNode();
// Create a JSON node for each transaction
transactions.stream().forEach(txn -> {
ObjectNode txnNode = mapper.createObjectNode();
txnNode.put("id", txn.id())
.put("state", txn.state().toString())
.put("lastUpdated", txn.lastUpdated());
txns.add(txnNode);
});
return txns;
}
@Override
protected void execute() {
StorageAdminService storageAdminService = get(StorageAdminService.class);
if (redrive) {
storageAdminService.redriveTransactions();
return;
}
Collection<Transaction> transactions = storageAdminService.getTransactions();
if (outputJson()) {
print("%s", json(transactions));
} else {
displayTransactions(transactions);
}
}
}
......@@ -233,6 +233,9 @@
<action class="org.onosproject.cli.net.MapsListCommand"/>
</command>
<command>
<action class="org.onosproject.cli.net.TransactionsCommand"/>
</command>
<command>
<action class="org.onosproject.cli.net.ClusterDevicesCommand"/>
<completers>
<ref component-id="clusterIdCompleter"/>
......
......@@ -35,6 +35,12 @@ public class ConsistentMapException extends RuntimeException {
}
/**
* ConsistentMap update conflicts with an in flight transaction.
*/
public static class ConcurrentModification extends ConsistentMapException {
}
/**
* ConsistentMap operation interrupted.
*/
public static class Interrupted extends ConsistentMapException {
......
......@@ -16,36 +16,62 @@
package org.onosproject.store.service;
import static com.google.common.base.Preconditions.*;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import com.google.common.base.MoreObjects;
/**
* Database update operation.
*
* @param <K> key type.
* @param <V> value type.
*/
public class UpdateOperation<K, V> {
public final class DatabaseUpdate {
/**
* Type of database update operation.
*/
public static enum Type {
/**
* Insert/Update entry without any checks.
*/
PUT,
/**
* Insert an entry iff there is no existing entry for that key.
*/
PUT_IF_ABSENT,
/**
* Update entry if the current version matches specified version.
*/
PUT_IF_VERSION_MATCH,
/**
* Update entry if the current value matches specified value.
*/
PUT_IF_VALUE_MATCH,
/**
* Remove entry without any checks.
*/
REMOVE,
/**
* Remove entry if the current version matches specified version.
*/
REMOVE_IF_VERSION_MATCH,
/**
* Remove entry if the current value matches specified value.
*/
REMOVE_IF_VALUE_MATCH,
}
private Type type;
private String tableName;
private K key;
private V value;
private V currentValue;
private String key;
private byte[] value;
private byte[] currentValue;
private long currentVersion = -1;
/**
......@@ -68,7 +94,7 @@ public class UpdateOperation<K, V> {
* Returns the item key being updated.
* @return item key
*/
public K key() {
public String key() {
return key;
}
......@@ -76,7 +102,7 @@ public class UpdateOperation<K, V> {
* Returns the new value.
* @return item's target value.
*/
public V value() {
public byte[] value() {
return value;
}
......@@ -84,7 +110,7 @@ public class UpdateOperation<K, V> {
* Returns the expected current value in the database value for the key.
* @return current value in database.
*/
public V currentValue() {
public byte[] currentValue() {
return currentValue;
}
......@@ -110,85 +136,81 @@ public class UpdateOperation<K, V> {
/**
* Creates a new builder instance.
* @param <K> key type.
* @param <V> value type.
*
* @return builder.
*/
public static <K, V> Builder<K, V> newBuilder() {
return new Builder<>();
public static Builder newBuilder() {
return new Builder();
}
/**
* UpdatOperation builder.
* DatabaseUpdate builder.
*
* @param <K> key type.
* @param <V> value type.
*/
public static final class Builder<K, V> {
public static final class Builder {
private UpdateOperation<K, V> operation = new UpdateOperation<>();
private DatabaseUpdate update = new DatabaseUpdate();
public UpdateOperation<K, V> build() {
public DatabaseUpdate build() {
validateInputs();
return operation;
return update;
}
public Builder<K, V> withType(Type type) {
operation.type = checkNotNull(type, "type cannot be null");
public Builder withType(Type type) {
update.type = checkNotNull(type, "type cannot be null");
return this;
}
public Builder<K, V> withTableName(String tableName) {
operation.tableName = checkNotNull(tableName, "tableName cannot be null");
public Builder withTableName(String tableName) {
update.tableName = checkNotNull(tableName, "tableName cannot be null");
return this;
}
public Builder<K, V> withKey(K key) {
operation.key = checkNotNull(key, "key cannot be null");
public Builder withKey(String key) {
update.key = checkNotNull(key, "key cannot be null");
return this;
}
public Builder<K, V> withCurrentValue(V value) {
operation.currentValue = checkNotNull(value, "currentValue cannot be null");
public Builder withCurrentValue(byte[] value) {
update.currentValue = checkNotNull(value, "currentValue cannot be null");
return this;
}
public Builder<K, V> withValue(V value) {
operation.value = checkNotNull(value, "value cannot be null");
public Builder withValue(byte[] value) {
update.value = checkNotNull(value, "value cannot be null");
return this;
}
public Builder<K, V> withCurrentVersion(long version) {
public Builder withCurrentVersion(long version) {
checkArgument(version >= 0, "version cannot be negative");
operation.currentVersion = version;
update.currentVersion = version;
return this;
}
private void validateInputs() {
checkNotNull(operation.type, "type must be specified");
checkNotNull(operation.tableName, "table name must be specified");
checkNotNull(operation.key, "key must be specified");
switch (operation.type) {
checkNotNull(update.type, "type must be specified");
checkNotNull(update.tableName, "table name must be specified");
checkNotNull(update.key, "key must be specified");
switch (update.type) {
case PUT:
case PUT_IF_ABSENT:
checkNotNull(operation.value, "value must be specified.");
checkNotNull(update.value, "value must be specified.");
break;
case PUT_IF_VERSION_MATCH:
checkNotNull(operation.value, "value must be specified.");
checkState(operation.currentVersion >= 0, "current version must be specified");
checkNotNull(update.value, "value must be specified.");
checkState(update.currentVersion >= 0, "current version must be specified");
break;
case PUT_IF_VALUE_MATCH:
checkNotNull(operation.value, "value must be specified.");
checkNotNull(operation.currentValue, "currentValue must be specified.");
checkNotNull(update.value, "value must be specified.");
checkNotNull(update.currentValue, "currentValue must be specified.");
break;
case REMOVE:
break;
case REMOVE_IF_VERSION_MATCH:
checkState(operation.currentVersion >= 0, "current version must be specified");
checkState(update.currentVersion >= 0, "current version must be specified");
break;
case REMOVE_IF_VALUE_MATCH:
checkNotNull(operation.currentValue, "currentValue must be specified.");
checkNotNull(update.currentValue, "currentValue must be specified.");
break;
default:
throw new IllegalStateException("Unknown operation type");
......
......@@ -15,6 +15,7 @@
*/
package org.onosproject.store.service;
import java.util.Collection;
import java.util.List;
/**
......@@ -35,4 +36,16 @@ public interface StorageAdminService {
* @return list of map information
*/
List<MapInfo> getMapInfo();
/**
* Returns all the transactions in the system.
*
* @return collection of transactions
*/
Collection<Transaction> getTransactions();
/**
* Redrives stuck transactions while removing those that are done.
*/
void redriveTransactions();
}
......
......@@ -29,13 +29,6 @@ package org.onosproject.store.service;
public interface StorageService {
/**
* Creates a new transaction context.
*
* @return transaction context
*/
TransactionContext createTransactionContext();
/**
* Creates a new EventuallyConsistentMapBuilder.
*
* @param <K> key type
......@@ -45,11 +38,11 @@ public interface StorageService {
<K, V> EventuallyConsistentMapBuilder<K, V> eventuallyConsistentMapBuilder();
/**
* Creates a new EventuallyConsistentMapBuilder.
* Creates a new ConsistentMapBuilder.
*
* @param <K> key type
* @param <V> value type
* @return builder for an eventually consistent map
* @return builder for a consistent map
*/
<K, V> ConsistentMapBuilder<K, V> consistentMapBuilder();
......@@ -60,4 +53,11 @@ public interface StorageService {
* @return builder for an distributed set
*/
<E> SetBuilder<E> setBuilder();
}
\ No newline at end of file
/**
* Creates a new transaction context.
*
* @return transaction context
*/
TransactionContext createTransactionContext();
}
......
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.service;
import java.util.List;
/**
* An immutable transaction object.
*/
public interface Transaction {
public enum State {
/**
* Indicates a new transaction that is about to be prepared. All transactions
* start their life in this state.
*/
PREPARING,
/**
* Indicates a transaction that is successfully prepared i.e. all participants voted to commit
*/
PREPARED,
/**
* Indicates a transaction that is about to be committed.
*/
COMMITTING,
/**
* Indicates a transaction that has successfully committed.
*/
COMMITTED,
/**
* Indicates a transaction that is about to be rolled back.
*/
ROLLINGBACK,
/**
* Indicates a transaction that has been rolled back and all locks are released.
*/
ROLLEDBACK
}
/**
* Returns the transaction Id.
*
* @return transaction id
*/
long id();
/**
* Returns the list of updates that are part of this transaction.
*
* @return list of database updates
*/
List<DatabaseUpdate> updates();
/**
* Returns the current state of this transaction.
*
* @return transaction state
*/
State state();
/**
* Returns true if this transaction has completed execution.
*
* @return true is yes, false otherwise
*/
public default boolean isDone() {
return state() == State.COMMITTED || state() == State.ROLLEDBACK;
}
/**
* Returns a new transaction that is created by transitioning this one to the specified state.
*
* @param newState destination state
* @return a new transaction instance similar to the current one but its state set to specified state
*/
Transaction transition(State newState);
/**
* Returns the system time when the transaction was last updated.
*
* @return last update time
*/
public long lastUpdated();
}
\ No newline at end of file
......@@ -19,21 +19,31 @@ package org.onosproject.store.service;
/**
* Provides a context for transactional operations.
* <p>
* A transaction context provides a boundary within which transactions
* are run. It also is a place where all modifications made within a transaction
* are cached until the point when the transaction commits or aborts. It thus ensures
* isolation of work happening with in the transaction boundary.
* <p>
* A transaction context is a vehicle for grouping operations into a unit with the
* properties of atomicity, isolation, and durability. Transactions also provide the
* ability to maintain an application's invariants or integrity constraints,
* supporting the property of consistency. Together these properties are known as ACID.
* <p>
* A transaction context provides a boundary within which transactions
* are run. It also is a place where all modifications made within a transaction
* are cached until the point when the transaction commits or aborts. It thus ensures
* isolation of work happening with in the transaction boundary. Within a transaction
* context isolation level is REPEATABLE_READS i.e. only data that is committed can be read.
* The only uncommitted data that can be read is the data modified by the current transaction.
*/
public interface TransactionContext {
/**
* Returns the unique transactionId.
*
* @return transaction id
*/
long transactionId();
/**
* Returns if this transaction context is open.
* @return true if open, false otherwise.
*
* @return true if open, false otherwise
*/
boolean isOpen();
......@@ -45,22 +55,24 @@ public interface TransactionContext {
/**
* Commits a transaction that was previously started thereby making its changes permanent
* and externally visible.
* @throws TransactionException if transaction fails to commit.
*
* @throws TransactionException if transaction fails to commit
*/
void commit();
/**
* Rolls back the current transaction, discarding all its changes.
* Aborts any changes made in this transaction context and discarding all locally cached updates.
*/
void rollback();
void abort();
/**
* Creates a new transactional map.
* Returns a transactional map data structure with the specified name.
*
* @param <K> key type
* @param <V> value type
* @param mapName name of the transactional map.
* @param serializer serializer to use for encoding/decoding keys and vaulues.
* @return new Transactional Map.
* @param mapName name of the transactional map
* @param serializer serializer to use for encoding/decoding keys and values of the map
* @return Transactional Map
*/
<K, V> TransactionalMap<K, V> createTransactionalMap(String mapName, Serializer serializer);
<K, V> TransactionalMap<K, V> getTransactionalMap(String mapName, Serializer serializer);
}
......
......@@ -41,8 +41,14 @@ public class TransactionException extends RuntimeException {
}
/**
* Transaction failure due to optimistic concurrency failure.
* Transaction failure due to optimistic concurrency violation.
*/
public static class OptimisticConcurrencyFailure extends TransactionException {
}
/**
* Transaction failure due to a conflicting transaction in progress.
*/
public static class ConcurrentModification extends TransactionException {
}
}
\ No newline at end of file
......
......@@ -16,15 +16,12 @@
package org.onosproject.store.service;
import java.util.Collection;
import java.util.Set;
import java.util.Map.Entry;
/**
* Transactional Map data structure.
* <p>
* A TransactionalMap is created by invoking {@link TransactionContext#createTransactionalMap createTransactionalMap}
* method. All operations performed on this map with in a transaction boundary are invisible externally
* A TransactionalMap is created by invoking {@link TransactionContext#getTransactionalMap getTransactionalMap}
* method. All operations performed on this map within a transaction boundary are invisible externally
* until the point when the transaction commits. A commit usually succeeds in the absence of conflicts.
*
* @param <K> type of key.
......@@ -33,36 +30,6 @@ import java.util.Map.Entry;
public interface TransactionalMap<K, V> {
/**
* Returns the number of entries in the map.
*
* @return map size.
*/
int size();
/**
* Returns true if the map is empty.
*
* @return true if map has no entries, false otherwise.
*/
boolean isEmpty();
/**
* Returns true if this map contains a mapping for the specified key.
*
* @param key key
* @return true if map contains key, false otherwise.
*/
boolean containsKey(K key);
/**
* Returns true if this map contains the specified value.
*
* @param value value
* @return true if map contains value, false otherwise.
*/
boolean containsValue(V value);
/**
* Returns the value to which the specified key is mapped, or null if this
* map contains no mapping for the key.
*
......@@ -94,45 +61,6 @@ public interface TransactionalMap<K, V> {
V remove(K key);
/**
* Removes all of the mappings from this map (optional operation).
* The map will be empty after this call returns.
*/
void clear();
/**
* Returns a Set view of the keys contained in this map.
* This method differs from the behavior of java.util.Map.keySet() in that
* what is returned is a unmodifiable snapshot view of the keys in the ConsistentMap.
* Attempts to modify the returned set, whether direct or via its iterator,
* result in an UnsupportedOperationException.
*
* @return a set of the keys contained in this map
*/
Set<K> keySet();
/**
* Returns the collection of values contained in this map.
* This method differs from the behavior of java.util.Map.values() in that
* what is returned is a unmodifiable snapshot view of the values in the ConsistentMap.
* Attempts to modify the returned collection, whether direct or via its iterator,
* result in an UnsupportedOperationException.
*
* @return a collection of the values contained in this map
*/
Collection<V> values();
/**
* Returns the set of entries contained in this map.
* This method differs from the behavior of java.util.Map.entrySet() in that
* what is returned is a unmodifiable snapshot view of the entries in the ConsistentMap.
* Attempts to modify the returned set, whether direct or via its iterator,
* result in an UnsupportedOperationException.
*
* @return set of entries contained in this map.
*/
Set<Entry<K, V>> entrySet();
/**
* If the specified key is not already associated with a value
* associates it with the given value and returns null, else returns the current value.
*
......
......@@ -34,6 +34,7 @@ import net.kuujo.copycat.protocol.Consistency;
import net.kuujo.copycat.protocol.Protocol;
import net.kuujo.copycat.util.concurrent.NamedThreadFactory;
import org.apache.commons.lang.math.RandomUtils;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
......@@ -41,6 +42,7 @@ import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onosproject.cluster.ClusterService;
import org.onosproject.core.IdGenerator;
import org.onosproject.store.cluster.impl.DistributedClusterStore;
import org.onosproject.store.cluster.impl.NodeInfo;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
......@@ -53,11 +55,13 @@ import org.onosproject.store.service.PartitionInfo;
import org.onosproject.store.service.SetBuilder;
import org.onosproject.store.service.StorageAdminService;
import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.Transaction;
import org.onosproject.store.service.TransactionContext;
import org.slf4j.Logger;
import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
......@@ -92,6 +96,9 @@ public class DatabaseManager implements StorageService, StorageAdminService {
private PartitionedDatabase partitionedDatabase;
private Database inMemoryDatabase;
private TransactionManager transactionManager;
private final IdGenerator transactionIdGenerator = () -> RandomUtils.nextLong();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
......@@ -188,6 +195,7 @@ public class DatabaseManager implements StorageService, StorageAdminService {
Thread.currentThread().interrupt();
log.warn("Failed to complete database initialization.");
}
transactionManager = new TransactionManager(partitionedDatabase);
log.info("Started");
}
......@@ -218,7 +226,7 @@ public class DatabaseManager implements StorageService, StorageAdminService {
@Override
public TransactionContext createTransactionContext() {
return new DefaultTransactionContext(partitionedDatabase);
return new DefaultTransactionContext(partitionedDatabase, transactionIdGenerator.getNewId());
}
@Override
......@@ -331,6 +339,11 @@ public class DatabaseManager implements StorageService, StorageAdminService {
.collect(Collectors.toList());
}
@Override
public Collection<Transaction> getTransactions() {
return complete(transactionManager.getTransactions());
}
private static <T> T complete(CompletableFuture<T> future) {
try {
return future.get(DATABASE_OPERATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
......@@ -343,4 +356,9 @@ public class DatabaseManager implements StorageService, StorageAdminService {
throw new ConsistentMapException(e.getCause());
}
}
@Override
public void redriveTransactions() {
getTransactions().stream().forEach(transactionManager::execute);
}
}
......
......@@ -17,12 +17,11 @@
package org.onosproject.store.consistent.impl;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.onosproject.store.service.UpdateOperation;
import org.onosproject.store.service.Transaction;
import org.onosproject.store.service.Versioned;
/**
......@@ -87,7 +86,7 @@ public interface DatabaseProxy<K, V> {
* @param value The value to set.
* @return A completable future to be completed with the result once complete.
*/
CompletableFuture<Versioned<V>> put(String tableName, K key, V value);
CompletableFuture<Result<Versioned<V>>> put(String tableName, K key, V value);
/**
* Removes a value from the table.
......@@ -96,7 +95,7 @@ public interface DatabaseProxy<K, V> {
* @param key The key to remove.
* @return A completable future to be completed with the result once complete.
*/
CompletableFuture<Versioned<V>> remove(String tableName, K key);
CompletableFuture<Result<Versioned<V>>> remove(String tableName, K key);
/**
* Clears the table.
......@@ -104,7 +103,7 @@ public interface DatabaseProxy<K, V> {
* @param tableName table name
* @return A completable future to be completed with the result once complete.
*/
CompletableFuture<Void> clear(String tableName);
CompletableFuture<Result<Void>> clear(String tableName);
/**
* Gets a set of keys in the table.
......@@ -138,7 +137,7 @@ public interface DatabaseProxy<K, V> {
* @param value The value to set if the given key does not exist.
* @return A completable future to be completed with the result once complete.
*/
CompletableFuture<Versioned<V>> putIfAbsent(String tableName, K key, V value);
CompletableFuture<Result<Versioned<V>>> putIfAbsent(String tableName, K key, V value);
/**
* Removes a key and if the existing value for that key matches the specified value.
......@@ -148,7 +147,7 @@ public interface DatabaseProxy<K, V> {
* @param value The value to remove.
* @return A completable future to be completed with the result once complete.
*/
CompletableFuture<Boolean> remove(String tableName, K key, V value);
CompletableFuture<Result<Boolean>> remove(String tableName, K key, V value);
/**
* Removes a key and if the existing version for that key matches the specified version.
......@@ -158,7 +157,7 @@ public interface DatabaseProxy<K, V> {
* @param version The expected version.
* @return A completable future to be completed with the result once complete.
*/
CompletableFuture<Boolean> remove(String tableName, K key, long version);
CompletableFuture<Result<Boolean>> remove(String tableName, K key, long version);
/**
* Replaces the entry for the specified key only if currently mapped to the specified value.
......@@ -169,7 +168,7 @@ public interface DatabaseProxy<K, V> {
* @param newValue The value with which to replace the given key and value.
* @return A completable future to be completed with the result once complete.
*/
CompletableFuture<Boolean> replace(String tableName, K key, V oldValue, V newValue);
CompletableFuture<Result<Boolean>> replace(String tableName, K key, V oldValue, V newValue);
/**
* Replaces the entry for the specified key only if currently mapped to the specified version.
......@@ -180,14 +179,42 @@ public interface DatabaseProxy<K, V> {
* @param newValue The value with which to replace the given key and version.
* @return A completable future to be completed with the result once complete.
*/
CompletableFuture<Boolean> replace(String tableName, K key, long oldVersion, V newValue);
CompletableFuture<Result<Boolean>> replace(String tableName, K key, long oldVersion, V newValue);
/**
* Perform a atomic batch update operation i.e. either all operations in batch succeed or
* none do and no state changes are made.
* Prepare and commit the specified transaction.
*
* @param updates list of updates to apply atomically.
* @return A completable future to be completed with the result once complete.
* @param transaction transaction to commit (after preparation)
* @return A completable future to be completed with the result once complete
*/
CompletableFuture<Boolean> prepareAndCommit(Transaction transaction);
/**
* Prepare the specified transaction for commit. A successful prepare implies
* all the affected resources are locked thus ensuring no concurrent updates can interfere.
*
* @param transaction transaction to prepare (for commit)
* @return A completable future to be completed with the result once complete. The future is completed
* with true if the transaction is successfully prepared i.e. all pre-conditions are met and
* applicable resources locked.
*/
CompletableFuture<Boolean> prepare(Transaction transaction);
/**
* Commit the specified transaction. A successful commit implies
* all the updates are applied, are now durable and are now visible externally.
*
* @param transaction transaction to commit
* @return A completable future to be completed with the result once complete
*/
CompletableFuture<Boolean> commit(Transaction transaction);
/**
* Rollback the specified transaction. A successful rollback implies
* all previously acquired locks for the affected resources are released.
*
* @param transaction transaction to rollback
* @return A completable future to be completed with the result once complete
*/
CompletableFuture<Boolean> atomicBatchUpdate(List<UpdateOperation<K, V>> updates);
CompletableFuture<Boolean> rollback(Transaction transaction);
}
......
......@@ -23,6 +23,8 @@ import org.apache.commons.lang3.tuple.Pair;
import org.onlab.util.KryoNamespace;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
import org.onosproject.store.service.DatabaseUpdate;
import org.onosproject.store.service.Transaction;
import org.onosproject.store.service.Versioned;
import net.kuujo.copycat.cluster.internal.MemberInfo;
......@@ -63,8 +65,14 @@ public class DatabaseSerializer extends SerializerConfig {
private static final KryoNamespace ONOS_STORE = KryoNamespace.newBuilder()
.nextId(KryoNamespace.FLOATING_ID)
.register(Versioned.class)
.register(DatabaseUpdate.class)
.register(DatabaseUpdate.Type.class)
.register(Pair.class)
.register(ImmutablePair.class)
.register(Result.class)
.register(Result.Status.class)
.register(DefaultTransaction.class)
.register(Transaction.State.class)
.build();
private static final KryoSerializer SERIALIZER = new KryoSerializer() {
......
......@@ -17,11 +17,10 @@
package org.onosproject.store.consistent.impl;
import java.util.Collection;
import java.util.List;
import java.util.Map.Entry;
import java.util.Set;
import org.onosproject.store.service.UpdateOperation;
import org.onosproject.store.service.Transaction;
import org.onosproject.store.service.Versioned;
import net.kuujo.copycat.state.Command;
......@@ -62,13 +61,13 @@ public interface DatabaseState<K, V> {
Versioned<V> get(String tableName, K key);
@Command
Versioned<V> put(String tableName, K key, V value);
Result<Versioned<V>> put(String tableName, K key, V value);
@Command
Versioned<V> remove(String tableName, K key);
Result<Versioned<V>> remove(String tableName, K key);
@Command
void clear(String tableName);
Result<Void> clear(String tableName);
@Query
Set<K> keySet(String tableName);
......@@ -80,20 +79,29 @@ public interface DatabaseState<K, V> {
Set<Entry<K, Versioned<V>>> entrySet(String tableName);
@Command
Versioned<V> putIfAbsent(String tableName, K key, V value);
Result<Versioned<V>> putIfAbsent(String tableName, K key, V value);
@Command
boolean remove(String tableName, K key, V value);
Result<Boolean> remove(String tableName, K key, V value);
@Command
boolean remove(String tableName, K key, long version);
Result<Boolean> remove(String tableName, K key, long version);
@Command
boolean replace(String tableName, K key, V oldValue, V newValue);
Result<Boolean> replace(String tableName, K key, V oldValue, V newValue);
@Command
boolean replace(String tableName, K key, long oldVersion, V newValue);
Result<Boolean> replace(String tableName, K key, long oldVersion, V newValue);
@Command
boolean batchUpdate(List<UpdateOperation<K, V>> updates);
boolean prepareAndCommit(Transaction transaction);
@Command
boolean prepare(Transaction transaction);
@Command
boolean commit(Transaction transaction);
@Command
boolean rollback(Transaction transaction);
}
......
......@@ -28,6 +28,7 @@ import java.util.Set;
import org.apache.commons.lang3.tuple.Pair;
import org.onlab.util.HexString;
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.ConsistentMapException;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.Versioned;
......@@ -108,6 +109,7 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V>
checkNotNull(key, ERROR_NULL_KEY);
checkNotNull(value, ERROR_NULL_VALUE);
return database.put(name, keyCache.getUnchecked(key), serializer.encode(value))
.thenApply(this::unwrapResult)
.thenApply(v -> v != null
? new Versioned<>(serializer.decode(v.value()), v.version(), v.creationTime()) : null);
}
......@@ -116,13 +118,14 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V>
public CompletableFuture<Versioned<V>> remove(K key) {
checkNotNull(key, ERROR_NULL_KEY);
return database.remove(name, keyCache.getUnchecked(key))
.thenApply(this::unwrapResult)
.thenApply(v -> v != null
? new Versioned<>(serializer.decode(v.value()), v.version(), v.creationTime()) : null);
}
@Override
public CompletableFuture<Void> clear() {
return database.clear(name);
return database.clear(name).thenApply(this::unwrapResult);
}
@Override
......@@ -154,23 +157,27 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V>
public CompletableFuture<Versioned<V>> putIfAbsent(K key, V value) {
checkNotNull(key, ERROR_NULL_KEY);
checkNotNull(value, ERROR_NULL_VALUE);
return database.putIfAbsent(
name, keyCache.getUnchecked(key), serializer.encode(value)).thenApply(v ->
v != null ?
new Versioned<>(serializer.decode(v.value()), v.version(), v.creationTime()) : null);
return database.putIfAbsent(name,
keyCache.getUnchecked(key),
serializer.encode(value))
.thenApply(this::unwrapResult)
.thenApply(v -> v != null ?
new Versioned<>(serializer.decode(v.value()), v.version(), v.creationTime()) : null);
}
@Override
public CompletableFuture<Boolean> remove(K key, V value) {
checkNotNull(key, ERROR_NULL_KEY);
checkNotNull(value, ERROR_NULL_VALUE);
return database.remove(name, keyCache.getUnchecked(key), serializer.encode(value));
return database.remove(name, keyCache.getUnchecked(key), serializer.encode(value))
.thenApply(this::unwrapResult);
}
@Override
public CompletableFuture<Boolean> remove(K key, long version) {
checkNotNull(key, ERROR_NULL_KEY);
return database.remove(name, keyCache.getUnchecked(key), version);
return database.remove(name, keyCache.getUnchecked(key), version)
.thenApply(this::unwrapResult);
}
......@@ -179,14 +186,16 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V>
checkNotNull(key, ERROR_NULL_KEY);
checkNotNull(newValue, ERROR_NULL_VALUE);
byte[] existing = oldValue != null ? serializer.encode(oldValue) : null;
return database.replace(name, keyCache.getUnchecked(key), existing, serializer.encode(newValue));
return database.replace(name, keyCache.getUnchecked(key), existing, serializer.encode(newValue))
.thenApply(this::unwrapResult);
}
@Override
public CompletableFuture<Boolean> replace(K key, long oldVersion, V newValue) {
checkNotNull(key, ERROR_NULL_KEY);
checkNotNull(newValue, ERROR_NULL_VALUE);
return database.replace(name, keyCache.getUnchecked(key), oldVersion, serializer.encode(newValue));
return database.replace(name, keyCache.getUnchecked(key), oldVersion, serializer.encode(newValue))
.thenApply(this::unwrapResult);
}
private Map.Entry<K, Versioned<V>> fromRawEntry(Map.Entry<String, Versioned<byte[]>> e) {
......@@ -197,4 +206,14 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V>
e.getValue().version(),
e.getValue().creationTime()));
}
private <T> T unwrapResult(Result<T> result) {
if (result.status() == Result.Status.LOCKED) {
throw new ConsistentMapException.ConcurrentModification();
} else if (result.success()) {
return result.value();
} else {
throw new IllegalStateException("Must not be here");
}
}
}
\ No newline at end of file
......
......@@ -23,13 +23,12 @@ import net.kuujo.copycat.state.internal.DefaultStateMachine;
import net.kuujo.copycat.util.concurrent.Futures;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import org.onosproject.store.service.UpdateOperation;
import org.onosproject.store.service.Transaction;
import org.onosproject.store.service.Versioned;
/**
......@@ -39,7 +38,7 @@ public class DefaultDatabase extends AbstractResource<Database> implements Datab
private final StateMachine<DatabaseState<String, byte[]>> stateMachine;
private DatabaseProxy<String, byte[]> proxy;
@SuppressWarnings("unchecked")
@SuppressWarnings({ "unchecked", "rawtypes" })
public DefaultDatabase(ResourceContext context) {
super(context);
this.stateMachine = new DefaultStateMachine(context, DatabaseState.class, DefaultDatabaseState.class);
......@@ -91,17 +90,17 @@ public class DefaultDatabase extends AbstractResource<Database> implements Datab
}
@Override
public CompletableFuture<Versioned<byte[]>> put(String tableName, String key, byte[] value) {
public CompletableFuture<Result<Versioned<byte[]>>> put(String tableName, String key, byte[] value) {
return checkOpen(() -> proxy.put(tableName, key, value));
}
@Override
public CompletableFuture<Versioned<byte[]>> remove(String tableName, String key) {
public CompletableFuture<Result<Versioned<byte[]>>> remove(String tableName, String key) {
return checkOpen(() -> proxy.remove(tableName, key));
}
@Override
public CompletableFuture<Void> clear(String tableName) {
public CompletableFuture<Result<Void>> clear(String tableName) {
return checkOpen(() -> proxy.clear(tableName));
}
......@@ -121,33 +120,48 @@ public class DefaultDatabase extends AbstractResource<Database> implements Datab
}
@Override
public CompletableFuture<Versioned<byte[]>> putIfAbsent(String tableName, String key, byte[] value) {
public CompletableFuture<Result<Versioned<byte[]>>> putIfAbsent(String tableName, String key, byte[] value) {
return checkOpen(() -> proxy.putIfAbsent(tableName, key, value));
}
@Override
public CompletableFuture<Boolean> remove(String tableName, String key, byte[] value) {
public CompletableFuture<Result<Boolean>> remove(String tableName, String key, byte[] value) {
return checkOpen(() -> proxy.remove(tableName, key, value));
}
@Override
public CompletableFuture<Boolean> remove(String tableName, String key, long version) {
public CompletableFuture<Result<Boolean>> remove(String tableName, String key, long version) {
return checkOpen(() -> proxy.remove(tableName, key, version));
}
@Override
public CompletableFuture<Boolean> replace(String tableName, String key, byte[] oldValue, byte[] newValue) {
public CompletableFuture<Result<Boolean>> replace(String tableName, String key, byte[] oldValue, byte[] newValue) {
return checkOpen(() -> proxy.replace(tableName, key, oldValue, newValue));
}
@Override
public CompletableFuture<Boolean> replace(String tableName, String key, long oldVersion, byte[] newValue) {
public CompletableFuture<Result<Boolean>> replace(String tableName, String key, long oldVersion, byte[] newValue) {
return checkOpen(() -> proxy.replace(tableName, key, oldVersion, newValue));
}
@Override
public CompletableFuture<Boolean> atomicBatchUpdate(List<UpdateOperation<String, byte[]>> updates) {
return checkOpen(() -> proxy.atomicBatchUpdate(updates));
public CompletableFuture<Boolean> prepareAndCommit(Transaction transaction) {
return checkOpen(() -> proxy.prepareAndCommit(transaction));
}
@Override
public CompletableFuture<Boolean> prepare(Transaction transaction) {
return checkOpen(() -> proxy.prepare(transaction));
}
@Override
public CompletableFuture<Boolean> commit(Transaction transaction) {
return checkOpen(() -> proxy.commit(transaction));
}
@Override
public CompletableFuture<Boolean> rollback(Transaction transaction) {
return checkOpen(() -> proxy.rollback(transaction));
}
@Override
......@@ -180,4 +194,4 @@ public class DefaultDatabase extends AbstractResource<Database> implements Datab
}
return false;
}
}
\ No newline at end of file
}
......
......@@ -18,6 +18,7 @@ package org.onosproject.store.consistent.impl;
import java.util.Collection;
import java.util.Iterator;
import java.util.Set;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.Serializer;
......@@ -46,6 +47,7 @@ public class DefaultDistributedSet<E> implements Set<E> {
return backingMap.isEmpty();
}
@SuppressWarnings("unchecked")
@Override
public boolean contains(Object o) {
return backingMap.containsKey((E) o);
......@@ -71,6 +73,7 @@ public class DefaultDistributedSet<E> implements Set<E> {
return backingMap.putIfAbsent(e, true) == null;
}
@SuppressWarnings("unchecked")
@Override
public boolean remove(Object o) {
return backingMap.remove((E) o, true);
......
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.consistent.impl;
import java.util.List;
import org.onosproject.store.service.DatabaseUpdate;
import org.onosproject.store.service.Transaction;
import com.google.common.collect.ImmutableList;
/**
* A Default transaction implementation.
*/
public class DefaultTransaction implements Transaction {
private final long transactionId;
private final List<DatabaseUpdate> updates;
private final State state;
private final long lastUpdated;
public DefaultTransaction(long transactionId, List<DatabaseUpdate> updates) {
this(transactionId, updates, State.PREPARING, System.currentTimeMillis());
}
private DefaultTransaction(long transactionId, List<DatabaseUpdate> updates, State state, long lastUpdated) {
this.transactionId = transactionId;
this.updates = ImmutableList.copyOf(updates);
this.state = state;
this.lastUpdated = lastUpdated;
}
@Override
public long id() {
return transactionId;
}
@Override
public List<DatabaseUpdate> updates() {
return updates;
}
@Override
public State state() {
return state;
}
@Override
public Transaction transition(State newState) {
return new DefaultTransaction(transactionId, updates, newState, System.currentTimeMillis());
}
@Override
public long lastUpdated() {
return lastUpdated;
}
}
\ No newline at end of file
......@@ -18,19 +18,14 @@ package org.onosproject.store.consistent.impl;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static com.google.common.base.Preconditions.*;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.DatabaseUpdate;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.TransactionContext;
import org.onosproject.store.service.TransactionException;
import org.onosproject.store.service.TransactionalMap;
import org.onosproject.store.service.UpdateOperation;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
......@@ -40,80 +35,69 @@ import com.google.common.collect.Maps;
*/
public class DefaultTransactionContext implements TransactionContext {
private final Map<String, DefaultTransactionalMap> txMaps = Maps.newHashMap();
private static final String TX_NOT_OPEN_ERROR = "Transaction Context is not open";
@SuppressWarnings("rawtypes")
private final Map<String, DefaultTransactionalMap> txMaps = Maps.newConcurrentMap();
private boolean isOpen = false;
private final Database database;
private static final String TX_NOT_OPEN_ERROR = "Transaction is not open";
private static final int TRANSACTION_TIMEOUT_MILLIS = 2000;
private final long transactionId;
DefaultTransactionContext(Database database) {
this.database = checkNotNull(database, "Database must not be null");
public DefaultTransactionContext(Database database, long transactionId) {
this.database = checkNotNull(database);
this.transactionId = transactionId;
}
@Override
public long transactionId() {
return transactionId;
}
@Override
public void begin() {
checkState(!isOpen, "Transaction Context is already open");
isOpen = true;
}
@Override
public boolean isOpen() {
return isOpen;
}
@Override
@SuppressWarnings("unchecked")
public <K, V> TransactionalMap<K, V> createTransactionalMap(String mapName,
public <K, V> TransactionalMap<K, V> getTransactionalMap(String mapName,
Serializer serializer) {
checkNotNull(mapName, "map name is null");
checkNotNull(serializer, "serializer is null");
checkState(isOpen, TX_NOT_OPEN_ERROR);
if (!txMaps.containsKey(mapName)) {
ConsistentMap<K, V> backingMap = new DefaultConsistentMap<>(mapName, database, serializer);
DefaultTransactionalMap<K, V> txMap = new DefaultTransactionalMap<>(mapName, backingMap, this, serializer);
txMaps.put(mapName, txMap);
}
return txMaps.get(mapName);
checkNotNull(mapName);
checkNotNull(serializer);
return txMaps.computeIfAbsent(mapName, name -> new DefaultTransactionalMap<>(
name,
new DefaultConsistentMap<>(name, database, serializer),
this,
serializer));
}
@SuppressWarnings("unchecked")
@Override
public void commit() {
checkState(isOpen, TX_NOT_OPEN_ERROR);
List<UpdateOperation<String, byte[]>> allUpdates =
Lists.newLinkedList();
try {
List<DatabaseUpdate> updates = Lists.newLinkedList();
txMaps.values()
.stream()
.forEach(m -> {
allUpdates.addAll(m.prepareDatabaseUpdates());
});
if (!complete(database.atomicBatchUpdate(allUpdates))) {
throw new TransactionException.OptimisticConcurrencyFailure();
}
.forEach(m -> { updates.addAll(m.prepareDatabaseUpdates()); });
database.prepareAndCommit(new DefaultTransaction(transactionId, updates));
} catch (Exception e) {
abort();
throw new TransactionException(e);
} finally {
isOpen = false;
}
}
@Override
public void rollback() {
public void abort() {
checkState(isOpen, TX_NOT_OPEN_ERROR);
txMaps.values()
.stream()
.forEach(m -> m.rollback());
}
@Override
public boolean isOpen() {
return false;
}
private static <T> T complete(CompletableFuture<T> future) {
try {
return future.get(TRANSACTION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new TransactionException.Interrupted();
} catch (TimeoutException e) {
throw new TransactionException.Timeout();
} catch (ExecutionException e) {
throw new TransactionException(e.getCause());
}
txMaps.values().forEach(m -> m.rollback());
}
}
}
\ No newline at end of file
......
......@@ -16,23 +16,24 @@
package org.onosproject.store.consistent.impl;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.stream.Collectors;
import java.util.Set;
import org.onlab.util.HexString;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.DatabaseUpdate;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.TransactionContext;
import org.onosproject.store.service.TransactionalMap;
import org.onosproject.store.service.UpdateOperation;
import org.onosproject.store.service.Versioned;
import static com.google.common.base.Preconditions.*;
import com.google.common.base.Objects;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
......@@ -55,6 +56,23 @@ public class DefaultTransactionalMap<K, V> implements TransactionalMap<K, V> {
private final Map<K, V> writeCache = Maps.newConcurrentMap();
private final Set<K> deleteSet = Sets.newConcurrentHashSet();
private static final String ERROR_NULL_VALUE = "Null values are not allowed";
private static final String ERROR_NULL_KEY = "Null key is not allowed";
private final LoadingCache<K, String> keyCache = CacheBuilder.newBuilder()
.softValues()
.build(new CacheLoader<K, String>() {
@Override
public String load(K key) {
return HexString.toHexString(serializer.encode(key));
}
});
protected K dK(String key) {
return serializer.decode(HexString.fromHexString(key));
}
public DefaultTransactionalMap(
String name,
ConsistentMap<K, V> backingMap,
......@@ -69,15 +87,15 @@ public class DefaultTransactionalMap<K, V> implements TransactionalMap<K, V> {
@Override
public V get(K key) {
checkState(txContext.isOpen(), TX_CLOSED_ERROR);
checkNotNull(key, ERROR_NULL_KEY);
if (deleteSet.contains(key)) {
return null;
} else if (writeCache.containsKey(key)) {
return writeCache.get(key);
}
V latest = writeCache.get(key);
if (latest != null) {
return latest;
} else {
if (!readCache.containsKey(key)) {
readCache.put(key, backingMap.get(key));
}
Versioned<V> v = readCache.get(key);
Versioned<V> v = readCache.computeIfAbsent(key, k -> backingMap.get(k));
return v != null ? v.value() : null;
}
}
......@@ -85,25 +103,31 @@ public class DefaultTransactionalMap<K, V> implements TransactionalMap<K, V> {
@Override
public V put(K key, V value) {
checkState(txContext.isOpen(), TX_CLOSED_ERROR);
Versioned<V> original = readCache.get(key);
V recentUpdate = writeCache.put(key, value);
checkNotNull(value, ERROR_NULL_VALUE);
V latest = get(key);
writeCache.put(key, value);
deleteSet.remove(key);
return recentUpdate == null ? (original != null ? original.value() : null) : recentUpdate;
return latest;
}
@Override
public V remove(K key) {
checkState(txContext.isOpen(), TX_CLOSED_ERROR);
Versioned<V> original = readCache.get(key);
V recentUpdate = writeCache.remove(key);
deleteSet.add(key);
return recentUpdate == null ? (original != null ? original.value() : null) : recentUpdate;
V latest = get(key);
if (latest != null) {
writeCache.remove(key);
deleteSet.add(key);
}
return latest;
}
@Override
public boolean remove(K key, V value) {
V currentValue = get(key);
if (value.equals(currentValue)) {
checkState(txContext.isOpen(), TX_CLOSED_ERROR);
checkNotNull(value, ERROR_NULL_VALUE);
V latest = get(key);
if (Objects.equal(value, latest)) {
remove(key);
return true;
}
......@@ -112,8 +136,11 @@ public class DefaultTransactionalMap<K, V> implements TransactionalMap<K, V> {
@Override
public boolean replace(K key, V oldValue, V newValue) {
V currentValue = get(key);
if (oldValue.equals(currentValue)) {
checkState(txContext.isOpen(), TX_CLOSED_ERROR);
checkNotNull(oldValue, ERROR_NULL_VALUE);
checkNotNull(newValue, ERROR_NULL_VALUE);
V latest = get(key);
if (Objects.equal(oldValue, latest)) {
put(key, newValue);
return true;
}
......@@ -121,70 +148,25 @@ public class DefaultTransactionalMap<K, V> implements TransactionalMap<K, V> {
}
@Override
public int size() {
// TODO
throw new UnsupportedOperationException();
}
@Override
public boolean isEmpty() {
return size() == 0;
}
@Override
public boolean containsKey(K key) {
return get(key) != null;
}
@Override
public boolean containsValue(V value) {
// TODO
throw new UnsupportedOperationException();
}
@Override
public void clear() {
// TODO
throw new UnsupportedOperationException();
}
@Override
public Set<K> keySet() {
// TODO
throw new UnsupportedOperationException();
}
@Override
public Collection<V> values() {
// TODO
throw new UnsupportedOperationException();
}
@Override
public Set<Entry<K, V>> entrySet() {
// TODO
throw new UnsupportedOperationException();
}
@Override
public V putIfAbsent(K key, V value) {
V currentValue = get(key);
if (currentValue == null) {
checkState(txContext.isOpen(), TX_CLOSED_ERROR);
checkNotNull(value, ERROR_NULL_VALUE);
V latest = get(key);
if (latest == null) {
put(key, value);
return null;
}
return currentValue;
return latest;
}
protected List<UpdateOperation<String, byte[]>> prepareDatabaseUpdates() {
List<UpdateOperation<K, V>> updates = Lists.newLinkedList();
protected List<DatabaseUpdate> prepareDatabaseUpdates() {
List<DatabaseUpdate> updates = Lists.newLinkedList();
deleteSet.forEach(key -> {
Versioned<V> original = readCache.get(key);
if (original != null) {
updates.add(UpdateOperation.<K, V>newBuilder()
updates.add(DatabaseUpdate.newBuilder()
.withTableName(name)
.withType(UpdateOperation.Type.REMOVE_IF_VERSION_MATCH)
.withKey(key)
.withType(DatabaseUpdate.Type.REMOVE_IF_VERSION_MATCH)
.withKey(keyCache.getUnchecked(key))
.withCurrentVersion(original.version())
.build());
}
......@@ -192,44 +174,23 @@ public class DefaultTransactionalMap<K, V> implements TransactionalMap<K, V> {
writeCache.forEach((key, value) -> {
Versioned<V> original = readCache.get(key);
if (original == null) {
updates.add(UpdateOperation.<K, V>newBuilder()
updates.add(DatabaseUpdate.newBuilder()
.withTableName(name)
.withType(UpdateOperation.Type.PUT_IF_ABSENT)
.withKey(key)
.withValue(value)
.withType(DatabaseUpdate.Type.PUT_IF_ABSENT)
.withKey(keyCache.getUnchecked(key))
.withValue(serializer.encode(value))
.build());
} else {
updates.add(UpdateOperation.<K, V>newBuilder()
updates.add(DatabaseUpdate.newBuilder()
.withTableName(name)
.withType(UpdateOperation.Type.PUT_IF_VERSION_MATCH)
.withKey(key)
.withType(DatabaseUpdate.Type.PUT_IF_VERSION_MATCH)
.withKey(keyCache.getUnchecked(key))
.withCurrentVersion(original.version())
.withValue(value)
.withValue(serializer.encode(value))
.build());
}
});
return updates.stream().map(this::toRawUpdateOperation).collect(Collectors.toList());
}
private UpdateOperation<String, byte[]> toRawUpdateOperation(UpdateOperation<K, V> update) {
UpdateOperation.Builder<String, byte[]> rawUpdate = UpdateOperation.<String, byte[]>newBuilder();
rawUpdate = rawUpdate.withKey(HexString.toHexString(serializer.encode(update.key())))
.withCurrentVersion(update.currentVersion())
.withType(update.type());
rawUpdate = rawUpdate.withTableName(update.tableName());
if (update.value() != null) {
rawUpdate = rawUpdate.withValue(serializer.encode(update.value()));
}
if (update.currentValue() != null) {
rawUpdate = rawUpdate.withCurrentValue(serializer.encode(update.currentValue()));
}
return rawUpdate.build();
return updates;
}
/**
......
......@@ -27,7 +27,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.onosproject.store.service.UpdateOperation;
import org.onosproject.store.service.DatabaseUpdate;
import org.onosproject.store.service.Transaction;
import org.onosproject.store.service.Versioned;
import com.google.common.collect.Lists;
......@@ -129,24 +130,27 @@ public class PartitionedDatabase implements Database {
}
@Override
public CompletableFuture<Versioned<byte[]>> put(String tableName, String key, byte[] value) {
public CompletableFuture<Result<Versioned<byte[]>>> put(String tableName, String key, byte[] value) {
checkState(isOpen.get(), DB_NOT_OPEN);
return partitioner.getPartition(tableName, key).put(tableName, key, value);
}
@Override
public CompletableFuture<Versioned<byte[]>> remove(String tableName, String key) {
public CompletableFuture<Result<Versioned<byte[]>>> remove(String tableName, String key) {
checkState(isOpen.get(), DB_NOT_OPEN);
return partitioner.getPartition(tableName, key).remove(tableName, key);
}
@Override
public CompletableFuture<Void> clear(String tableName) {
public CompletableFuture<Result<Void>> clear(String tableName) {
AtomicBoolean isLocked = new AtomicBoolean(false);
checkState(isOpen.get(), DB_NOT_OPEN);
return CompletableFuture.allOf(partitions
.stream()
.map(p -> p.clear(tableName))
.toArray(CompletableFuture[]::new));
.map(p -> p.clear(tableName)
.thenApply(v -> isLocked.compareAndSet(false, Result.Status.LOCKED == v.status())))
.toArray(CompletableFuture[]::new))
.thenApply(v -> isLocked.get() ? Result.locked() : Result.ok(null));
}
@Override
......@@ -183,59 +187,86 @@ public class PartitionedDatabase implements Database {
}
@Override
public CompletableFuture<Versioned<byte[]>> putIfAbsent(String tableName, String key, byte[] value) {
public CompletableFuture<Result<Versioned<byte[]>>> putIfAbsent(String tableName, String key, byte[] value) {
checkState(isOpen.get(), DB_NOT_OPEN);
return partitioner.getPartition(tableName, key).putIfAbsent(tableName, key, value);
}
@Override
public CompletableFuture<Boolean> remove(String tableName, String key, byte[] value) {
public CompletableFuture<Result<Boolean>> remove(String tableName, String key, byte[] value) {
checkState(isOpen.get(), DB_NOT_OPEN);
return partitioner.getPartition(tableName, key).remove(tableName, key, value);
}
@Override
public CompletableFuture<Boolean> remove(String tableName, String key, long version) {
public CompletableFuture<Result<Boolean>> remove(String tableName, String key, long version) {
checkState(isOpen.get(), DB_NOT_OPEN);
return partitioner.getPartition(tableName, key).remove(tableName, key, version);
}
@Override
public CompletableFuture<Boolean> replace(String tableName, String key, byte[] oldValue, byte[] newValue) {
public CompletableFuture<Result<Boolean>> replace(
String tableName, String key, byte[] oldValue, byte[] newValue) {
checkState(isOpen.get(), DB_NOT_OPEN);
return partitioner.getPartition(tableName, key).replace(tableName, key, oldValue, newValue);
}
@Override
public CompletableFuture<Boolean> replace(String tableName, String key, long oldVersion, byte[] newValue) {
public CompletableFuture<Result<Boolean>> replace(
String tableName, String key, long oldVersion, byte[] newValue) {
checkState(isOpen.get(), DB_NOT_OPEN);
return partitioner.getPartition(tableName, key).replace(tableName, key, oldVersion, newValue);
}
@Override
public CompletableFuture<Boolean> atomicBatchUpdate(List<UpdateOperation<String, byte[]>> updates) {
checkState(isOpen.get(), DB_NOT_OPEN);
Map<Database, List<UpdateOperation<String, byte[]>>> perPartitionUpdates = Maps.newHashMap();
for (UpdateOperation<String, byte[]> update : updates) {
Database partition = partitioner.getPartition(update.tableName(), update.key());
List<UpdateOperation<String, byte[]>> partitionUpdates = perPartitionUpdates.get(partition);
if (partitionUpdates == null) {
partitionUpdates = Lists.newArrayList();
perPartitionUpdates.put(partition, partitionUpdates);
}
partitionUpdates.add(update);
}
if (perPartitionUpdates.size() > 1) {
// TODO
throw new UnsupportedOperationException("Cross partition transactional updates are not supported.");
public CompletableFuture<Boolean> prepareAndCommit(Transaction transaction) {
Map<Database, Transaction> subTransactions = createSubTransactions(transaction);
if (subTransactions.isEmpty()) {
return CompletableFuture.completedFuture(true);
} else if (subTransactions.size() == 1) {
Entry<Database, Transaction> entry =
subTransactions.entrySet().iterator().next();
return entry.getKey().prepareAndCommit(entry.getValue());
} else {
Entry<Database, List<UpdateOperation<String, byte[]>>> only =
perPartitionUpdates.entrySet().iterator().next();
return only.getKey().atomicBatchUpdate(only.getValue());
return new TransactionManager(this).execute(transaction);
}
}
@Override
public CompletableFuture<Boolean> prepare(Transaction transaction) {
Map<Database, Transaction> subTransactions = createSubTransactions(transaction);
AtomicBoolean status = new AtomicBoolean(true);
return CompletableFuture.allOf(subTransactions.entrySet()
.stream()
.map(entry -> entry
.getKey()
.prepare(entry.getValue())
.thenApply(v -> status.compareAndSet(true, v)))
.toArray(CompletableFuture[]::new))
.thenApply(v -> status.get());
}
@Override
public CompletableFuture<Boolean> commit(Transaction transaction) {
Map<Database, Transaction> subTransactions = createSubTransactions(transaction);
return CompletableFuture.allOf(subTransactions.entrySet()
.stream()
.map(entry -> entry.getKey().commit(entry.getValue()))
.toArray(CompletableFuture[]::new))
.thenApply(v -> true);
}
@Override
public CompletableFuture<Boolean> rollback(Transaction transaction) {
Map<Database, Transaction> subTransactions = createSubTransactions(transaction);
return CompletableFuture.allOf(subTransactions.entrySet()
.stream()
.map(entry -> entry.getKey().rollback(entry.getValue()))
.toArray(CompletableFuture[]::new))
.thenApply(v -> true);
}
@Override
public CompletableFuture<Database> open() {
return CompletableFuture.allOf(partitions
.stream()
......@@ -243,7 +274,8 @@ public class PartitionedDatabase implements Database {
.toArray(CompletableFuture[]::new))
.thenApply(v -> {
isOpen.set(true);
return this; });
return this;
});
}
@Override
......@@ -279,4 +311,19 @@ public class PartitionedDatabase implements Database {
public Database addShutdownTask(Task<CompletableFuture<Void>> task) {
throw new UnsupportedOperationException();
}
}
\ No newline at end of file
private Map<Database, Transaction> createSubTransactions(
Transaction transaction) {
Map<Database, List<DatabaseUpdate>> perPartitionUpdates = Maps.newHashMap();
for (DatabaseUpdate update : transaction.updates()) {
Database partition = partitioner.getPartition(update.tableName(), update.key());
List<DatabaseUpdate> partitionUpdates =
perPartitionUpdates.computeIfAbsent(partition, k -> Lists.newLinkedList());
partitionUpdates.add(update);
}
Map<Database, Transaction> subTransactions = Maps.newHashMap();
perPartitionUpdates.forEach((k, v) -> subTransactions.put(k, new DefaultTransaction(transaction.id(), v)));
return subTransactions;
}
}
......
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.consistent.impl;
/**
* Result of a database update operation.
*
* @param <V> return value type
*/
public final class Result<V> {
public enum Status {
/**
* Indicates a successful update.
*/
OK,
/**
* Indicates a failure due to underlying state being locked by another transaction.
*/
LOCKED
}
private final Status status;
private final V value;
/**
* Creates a new Result instance with the specified value with status set to Status.OK.
*
* @param <V> result value type
* @param value result value
* @return Result instance
*/
public static <V> Result<V> ok(V value) {
return new Result<>(value, Status.OK);
}
/**
* Creates a new Result instance with status set to Status.LOCKED.
*
* @param <V> result value type
* @return Result instance
*/
public static <V> Result<V> locked() {
return new Result<>(null, Status.LOCKED);
}
private Result(V value, Status status) {
this.value = value;
this.status = status;
}
/**
* Returns true if this result indicates a successful execution i.e status is Status.OK.
*
* @return true if successful, false otherwise
*/
public boolean success() {
return status == Status.OK;
}
/**
* Returns the status of database update operation.
* @return database update status
*/
public Status status() {
return status;
}
/**
* Returns the return value for the update.
* @return value returned by database update. If the status is another
* other than Status.OK, this returns a null
*/
public V value() {
return value;
}
}
\ No newline at end of file
......@@ -36,4 +36,4 @@ public class SimpleTableHashPartitioner extends DatabasePartitioner {
public Database getPartition(String tableName, String key) {
return partitions.get(hash(tableName) % partitions.size());
}
}
\ No newline at end of file
}
......
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.consistent.impl;
import static com.google.common.base.Preconditions.checkNotNull;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.onlab.util.KryoNamespace;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.DatabaseUpdate;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.Transaction;
import org.onosproject.store.service.Versioned;
import org.onosproject.store.service.Transaction.State;
/**
* Agent that runs the two phase commit protocol.
*/
public class TransactionManager {
private final Database database;
private final AsyncConsistentMap<Long, Transaction> transactions;
private final Serializer serializer = new Serializer() {
private KryoNamespace kryo = KryoNamespace.newBuilder()
.register(KryoNamespaces.BASIC)
.nextId(KryoNamespace.FLOATING_ID)
.register(Versioned.class)
.register(DatabaseUpdate.class)
.register(DatabaseUpdate.Type.class)
.register(DefaultTransaction.class)
.register(Transaction.State.class)
.register(Pair.class)
.register(ImmutablePair.class)
.build();
@Override
public <T> byte[] encode(T object) {
return kryo.serialize(object);
}
@Override
public <T> T decode(byte[] bytes) {
return kryo.deserialize(bytes);
}
};
/**
* Constructs a new TransactionManager for the specified database instance.
*
* @param database database
*/
public TransactionManager(Database database) {
this.database = checkNotNull(database, "database cannot be null");
this.transactions = new DefaultAsyncConsistentMap<>("onos-transactions", this.database, serializer);
}
/**
* Executes the specified transaction by employing a two phase commit protocol.
*
* @param transaction transaction to commit
* @return transaction result. Result value true indicates a successful commit, false
* indicates abort
*/
public CompletableFuture<Boolean> execute(Transaction transaction) {
// clean up if this transaction in already in a terminal state.
if (transaction.state() == Transaction.State.COMMITTED ||
transaction.state() == Transaction.State.ROLLEDBACK) {
return transactions.remove(transaction.id()).thenApply(v -> true);
} else if (transaction.state() == Transaction.State.COMMITTING) {
return commit(transaction);
} else if (transaction.state() == Transaction.State.ROLLINGBACK) {
return rollback(transaction);
} else {
return prepare(transaction).thenCompose(v -> v ? commit(transaction) : rollback(transaction));
}
}
/**
* Returns all transactions in the system.
*
* @return future for a collection of transactions
*/
public CompletableFuture<Collection<Transaction>> getTransactions() {
return transactions.values().thenApply(c -> {
Collection<Transaction> txns = c.stream().map(v -> v.value()).collect(Collectors.toList());
return txns;
});
}
private CompletableFuture<Boolean> prepare(Transaction transaction) {
return transactions.put(transaction.id(), transaction)
.thenCompose(v -> database.prepare(transaction))
.thenCompose(status -> transactions.put(
transaction.id(),
transaction.transition(status ? State.COMMITTING : State.ROLLINGBACK))
.thenApply(v -> status));
}
private CompletableFuture<Boolean> commit(Transaction transaction) {
return database.commit(transaction)
.thenCompose(v -> transactions.put(
transaction.id(),
transaction.transition(Transaction.State.COMMITTED)))
.thenApply(v -> true);
}
private CompletableFuture<Boolean> rollback(Transaction transaction) {
return database.rollback(transaction)
.thenCompose(v -> transactions.put(
transaction.id(),
transaction.transition(Transaction.State.ROLLEDBACK)))
.thenApply(v -> true);
}
}
\ No newline at end of file