Madan Jampani
Committed by Gerrit Code Review

Refactor transaction support in preparation for migration to latest APIs

 - Added a explicit transaction id type
 - cli command now just returns the identifiers of in-progress transactions
 - Removed redriveTransactions until a better alternative is provided
 - Removed DatabaseUpdate and replaced its usage with MapUpdate

Change-Id: Ic4a14967072068834510cd8459fd2a6790e456ef
Showing 34 changed files with 395 additions and 708 deletions
......@@ -18,81 +18,41 @@ package org.onosproject.cli.net;
import java.util.Collection;
import org.apache.karaf.shell.commands.Command;
import org.apache.karaf.shell.commands.Option;
import org.onlab.util.Tools;
import org.onosproject.cli.AbstractShellCommand;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.service.StorageAdminService;
import org.onosproject.store.service.Transaction;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
/**
* CLI to work with database transactions in the system.
* CLI to view in-progress database transactions in the system.
*/
@Command(scope = "onos", name = "transactions",
description = "Utility for viewing and redriving database transactions")
description = "Utility for listing pending/inprogress transactions")
public class TransactionsCommand extends AbstractShellCommand {
@Option(name = "-r", aliases = "--redrive",
description = "Redrive stuck transactions while removing those that are done",
required = false, multiValued = false)
private boolean redrive = false;
private static final String FMT = "%-20s %-15s %-10s";
/**
* Displays transactions as text.
*
* @param transactions transactions
*/
private void displayTransactions(Collection<Transaction> transactions) {
print("---------------------------------------------");
print(FMT, "Id", "State", "Updated");
print("---------------------------------------------");
transactions.forEach(txn -> print(FMT, txn.id(), txn.state(), Tools.timeAgo(txn.lastUpdated())));
if (transactions.size() > 0) {
print("---------------------------------------------");
}
}
/**
* Converts collection of transactions into a JSON object.
*
* @param transactions transactions
* @param transactionIds transaction identifiers
*/
private JsonNode json(Collection<Transaction> transactions) {
private JsonNode json(Collection<TransactionId> transactionIds) {
ObjectMapper mapper = new ObjectMapper();
ArrayNode txns = mapper.createArrayNode();
// Create a JSON node for each transaction
transactions.stream().forEach(txn -> {
ObjectNode txnNode = mapper.createObjectNode();
txnNode.put("id", txn.id())
.put("state", txn.state().toString())
.put("lastUpdated", txn.lastUpdated());
txns.add(txnNode);
});
transactionIds.forEach(id -> txns.add(id.toString()));
return txns;
}
@Override
protected void execute() {
StorageAdminService storageAdminService = get(StorageAdminService.class);
if (redrive) {
storageAdminService.redriveTransactions();
return;
}
Collection<Transaction> transactions = storageAdminService.getTransactions();
Collection<TransactionId> transactionIds = storageAdminService.getPendingTransactions();
if (outputJson()) {
print("%s", json(transactions));
print("%s", json(transactionIds));
} else {
displayTransactions(transactions);
transactionIds.forEach(id -> print("%s", id.toString()));
}
}
}
......
......@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.resources.impl;
package org.onosproject.store.primitives;
import com.google.common.base.Objects;
......
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.service;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import com.google.common.base.MoreObjects;
/**
* Database update operation.
*
*/
public final class DatabaseUpdate {
/**
* Type of database update operation.
*/
public enum Type {
/**
* Insert/Update entry without any checks.
*/
PUT,
/**
* Insert an entry iff there is no existing entry for that key.
*/
PUT_IF_ABSENT,
/**
* Update entry if the current version matches specified version.
*/
PUT_IF_VERSION_MATCH,
/**
* Update entry if the current value matches specified value.
*/
PUT_IF_VALUE_MATCH,
/**
* Remove entry without any checks.
*/
REMOVE,
/**
* Remove entry if the current version matches specified version.
*/
REMOVE_IF_VERSION_MATCH,
/**
* Remove entry if the current value matches specified value.
*/
REMOVE_IF_VALUE_MATCH,
}
private Type type;
private String mapName;
private String key;
private byte[] value;
private byte[] currentValue;
private long currentVersion = -1;
/**
* Returns the type of update operation.
* @return type of update.
*/
public Type type() {
return type;
}
/**
* Returns the name of map being updated.
* @return map name.
*/
public String mapName() {
return mapName;
}
/**
* Returns the item key being updated.
* @return item key
*/
public String key() {
return key;
}
/**
* Returns the new value.
* @return item's target value.
*/
public byte[] value() {
return value;
}
/**
* Returns the expected current value in the database value for the key.
* @return current value in database.
*/
public byte[] currentValue() {
return currentValue;
}
/**
* Returns the expected current version in the database for the key.
* @return expected version.
*/
public long currentVersion() {
return currentVersion;
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("type", type)
.add("mapName", mapName)
.add("key", key)
.add("value", value)
.add("currentValue", currentValue)
.add("currentVersion", currentVersion)
.toString();
}
/**
* Creates a new builder instance.
*
* @return builder.
*/
public static Builder newBuilder() {
return new Builder();
}
/**
* DatabaseUpdate builder.
*
*/
public static final class Builder {
private DatabaseUpdate update = new DatabaseUpdate();
public DatabaseUpdate build() {
validateInputs();
return update;
}
public Builder withType(Type type) {
update.type = checkNotNull(type, "type cannot be null");
return this;
}
public Builder withMapName(String mapName) {
update.mapName = checkNotNull(mapName, "mapName cannot be null");
return this;
}
public Builder withKey(String key) {
update.key = checkNotNull(key, "key cannot be null");
return this;
}
public Builder withCurrentValue(byte[] value) {
update.currentValue = checkNotNull(value, "currentValue cannot be null");
return this;
}
public Builder withValue(byte[] value) {
update.value = checkNotNull(value, "value cannot be null");
return this;
}
public Builder withCurrentVersion(long version) {
checkArgument(version >= 0, "version cannot be negative");
update.currentVersion = version;
return this;
}
private void validateInputs() {
checkNotNull(update.type, "type must be specified");
checkNotNull(update.mapName, "map name must be specified");
checkNotNull(update.key, "key must be specified");
switch (update.type) {
case PUT:
case PUT_IF_ABSENT:
checkNotNull(update.value, "value must be specified.");
break;
case PUT_IF_VERSION_MATCH:
checkNotNull(update.value, "value must be specified.");
checkState(update.currentVersion >= 0, "current version must be specified");
break;
case PUT_IF_VALUE_MATCH:
checkNotNull(update.value, "value must be specified.");
checkNotNull(update.currentValue, "currentValue must be specified.");
break;
case REMOVE:
break;
case REMOVE_IF_VERSION_MATCH:
checkState(update.currentVersion >= 0, "current version must be specified");
break;
case REMOVE_IF_VALUE_MATCH:
checkNotNull(update.currentValue, "currentValue must be specified.");
break;
default:
throw new IllegalStateException("Unknown operation type");
}
}
}
}
......@@ -61,7 +61,12 @@ public interface DistributedPrimitive {
/**
* Leader elector.
*/
LEADER_ELECTOR
LEADER_ELECTOR,
/**
* Transaction Context.
*/
TRANSACTION_CONTEXT
}
static final long DEFAULT_OPERTATION_TIMEOUT_MILLIS = 5000L;
......
......@@ -19,6 +19,8 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.onosproject.store.primitives.TransactionId;
/**
* Service for administering storage instances.
*/
......@@ -62,14 +64,9 @@ public interface StorageAdminService {
Map<String, Long> getInMemoryDatabaseCounters();
/**
* Returns all the transactions in the system.
* Returns all pending transactions.
*
* @return collection of transactions
*/
Collection<Transaction> getTransactions();
/**
* Redrives stuck transactions while removing those that are done.
* @return collection of pending transaction identifiers.
*/
void redriveTransactions();
Collection<TransactionId> getPendingTransactions();
}
......
......@@ -16,6 +16,8 @@
package org.onosproject.store.service;
import org.onosproject.store.primitives.TransactionId;
/**
* Provides a context for transactional operations.
* <p>
......@@ -31,14 +33,19 @@ package org.onosproject.store.service;
* context isolation level is REPEATABLE_READS i.e. only data that is committed can be read.
* The only uncommitted data that can be read is the data modified by the current transaction.
*/
public interface TransactionContext {
public interface TransactionContext extends DistributedPrimitive {
@Override
default DistributedPrimitive.Type type() {
return DistributedPrimitive.Type.TRANSACTION_CONTEXT;
}
/**
* Returns the unique transactionId.
* Returns the transaction identifier.
*
* @return transaction id
*/
long transactionId();
TransactionId transactionId();
/**
* Returns if this transaction context is open.
......
......@@ -15,33 +15,14 @@
*/
package org.onosproject.store.service;
import org.onosproject.store.primitives.DistributedPrimitiveBuilder;
/**
* Interface definition for a transaction context builder.
* Abstract base class for a transaction context builder.
*/
public interface TransactionContextBuilder {
/**
* Disables distribution of map entries across multiple database partitions.
* <p>
* When partitioning is disabled, the returned map will have a single
* partition that spans the entire cluster. Furthermore, the changes made to
* the map are ephemeral and do not survive a full cluster restart.
* </p>
* <p>
* Note: By default, partitions are enabled. This feature is intended to
* simplify debugging.
* </p>
*
* @return this TransactionalContextBuilder
*/
TransactionContextBuilder withPartitionsDisabled();
public abstract class TransactionContextBuilder extends DistributedPrimitiveBuilder<TransactionContext> {
/**
* Builds a TransactionContext based on configuration options supplied to this
* builder.
*
* @return a new TransactionalContext
* @throws java.lang.RuntimeException if a mandatory parameter is missing
*/
TransactionContext build();
public TransactionContextBuilder() {
super(DistributedPrimitive.Type.TRANSACTION_CONTEXT);
}
}
......
......@@ -32,6 +32,7 @@ import java.util.Scanner;
import org.onlab.util.Match;
import org.onosproject.cluster.NodeId;
import org.onosproject.event.Change;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapState;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands;
......@@ -40,8 +41,6 @@ import org.onosproject.store.primitives.resources.impl.MapEntryUpdateResult;
import org.onosproject.store.primitives.resources.impl.MapUpdate;
import org.onosproject.store.primitives.resources.impl.PrepareResult;
import org.onosproject.store.primitives.resources.impl.RollbackResult;
import org.onosproject.store.primitives.resources.impl.TransactionId;
import org.onosproject.store.primitives.resources.impl.TransactionalMapUpdate;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.Versioned;
......@@ -66,7 +65,8 @@ public final class CatalystSerializers {
MapEntryUpdateResult.Status.class,
MapUpdate.class,
MapUpdate.Type.class,
TransactionalMapUpdate.class,
Transaction.class,
Transaction.State.class,
TransactionId.class,
PrepareResult.class,
CommitResult.class,
......@@ -99,7 +99,8 @@ public final class CatalystSerializers {
serializer.register(Match.class, factory);
serializer.register(MapEntryUpdateResult.class, factory);
serializer.register(MapEntryUpdateResult.Status.class, factory);
serializer.register(TransactionalMapUpdate.class, factory);
serializer.register(Transaction.class, factory);
serializer.register(Transaction.State.class, factory);
serializer.register(PrepareResult.class, factory);
serializer.register(CommitResult.class, factory);
serializer.register(RollbackResult.class, factory);
......
......@@ -24,6 +24,7 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
......@@ -46,7 +47,6 @@ import net.kuujo.copycat.protocol.Consistency;
import net.kuujo.copycat.protocol.Protocol;
import net.kuujo.copycat.util.concurrent.NamedThreadFactory;
import org.apache.commons.lang.math.RandomUtils;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
......@@ -63,10 +63,12 @@ import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.cluster.PartitionId;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.IdGenerator;
import org.onosproject.persistence.PersistenceService;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.primitives.resources.impl.MapUpdate;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.AtomicCounterBuilder;
import org.onosproject.store.service.AtomicValueBuilder;
import org.onosproject.store.service.ConsistentMapBuilder;
......@@ -79,7 +81,6 @@ import org.onosproject.store.service.PartitionInfo;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageAdminService;
import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.Transaction;
import org.onosproject.store.service.TransactionContextBuilder;
import org.slf4j.Logger;
......@@ -112,7 +113,8 @@ public class DatabaseManager implements StorageService, StorageAdminService {
protected NodeId localNodeId;
private TransactionManager transactionManager;
private final IdGenerator transactionIdGenerator = () -> RandomUtils.nextLong();
private final Supplier<TransactionId> transactionIdGenerator =
() -> TransactionId.from(UUID.randomUUID().toString());
private ApplicationListener appListener = new InternalApplicationListener();
......@@ -212,7 +214,17 @@ public class DatabaseManager implements StorageService, StorageAdminService {
Futures.getUnchecked(status);
transactionManager = new TransactionManager(partitionedDatabase, consistentMapBuilder());
AsyncConsistentMap<TransactionId, Transaction> transactions =
this.<TransactionId, Transaction>consistentMapBuilder()
.withName("onos-transactions")
.withSerializer(Serializer.using(KryoNamespaces.API,
MapUpdate.class,
MapUpdate.Type.class,
Transaction.class,
Transaction.State.class))
.buildAsyncMap();
transactionManager = new TransactionManager(partitionedDatabase, transactions);
partitionedDatabase.setTransactionManager(transactionManager);
log.info("Started");
......@@ -238,7 +250,9 @@ public class DatabaseManager implements StorageService, StorageAdminService {
@Override
public TransactionContextBuilder transactionContextBuilder() {
return new DefaultTransactionContextBuilder(this, transactionIdGenerator.getNewId());
return new DefaultTransactionContextBuilder(this::consistentMapBuilder,
transactionManager::execute,
transactionIdGenerator.get());
}
@Override
......@@ -385,8 +399,8 @@ public class DatabaseManager implements StorageService, StorageAdminService {
}
@Override
public Collection<Transaction> getTransactions() {
return complete(transactionManager.getTransactions());
public Collection<TransactionId> getPendingTransactions() {
return complete(transactionManager.getPendingTransactionIds());
}
private static <T> T complete(CompletableFuture<T> future) {
......@@ -402,11 +416,6 @@ public class DatabaseManager implements StorageService, StorageAdminService {
}
}
@Override
public void redriveTransactions() {
getTransactions().stream().forEach(transactionManager::execute);
}
protected <K, V> DefaultAsyncConsistentMap<K, V> registerMap(DefaultAsyncConsistentMap<K, V> map) {
maps.put(map.name(), map);
if (map.applicationId() != null) {
......
......@@ -17,7 +17,6 @@
package org.onosproject.store.primitives.impl;
import org.onlab.util.Match;
import org.onosproject.store.service.Transaction;
import org.onosproject.store.service.Versioned;
import java.util.Collection;
......
......@@ -21,10 +21,10 @@ import java.nio.ByteBuffer;
import org.onlab.util.KryoNamespace;
import org.onlab.util.Match;
import org.onosproject.cluster.NodeId;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.primitives.resources.impl.MapUpdate;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
import org.onosproject.store.service.DatabaseUpdate;
import org.onosproject.store.service.Transaction;
import org.onosproject.store.service.Versioned;
import net.kuujo.copycat.cluster.internal.MemberInfo;
......@@ -69,13 +69,14 @@ public class DatabaseSerializer extends SerializerConfig {
private static final KryoNamespace ONOS_STORE = KryoNamespace.newBuilder()
.nextId(KryoNamespace.FLOATING_ID)
.register(Versioned.class)
.register(DatabaseUpdate.class)
.register(DatabaseUpdate.Type.class)
.register(MapUpdate.class)
.register(MapUpdate.Type.class)
.register(Result.class)
.register(UpdateResult.class)
.register(Result.Status.class)
.register(DefaultTransaction.class)
.register(Transaction.class)
.register(Transaction.State.class)
.register(TransactionId.class)
.register(org.onosproject.store.primitives.impl.CommitResponse.class)
.register(Match.class)
.register(NodeId.class)
......
......@@ -22,7 +22,6 @@ import net.kuujo.copycat.state.Query;
import net.kuujo.copycat.state.StateContext;
import org.onlab.util.Match;
import org.onosproject.store.service.Transaction;
import org.onosproject.store.service.Versioned;
import java.util.Collection;
......
......@@ -26,7 +26,6 @@ import net.kuujo.copycat.util.concurrent.Futures;
import net.kuujo.copycat.util.function.TriConsumer;
import org.onlab.util.Match;
import org.onosproject.store.service.Transaction;
import org.onosproject.store.service.Versioned;
import java.util.Collection;
......
......@@ -26,8 +26,8 @@ import net.kuujo.copycat.state.Initializer;
import net.kuujo.copycat.state.StateContext;
import org.onlab.util.Match;
import org.onosproject.store.service.DatabaseUpdate;
import org.onosproject.store.service.Transaction;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.primitives.resources.impl.MapUpdate;
import org.onosproject.store.service.Versioned;
import java.util.Arrays;
......@@ -278,7 +278,7 @@ public class DefaultDatabaseState implements DatabaseState<String, byte[]> {
return queues.computeIfAbsent(queueName, name -> new LinkedList<>());
}
private boolean isUpdatePossible(DatabaseUpdate update) {
private boolean isUpdatePossible(MapUpdate<String, byte[]> update) {
Versioned<byte[]> existingEntry = mapGet(update.mapName(), update.key());
switch (update.type()) {
case PUT:
......@@ -299,7 +299,7 @@ public class DefaultDatabaseState implements DatabaseState<String, byte[]> {
}
}
private void doProvisionalUpdate(DatabaseUpdate update, long transactionId) {
private void doProvisionalUpdate(MapUpdate<String, byte[]> update, TransactionId transactionId) {
Map<String, Update> lockMap = getLockMap(update.mapName());
switch (update.type()) {
case PUT:
......@@ -318,7 +318,8 @@ public class DefaultDatabaseState implements DatabaseState<String, byte[]> {
}
}
private UpdateResult<String, byte[]> commitProvisionalUpdate(DatabaseUpdate update, long transactionId) {
private UpdateResult<String, byte[]> commitProvisionalUpdate(
MapUpdate<String, byte[]> update, TransactionId transactionId) {
String mapName = update.mapName();
String key = update.key();
Update provisionalUpdate = getLockMap(mapName).get(key);
......@@ -330,7 +331,7 @@ public class DefaultDatabaseState implements DatabaseState<String, byte[]> {
return mapUpdate(mapName, key, Match.any(), Match.any(), provisionalUpdate.value()).value();
}
private void undoProvisionalUpdate(DatabaseUpdate update, long transactionId) {
private void undoProvisionalUpdate(MapUpdate<String, byte[]> update, TransactionId transactionId) {
String mapName = update.mapName();
String key = update.key();
Update provisionalUpdate = getLockMap(mapName).get(key);
......@@ -342,7 +343,7 @@ public class DefaultDatabaseState implements DatabaseState<String, byte[]> {
}
}
private boolean isLockedByAnotherTransaction(String mapName, String key, long transactionId) {
private boolean isLockedByAnotherTransaction(String mapName, String key, TransactionId transactionId) {
Update update = getLockMap(mapName).get(key);
return update != null && !Objects.equal(transactionId, update.transactionId());
}
......@@ -356,15 +357,15 @@ public class DefaultDatabaseState implements DatabaseState<String, byte[]> {
}
private class Update {
private final long transactionId;
private final TransactionId transactionId;
private final byte[] value;
public Update(long txId, byte[] value) {
public Update(TransactionId txId, byte[] value) {
this.transactionId = txId;
this.value = value;
}
public long transactionId() {
public TransactionId transactionId() {
return this.transactionId;
}
......
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.impl;
import java.util.List;
import org.onosproject.store.service.DatabaseUpdate;
import org.onosproject.store.service.Transaction;
import com.google.common.collect.ImmutableList;
/**
* A Default transaction implementation.
*/
public class DefaultTransaction implements Transaction {
private final long transactionId;
private final List<DatabaseUpdate> updates;
private final State state;
private final long lastUpdated;
public DefaultTransaction(long transactionId, List<DatabaseUpdate> updates) {
this(transactionId, updates, State.PREPARING, System.currentTimeMillis());
}
private DefaultTransaction(long transactionId, List<DatabaseUpdate> updates, State state, long lastUpdated) {
this.transactionId = transactionId;
this.updates = ImmutableList.copyOf(updates);
this.state = state;
this.lastUpdated = lastUpdated;
}
@Override
public long id() {
return transactionId;
}
@Override
public List<DatabaseUpdate> updates() {
return updates;
}
@Override
public State state() {
return state;
}
@Override
public Transaction transition(State newState) {
return new DefaultTransaction(transactionId, updates, newState, System.currentTimeMillis());
}
@Override
public long lastUpdated() {
return lastUpdated;
}
}
\ No newline at end of file
......@@ -18,14 +18,17 @@ package org.onosproject.store.primitives.impl;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Supplier;
import static com.google.common.base.Preconditions.*;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.primitives.resources.impl.CommitResult;
import org.onosproject.store.primitives.resources.impl.MapUpdate;
import org.onosproject.store.service.ConsistentMapBuilder;
import org.onosproject.store.service.DatabaseUpdate;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.Transaction;
import org.onosproject.store.service.TransactionContext;
import org.onosproject.store.service.TransactionalMap;
......@@ -44,20 +47,20 @@ public class DefaultTransactionContext implements TransactionContext {
@SuppressWarnings("rawtypes")
private final Map<String, DefaultTransactionalMap> txMaps = Maps.newConcurrentMap();
private boolean isOpen = false;
private final Database database;
private final long transactionId;
private final Function<Transaction, CompletableFuture<CommitResult>> transactionCommitter;
private final TransactionId transactionId;
private final Supplier<ConsistentMapBuilder> mapBuilderSupplier;
public DefaultTransactionContext(long transactionId,
Database database,
public DefaultTransactionContext(TransactionId transactionId,
Function<Transaction, CompletableFuture<CommitResult>> transactionCommitter,
Supplier<ConsistentMapBuilder> mapBuilderSupplier) {
this.transactionId = transactionId;
this.database = checkNotNull(database);
this.transactionCommitter = checkNotNull(transactionCommitter);
this.mapBuilderSupplier = checkNotNull(mapBuilderSupplier);
}
@Override
public long transactionId() {
public TransactionId transactionId() {
return transactionId;
}
......@@ -91,13 +94,13 @@ public class DefaultTransactionContext implements TransactionContext {
public boolean commit() {
// TODO: rework commit implementation to be more intuitive
checkState(isOpen, TX_NOT_OPEN_ERROR);
CommitResponse response = null;
CommitResult result = null;
try {
List<DatabaseUpdate> updates = Lists.newLinkedList();
txMaps.values().forEach(m -> updates.addAll(m.prepareDatabaseUpdates()));
Transaction transaction = new DefaultTransaction(transactionId, updates);
response = Futures.getUnchecked(database.prepareAndCommit(transaction));
return response.success();
List<MapUpdate<String, byte[]>> updates = Lists.newLinkedList();
txMaps.values().forEach(m -> updates.addAll(m.toMapUpdates()));
Transaction transaction = new Transaction(transactionId, updates);
result = Futures.getUnchecked(transactionCommitter.apply(transaction));
return result == CommitResult.OK;
} catch (Exception e) {
abort();
return false;
......@@ -128,4 +131,9 @@ public class DefaultTransactionContext implements TransactionContext {
});
return s.toString();
}
@Override
public String name() {
return transactionId.toString();
}
}
......
......@@ -15,6 +15,13 @@
*/
package org.onosproject.store.primitives.impl;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Supplier;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.primitives.resources.impl.CommitResult;
import org.onosproject.store.service.ConsistentMapBuilder;
import org.onosproject.store.service.TransactionContext;
import org.onosproject.store.service.TransactionContextBuilder;
......@@ -22,29 +29,28 @@ import org.onosproject.store.service.TransactionContextBuilder;
* The default implementation of a transaction context builder. This builder
* generates a {@link DefaultTransactionContext}.
*/
public class DefaultTransactionContextBuilder implements TransactionContextBuilder {
public class DefaultTransactionContextBuilder extends TransactionContextBuilder {
private boolean partitionsEnabled = true;
private final DatabaseManager manager;
private final long transactionId;
private final Supplier<ConsistentMapBuilder> mapBuilderSupplier;
private final Function<Transaction, CompletableFuture<CommitResult>> transactionCommitter;
private final TransactionId transactionId;
public DefaultTransactionContextBuilder(DatabaseManager manager, long transactionId) {
this.manager = manager;
public DefaultTransactionContextBuilder(Supplier<ConsistentMapBuilder> mapBuilderSupplier,
Function<Transaction, CompletableFuture<CommitResult>> transactionCommiter,
TransactionId transactionId) {
this.mapBuilderSupplier = mapBuilderSupplier;
this.transactionCommitter = transactionCommiter;
this.transactionId = transactionId;
}
@Override
public TransactionContextBuilder withPartitionsDisabled() {
partitionsEnabled = false;
return this;
}
@Override
public TransactionContext build() {
return new DefaultTransactionContext(
transactionId,
partitionsEnabled ? manager.partitionedDatabase : manager.inMemoryDatabase,
() -> partitionsEnabled ? manager.consistentMapBuilder()
: manager.consistentMapBuilder().withPartitionsDisabled());
return new DefaultTransactionContext(transactionId, transactionCommitter, () -> {
ConsistentMapBuilder mapBuilder = mapBuilderSupplier.get();
if (partitionsDisabled()) {
mapBuilder = mapBuilder.withPartitionsDisabled();
}
return mapBuilder;
});
}
}
......
......@@ -21,8 +21,8 @@ import java.util.Map;
import java.util.Set;
import org.onlab.util.HexString;
import org.onosproject.store.primitives.resources.impl.MapUpdate;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.DatabaseUpdate;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.TransactionContext;
import org.onosproject.store.service.TransactionalMap;
......@@ -159,14 +159,14 @@ public class DefaultTransactionalMap<K, V> implements TransactionalMap<K, V> {
return latest;
}
protected List<DatabaseUpdate> prepareDatabaseUpdates() {
List<DatabaseUpdate> updates = Lists.newLinkedList();
protected List<MapUpdate<String, byte[]>> toMapUpdates() {
List<MapUpdate<String, byte[]>> updates = Lists.newLinkedList();
deleteSet.forEach(key -> {
Versioned<V> original = readCache.get(key);
if (original != null) {
updates.add(DatabaseUpdate.newBuilder()
updates.add(MapUpdate.<String, byte[]>newBuilder()
.withMapName(name)
.withType(DatabaseUpdate.Type.REMOVE_IF_VERSION_MATCH)
.withType(MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
.withKey(keyCache.getUnchecked(key))
.withCurrentVersion(original.version())
.build());
......@@ -175,16 +175,16 @@ public class DefaultTransactionalMap<K, V> implements TransactionalMap<K, V> {
writeCache.forEach((key, value) -> {
Versioned<V> original = readCache.get(key);
if (original == null) {
updates.add(DatabaseUpdate.newBuilder()
updates.add(MapUpdate.<String, byte[]>newBuilder()
.withMapName(name)
.withType(DatabaseUpdate.Type.PUT_IF_ABSENT)
.withType(MapUpdate.Type.PUT_IF_ABSENT)
.withKey(keyCache.getUnchecked(key))
.withValue(serializer.encode(value))
.build());
} else {
updates.add(DatabaseUpdate.newBuilder()
updates.add(MapUpdate.<String, byte[]>newBuilder()
.withMapName(name)
.withType(DatabaseUpdate.Type.PUT_IF_VERSION_MATCH)
.withType(MapUpdate.Type.PUT_IF_VERSION_MATCH)
.withKey(keyCache.getUnchecked(key))
.withCurrentVersion(original.version())
.withValue(serializer.encode(value))
......@@ -199,7 +199,7 @@ public class DefaultTransactionalMap<K, V> implements TransactionalMap<K, V> {
public String toString() {
return MoreObjects.toStringHelper(this)
.add("backingMap", backingMap)
.add("updates", prepareDatabaseUpdates())
.add("updates", toMapUpdates())
.toString();
}
......
......@@ -26,8 +26,8 @@ import net.kuujo.copycat.cluster.Cluster;
import net.kuujo.copycat.resource.ResourceState;
import org.onlab.util.Match;
import org.onosproject.store.service.DatabaseUpdate;
import org.onosproject.store.service.Transaction;
import org.onosproject.store.primitives.resources.impl.CommitResult;
import org.onosproject.store.primitives.resources.impl.MapUpdate;
import org.onosproject.store.service.Versioned;
import java.util.Collection;
......@@ -273,7 +273,9 @@ public class PartitionedDatabase implements Database {
if (transactionManager == null) {
throw new IllegalStateException("TransactionManager is not initialized");
}
return transactionManager.execute(transaction);
return transactionManager.execute(transaction)
.thenApply(r -> r == CommitResult.OK
? CommitResponse.success(ImmutableList.of()) : CommitResponse.failure());
}
}
......@@ -373,15 +375,15 @@ public class PartitionedDatabase implements Database {
private Map<Database, Transaction> createSubTransactions(
Transaction transaction) {
Map<Database, List<DatabaseUpdate>> perPartitionUpdates = Maps.newHashMap();
for (DatabaseUpdate update : transaction.updates()) {
Map<Database, List<MapUpdate<String, byte[]>>> perPartitionUpdates = Maps.newHashMap();
for (MapUpdate<String, byte[]> update : transaction.updates()) {
Database partition = partitioner.getPartition(update.mapName(), update.key());
List<DatabaseUpdate> partitionUpdates =
List<MapUpdate<String, byte[]>> partitionUpdates =
perPartitionUpdates.computeIfAbsent(partition, k -> Lists.newLinkedList());
partitionUpdates.add(update);
}
Map<Database, Transaction> subTransactions = Maps.newHashMap();
perPartitionUpdates.forEach((k, v) -> subTransactions.put(k, new DefaultTransaction(transaction.id(), v)));
perPartitionUpdates.forEach((k, v) -> subTransactions.put(k, new Transaction(transaction.id(), v)));
return subTransactions;
}
......
......@@ -13,14 +13,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.service;
package org.onosproject.store.primitives.impl;
import java.util.List;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.primitives.resources.impl.MapUpdate;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;
/**
* An immutable transaction object.
*/
public interface Transaction {
public class Transaction {
enum State {
/**
......@@ -55,48 +61,44 @@ public interface Transaction {
ROLLEDBACK
}
/**
* Returns the transaction Id.
*
* @return transaction id
*/
long id();
/**
* Returns the list of updates that are part of this transaction.
*
* @return list of database updates
*/
List<DatabaseUpdate> updates();
/**
* Returns the current state of this transaction.
*
* @return transaction state
*/
State state();
/**
* Returns true if this transaction has completed execution.
*
* @return true is yes, false otherwise
*/
default boolean isDone() {
return state() == State.COMMITTED || state() == State.ROLLEDBACK;
private final TransactionId transactionId;
private final List<MapUpdate<String, byte[]>> updates;
private final State state;
public Transaction(TransactionId transactionId, List<MapUpdate<String, byte[]>> updates) {
this(transactionId, updates, State.PREPARING);
}
private Transaction(TransactionId transactionId,
List<MapUpdate<String, byte[]>> updates,
State state) {
this.transactionId = transactionId;
this.updates = ImmutableList.copyOf(updates);
this.state = state;
}
public TransactionId id() {
return transactionId;
}
/**
* Returns a new transaction that is created by transitioning this one to the specified state.
*
* @param newState destination state
* @return a new transaction instance similar to the current one but its state set to specified state
*/
Transaction transition(State newState);
/**
* Returns the system time when the transaction was last updated.
*
* @return last update time
*/
long lastUpdated();
}
public List<MapUpdate<String, byte[]>> updates() {
return updates;
}
public State state() {
return state;
}
public Transaction transition(State newState) {
return new Transaction(transactionId, updates, newState);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("transactionId", transactionId)
.add("updates", updates)
.add("state", state)
.toString();
}
}
\ No newline at end of file
......
......@@ -17,87 +17,62 @@ package org.onosproject.store.primitives.impl;
import static com.google.common.base.Preconditions.checkNotNull;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.onlab.util.KryoNamespace;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.primitives.resources.impl.CommitResult;
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.ConsistentMapBuilder;
import org.onosproject.store.service.DatabaseUpdate;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.Transaction;
import org.onosproject.store.service.Versioned;
import org.onosproject.store.service.Transaction.State;
import com.google.common.collect.ImmutableList;
import static org.onosproject.store.primitives.impl.Transaction.State.COMMITTED;
import static org.onosproject.store.primitives.impl.Transaction.State.COMMITTING;
import static org.onosproject.store.primitives.impl.Transaction.State.ROLLEDBACK;
import static org.onosproject.store.primitives.impl.Transaction.State.ROLLINGBACK;
/**
* Agent that runs the two phase commit protocol.
*/
public class TransactionManager {
private static final KryoNamespace KRYO_NAMESPACE = KryoNamespace.newBuilder()
.register(KryoNamespaces.BASIC)
.nextId(KryoNamespace.FLOATING_ID)
.register(Versioned.class)
.register(DatabaseUpdate.class)
.register(DatabaseUpdate.Type.class)
.register(DefaultTransaction.class)
.register(Transaction.State.class)
.build();
private final Serializer serializer = Serializer.using(Arrays.asList(KRYO_NAMESPACE));
private final Database database;
private final AsyncConsistentMap<Long, Transaction> transactions;
private final AsyncConsistentMap<TransactionId, Transaction> transactions;
/**
* Constructs a new TransactionManager for the specified database instance.
*
* @param database database
* @param mapBuilder builder for ConsistentMap instances
*/
public TransactionManager(Database database, ConsistentMapBuilder<Long, Transaction> mapBuilder) {
public TransactionManager(Database database, AsyncConsistentMap<TransactionId, Transaction> transactions) {
this.database = checkNotNull(database, "database cannot be null");
this.transactions = mapBuilder.withName("onos-transactions")
.withSerializer(serializer)
.buildAsyncMap();
this.transactions = transactions;
}
/**
* Executes the specified transaction by employing a two phase commit protocol.
*
* @param transaction transaction to commit
* @return transaction result. Result value true indicates a successful commit, false
* indicates abort
* @return transaction commit result
*/
public CompletableFuture<CommitResponse> execute(Transaction transaction) {
public CompletableFuture<CommitResult> execute(Transaction transaction) {
// clean up if this transaction in already in a terminal state.
if (transaction.state() == Transaction.State.COMMITTED ||
transaction.state() == Transaction.State.ROLLEDBACK) {
return transactions.remove(transaction.id()).thenApply(v -> CommitResponse.success(ImmutableList.of()));
} else if (transaction.state() == Transaction.State.COMMITTING) {
if (transaction.state() == COMMITTED || transaction.state() == ROLLEDBACK) {
return transactions.remove(transaction.id()).thenApply(v -> CommitResult.OK);
} else if (transaction.state() == COMMITTING) {
return commit(transaction);
} else if (transaction.state() == Transaction.State.ROLLINGBACK) {
return rollback(transaction).thenApply(v -> CommitResponse.success(ImmutableList.of()));
} else if (transaction.state() == ROLLINGBACK) {
return rollback(transaction).thenApply(v -> CommitResult.FAILURE_TO_PREPARE);
} else {
return prepare(transaction).thenCompose(v -> v ? commit(transaction) : rollback(transaction));
}
}
/**
* Returns all transactions in the system.
* Returns all pending transaction identifiers.
*
* @return future for a collection of transactions
* @return future for a collection of transaction identifiers.
*/
public CompletableFuture<Collection<Transaction>> getTransactions() {
return transactions.values().thenApply(c -> {
Collection<Transaction> txns = c.stream().map(v -> v.value()).collect(Collectors.toList());
return txns;
});
public CompletableFuture<Collection<TransactionId>> getPendingTransactionIds() {
return transactions.values().thenApply(c -> c.stream()
.map(v -> v.value())
.filter(v -> v.state() != COMMITTED && v.state() != ROLLEDBACK)
.map(Transaction::id)
.collect(Collectors.toList()));
}
private CompletableFuture<Boolean> prepare(Transaction transaction) {
......@@ -105,22 +80,25 @@ public class TransactionManager {
.thenCompose(v -> database.prepare(transaction))
.thenCompose(status -> transactions.put(
transaction.id(),
transaction.transition(status ? State.COMMITTING : State.ROLLINGBACK))
transaction.transition(status ? COMMITTING : ROLLINGBACK))
.thenApply(v -> status));
}
private CompletableFuture<CommitResponse> commit(Transaction transaction) {
private CompletableFuture<CommitResult> commit(Transaction transaction) {
return database.commit(transaction)
.whenComplete((r, e) -> transactions.put(
transaction.id(),
transaction.transition(Transaction.State.COMMITTED)));
.thenCompose(r -> {
if (r.success()) {
return transactions.put(transaction.id(), transaction.transition(COMMITTED))
.thenApply(v -> CommitResult.OK);
} else {
return CompletableFuture.completedFuture(CommitResult.FAILURE_DURING_COMMIT);
}
});
}
private CompletableFuture<CommitResponse> rollback(Transaction transaction) {
private CompletableFuture<CommitResult> rollback(Transaction transaction) {
return database.rollback(transaction)
.thenCompose(v -> transactions.put(
transaction.id(),
transaction.transition(Transaction.State.ROLLEDBACK)))
.thenApply(v -> CommitResponse.failure());
.thenCompose(v -> transactions.put(transaction.id(), transaction.transition(ROLLEDBACK)))
.thenApply(v -> CommitResult.FAILURE_TO_PREPARE);
}
}
......
/*
* Copyright 2016 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.impl;
import java.util.concurrent.CompletableFuture;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.primitives.resources.impl.CommitResult;
import org.onosproject.store.primitives.resources.impl.PrepareResult;
import org.onosproject.store.primitives.resources.impl.RollbackResult;
/**
* Participant in a two-phase commit protocol.
*/
public interface TransactionParticipant {
/**
* Attempts to execute the prepare phase for the specified {@link Transaction transaction}.
* @param transaction transaction
* @return future for prepare result
*/
CompletableFuture<PrepareResult> prepare(Transaction transaction);
/**
* Attempts to execute the commit phase for previously prepared transaction.
* @param transactionId transaction identifier
* @return future for commit result
*/
CompletableFuture<CommitResult> commit(TransactionId transactionId);
/**
* Attempts to execute the rollback phase for previously prepared transaction.
* @param transactionId transaction identifier
* @return future for rollback result
*/
CompletableFuture<RollbackResult> rollback(TransactionId transactionId);
}
\ No newline at end of file
......@@ -31,6 +31,9 @@ import java.util.function.BiFunction;
import java.util.function.Predicate;
import org.onlab.util.Match;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.primitives.impl.Transaction;
import org.onosproject.store.primitives.impl.TransactionParticipant;
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapEventListener;
......@@ -43,7 +46,7 @@ import com.google.common.collect.Sets;
*/
@ResourceTypeInfo(id = -151, stateMachine = AtomixConsistentMapState.class)
public class AtomixConsistentMap extends Resource<AtomixConsistentMap, Resource.Options>
implements AsyncConsistentMap<String, byte[]> {
implements AsyncConsistentMap<String, byte[]>, TransactionParticipant {
private final Set<MapEventListener<String, byte[]>> mapEventListeners = Sets.newCopyOnWriteArraySet();
......@@ -235,18 +238,6 @@ public class AtomixConsistentMap extends Resource<AtomixConsistentMap, Resource.
});
}
public CompletableFuture<PrepareResult> prepare(TransactionalMapUpdate<String, byte[]> update) {
return submit(new AtomixConsistentMapCommands.TransactionPrepare(update));
}
public CompletableFuture<CommitResult> commit(TransactionId transactionId) {
return submit(new AtomixConsistentMapCommands.TransactionCommit(transactionId));
}
public CompletableFuture<RollbackResult> rollback(TransactionId transactionId) {
return submit(new AtomixConsistentMapCommands.TransactionRollback(transactionId));
}
@Override
public synchronized CompletableFuture<Void> addListener(MapEventListener<String, byte[]> listener) {
if (!mapEventListeners.isEmpty()) {
......@@ -274,6 +265,21 @@ public class AtomixConsistentMap extends Resource<AtomixConsistentMap, Resource.
}
}
@Override
public CompletableFuture<PrepareResult> prepare(Transaction transaction) {
return submit(new AtomixConsistentMapCommands.TransactionPrepare(transaction));
}
@Override
public CompletableFuture<CommitResult> commit(TransactionId transactionId) {
return submit(new AtomixConsistentMapCommands.TransactionCommit(transactionId));
}
@Override
public CompletableFuture<RollbackResult> rollback(TransactionId transactionId) {
return submit(new AtomixConsistentMapCommands.TransactionRollback(transactionId));
}
/**
* Change listener context.
*/
......
......@@ -28,6 +28,8 @@ import java.util.Map;
import java.util.Set;
import org.onlab.util.Match;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.primitives.impl.Transaction;
import org.onosproject.store.service.Versioned;
import com.google.common.base.MoreObjects;
......@@ -207,35 +209,35 @@ public final class AtomixConsistentMapCommands {
*/
@SuppressWarnings("serial")
public static class TransactionPrepare extends MapCommand<PrepareResult> {
private TransactionalMapUpdate<String, byte[]> update;
private Transaction transaction;
public TransactionPrepare() {
}
public TransactionPrepare(TransactionalMapUpdate<String, byte[]> update) {
this.update = update;
public TransactionPrepare(Transaction transaction) {
this.transaction = transaction;
}
public TransactionalMapUpdate<String, byte[]> transactionUpdate() {
return update;
public Transaction transaction() {
return transaction;
}
@Override
public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
super.writeObject(buffer, serializer);
serializer.writeObject(update, buffer);
serializer.writeObject(transaction, buffer);
}
@Override
public void readObject(BufferInput<?> buffer, Serializer serializer) {
super.readObject(buffer, serializer);
update = serializer.readObject(buffer);
transaction = serializer.readObject(buffer);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("update", update)
.add("transaction", transaction)
.toString();
}
}
......
......@@ -37,6 +37,8 @@ import java.util.stream.Collectors;
import org.onlab.util.CountDownCompleter;
import org.onlab.util.Match;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.primitives.impl.Transaction;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionPrepare;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.Versioned;
......@@ -382,9 +384,8 @@ public class AtomixConsistentMapState extends ResourceStateMachine implements
Commit<? extends AtomixConsistentMapCommands.TransactionPrepare> commit) {
boolean ok = false;
try {
TransactionalMapUpdate<String, byte[]> transactionUpdate = commit
.operation().transactionUpdate();
for (MapUpdate<String, byte[]> update : transactionUpdate.batch()) {
Transaction transaction = commit.operation().transaction();
for (MapUpdate<String, byte[]> update : transaction.updates()) {
String key = update.key();
if (preparedKeys.contains(key)) {
return PrepareResult.CONCURRENT_TRANSACTION;
......@@ -403,8 +404,8 @@ public class AtomixConsistentMapState extends ResourceStateMachine implements
// No violations detected. Add to pendingTranctions and mark
// modified keys as
// currently locked to updates.
pendingTransactions.put(transactionUpdate.transactionId(), commit);
transactionUpdate.batch().forEach(u -> preparedKeys.add(u.key()));
pendingTransactions.put(transaction.id(), commit);
transaction.updates().forEach(u -> preparedKeys.add(u.key()));
ok = true;
return PrepareResult.OK;
} finally {
......@@ -429,16 +430,15 @@ public class AtomixConsistentMapState extends ResourceStateMachine implements
if (prepareCommit == null) {
return CommitResult.UNKNOWN_TRANSACTION_ID;
}
TransactionalMapUpdate<String, byte[]> transactionalUpdate = prepareCommit
.operation().transactionUpdate();
long totalReferencesToCommit = transactionalUpdate
.batch()
Transaction transaction = prepareCommit.operation().transaction();
long totalReferencesToCommit = transaction
.updates()
.stream()
.filter(update -> update.type() != MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
.count();
CountDownCompleter<Commit<? extends AtomixConsistentMapCommands.TransactionPrepare>> completer =
new CountDownCompleter<>(prepareCommit, totalReferencesToCommit, Commit::close);
for (MapUpdate<String, byte[]> update : transactionalUpdate.batch()) {
for (MapUpdate<String, byte[]> update : transaction.updates()) {
String key = update.key();
MapEntryValue previousValue = mapEntries.remove(key);
MapEntryValue newValue = null;
......@@ -473,8 +473,10 @@ public class AtomixConsistentMapState extends ResourceStateMachine implements
if (prepareCommit == null) {
return RollbackResult.UNKNOWN_TRANSACTION_ID;
} else {
prepareCommit.operation().transactionUpdate().batch()
.forEach(u -> preparedKeys.remove(u.key()));
prepareCommit.operation()
.transaction()
.updates()
.forEach(u -> preparedKeys.remove(u.key()));
prepareCommit.close();
return RollbackResult.OK;
}
......@@ -608,9 +610,8 @@ public class AtomixConsistentMapState extends ResourceStateMachine implements
@Override
public byte[] value() {
TransactionalMapUpdate<String, byte[]> update = completer.object()
.operation().transactionUpdate();
return update.valueForKey(key);
Transaction transaction = completer.object().operation().transaction();
return valueForKey(key, transaction);
}
@Override
......@@ -622,5 +623,14 @@ public class AtomixConsistentMapState extends ResourceStateMachine implements
public void discard() {
completer.countDown();
}
private byte[] valueForKey(String key, Transaction transaction) {
MapUpdate<String, byte[]> update = transaction.updates()
.stream()
.filter(u -> u.key().equals(key))
.findFirst()
.orElse(null);
return update == null ? null : update.value();
}
}
}
......
......@@ -28,4 +28,14 @@ public enum CommitResult {
* Signifies a failure due to unrecognized transaction identifier.
*/
UNKNOWN_TRANSACTION_ID,
/**
* Signifies a failure to get participants to agree to commit (during prepare stage).
*/
FAILURE_TO_PREPARE,
/**
* Failure during commit phase.
*/
FAILURE_DURING_COMMIT
}
......
......@@ -29,6 +29,8 @@ import com.google.common.base.MoreObjects;
* Both old and new values are accessible along with a flag that indicates if the
* the value was updated. If flag is false, oldValue and newValue both
* point to the same unmodified value.
*
* @param <K> key type
* @param <V> result type
*/
public class MapEntryUpdateResult<K, V> {
......@@ -123,6 +125,8 @@ public class MapEntryUpdateResult<K, V> {
* @param keyTransform transformer to use for transcoding keys
* @param valueMapper mapper to use for transcoding values
* @return new instance
* @param <K1> key type of returned {@code MapEntryUpdateResult}
* @param <V1> value type of returned {@code MapEntryUpdateResult}
*/
public <K1, V1> MapEntryUpdateResult<K1, V1> map(Function<K, K1> keyTransform, Function<V, V1> valueMapper) {
return new MapEntryUpdateResult<>(status,
......
......@@ -70,6 +70,7 @@ public final class MapUpdate<K, V> {
REMOVE_IF_VALUE_MATCH,
}
private String mapName;
private Type type;
private K key;
private V value;
......@@ -77,6 +78,15 @@ public final class MapUpdate<K, V> {
private long currentVersion = -1;
/**
* Returns the name of the map.
*
* @return map name
*/
public String mapName() {
return mapName;
}
/**
* Returns the type of update operation.
* @return type of update.
*/
......@@ -119,6 +129,7 @@ public final class MapUpdate<K, V> {
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("mapName", mapName)
.add("type", type)
.add("key", key)
.add("value", value)
......@@ -153,6 +164,11 @@ public final class MapUpdate<K, V> {
return update;
}
public Builder<K, V> withMapName(String name) {
update.mapName = checkNotNull(name, "name cannot be null");
return this;
}
public Builder<K, V> withType(Type type) {
update.type = checkNotNull(type, "type cannot be null");
return this;
......
......@@ -25,6 +25,11 @@ public enum PrepareResult {
OK,
/**
* Signifies some participants in a distributed prepare operation failed.
*/
PARTIAL_FAILURE,
/**
* Signifies a failure to another transaction locking the underlying state.
*/
CONCURRENT_TRANSACTION,
......
/*
* Copyright 2016 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.resources.impl;
import java.util.Collection;
import java.util.Map;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
/**
* A batch updates to an {@code AsyncConsistentMap} be committed as a transaction.
*
* @param <K> key type
* @param <V> value type
*/
public class TransactionalMapUpdate<K, V> {
private final TransactionId transactionId;
private final Collection<MapUpdate<K, V>> updates;
private boolean indexPopulated = false;
private final Map<K, V> keyValueIndex = Maps.newHashMap();
public TransactionalMapUpdate(TransactionId transactionId, Collection<MapUpdate<K, V>> updates) {
this.transactionId = transactionId;
this.updates = ImmutableList.copyOf(updates);
populateIndex();
}
/**
* Returns the transaction identifier.
* @return transaction id
*/
public TransactionId transactionId() {
return transactionId;
}
/**
* Returns the collection of map updates.
* @return map updates
*/
public Collection<MapUpdate<K, V>> batch() {
return updates;
}
/**
* Returns the value that will be associated with the key after this transaction commits.
* @param key key
* @return value that will be associated with the value once this transaction commits
*/
public V valueForKey(K key) {
if (!indexPopulated) {
// We do not synchronize as we don't expect this called to be made from multiple threads.
populateIndex();
}
return keyValueIndex.get(key);
}
/**
* Populates the internal key -> value mapping.
*/
private synchronized void populateIndex() {
updates.forEach(mapUpdate -> {
if (mapUpdate.value() != null) {
keyValueIndex.put(mapUpdate.key(), mapUpdate.value());
}
});
indexPopulated = true;
}
}
......@@ -41,7 +41,6 @@ import org.onlab.util.Match;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.DefaultApplicationId;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.Transaction;
import org.onosproject.store.service.Versioned;
import com.google.common.base.MoreObjects;
......
......@@ -27,6 +27,8 @@ import java.util.stream.Collectors;
import org.junit.Ignore;
import org.junit.Test;
import org.onlab.util.Tools;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.primitives.impl.Transaction;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.Versioned;
......@@ -351,10 +353,9 @@ public class AtomixConsistentMapTest extends AtomixTestBase {
.withValue(value1)
.build();
TransactionalMapUpdate<String, byte[]> txMapUpdate =
new TransactionalMapUpdate<>(TransactionId.from("tx1"), Arrays.asList(update1));
Transaction tx = new Transaction(TransactionId.from("tx1"), Arrays.asList(update1));
map.prepare(txMapUpdate).thenAccept(result -> {
map.prepare(tx).thenAccept(result -> {
assertEquals(PrepareResult.OK, result);
}).join();
assertNull(listener.event());
......@@ -376,7 +377,7 @@ public class AtomixConsistentMapTest extends AtomixTestBase {
assertNull(listener.event());
map.commit(txMapUpdate.transactionId()).join();
map.commit(tx.id()).join();
assertNotNull(listener.event());
assertEquals(MapEvent.Type.INSERT, listener.event().type());
assertTrue(Arrays.equals(value1, listener.event().newValue().value()));
......@@ -406,14 +407,13 @@ public class AtomixConsistentMapTest extends AtomixTestBase {
.withKey("foo")
.withValue(value1)
.build();
TransactionalMapUpdate<String, byte[]> txMapUpdate =
new TransactionalMapUpdate<>(TransactionId.from("tx1"), Arrays.asList(update1));
map.prepare(txMapUpdate).thenAccept(result -> {
Transaction tx = new Transaction(TransactionId.from("tx1"), Arrays.asList(update1));
map.prepare(tx).thenAccept(result -> {
assertEquals(PrepareResult.OK, result);
}).join();
assertNull(listener.event());
map.rollback(txMapUpdate.transactionId()).join();
map.rollback(tx.id()).join();
assertNull(listener.event());
map.get("foo").thenAccept(result -> {
......
......@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.service;
package org.onosproject.store.primitives.resources.impl;
import com.google.common.testing.EqualsTester;
import org.junit.Test;
......@@ -22,63 +22,63 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
/**
* Unit Tests for DatabseUpdate class.
* Unit Tests for MapUpdate class.
*/
public class DatabaseUpdateTest {
public class MapUpdateTest {
private final DatabaseUpdate stats1 = DatabaseUpdate.newBuilder()
private final MapUpdate<String, byte[]> stats1 = MapUpdate.<String, byte[]>newBuilder()
.withCurrentValue("1".getBytes())
.withValue("2".getBytes())
.withCurrentVersion(3)
.withKey("4")
.withMapName("5")
.withType(DatabaseUpdate.Type.PUT)
.withType(MapUpdate.Type.PUT)
.build();
private final DatabaseUpdate stats2 = DatabaseUpdate.newBuilder()
private final MapUpdate<String, byte[]> stats2 = MapUpdate.<String, byte[]>newBuilder()
.withCurrentValue("1".getBytes())
.withValue("2".getBytes())
.withCurrentVersion(3)
.withKey("4")
.withMapName("5")
.withType(DatabaseUpdate.Type.REMOVE)
.withType(MapUpdate.Type.REMOVE)
.build();
private final DatabaseUpdate stats3 = DatabaseUpdate.newBuilder()
private final MapUpdate<String, byte[]> stats3 = MapUpdate.<String, byte[]>newBuilder()
.withCurrentValue("1".getBytes())
.withValue("2".getBytes())
.withCurrentVersion(3)
.withKey("4")
.withMapName("5")
.withType(DatabaseUpdate.Type.REMOVE_IF_VALUE_MATCH)
.withType(MapUpdate.Type.REMOVE_IF_VALUE_MATCH)
.build();
private final DatabaseUpdate stats4 = DatabaseUpdate.newBuilder()
private final MapUpdate<String, byte[]> stats4 = MapUpdate.<String, byte[]>newBuilder()
.withCurrentValue("1".getBytes())
.withValue("2".getBytes())
.withCurrentVersion(3)
.withKey("4")
.withMapName("5")
.withType(DatabaseUpdate.Type.REMOVE_IF_VERSION_MATCH)
.withType(MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
.build();
private final DatabaseUpdate stats5 = DatabaseUpdate.newBuilder()
private final MapUpdate<String, byte[]> stats5 = MapUpdate.<String, byte[]>newBuilder()
.withCurrentValue("1".getBytes())
.withValue("2".getBytes())
.withCurrentVersion(3)
.withKey("4")
.withMapName("5")
.withType(DatabaseUpdate.Type.PUT_IF_VALUE_MATCH)
.withType(MapUpdate.Type.PUT_IF_VALUE_MATCH)
.build();
private final DatabaseUpdate stats6 = DatabaseUpdate.newBuilder()
private final MapUpdate<String, byte[]> stats6 = MapUpdate.<String, byte[]>newBuilder()
.withCurrentValue("1".getBytes())
.withValue("2".getBytes())
.withCurrentVersion(3)
.withKey("4")
.withMapName("5")
.withType(DatabaseUpdate.Type.PUT_IF_VERSION_MATCH)
.withType(MapUpdate.Type.PUT_IF_VERSION_MATCH)
.build();
/**
......@@ -91,7 +91,7 @@ public class DatabaseUpdateTest {
assertThat(stats1.currentVersion(), is(3L));
assertThat(stats1.key(), is("4"));
assertThat(stats1.mapName(), is("5"));
assertThat(stats1.type(), is(DatabaseUpdate.Type.PUT));
assertThat(stats1.type(), is(MapUpdate.Type.PUT));
}
/**
......
......@@ -199,6 +199,7 @@ import org.onosproject.net.resource.link.MplsLabelResourceAllocation;
import org.onosproject.net.resource.link.MplsLabelResourceRequest;
import org.onosproject.security.Permission;
import org.onosproject.store.Timestamp;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.SetEvent;
import org.onosproject.store.service.Versioned;
......@@ -483,6 +484,7 @@ public final class KryoNamespaces {
.register(new ExtensionCriterionSerializer(), ExtensionCriterion.class)
.register(ExtensionSelectorType.class)
.register(ExtensionTreatmentType.class)
.register(TransactionId.class)
.register(Versioned.class)
.register(MapEvent.class)
.register(MapEvent.Type.class)
......