Madan Jampani
Committed by Gerrit Code Review

AsyncConsistentMap methods for supporting transactional updates

Change-Id: Iaeb0aa0abf9f52d514a2c040598599a5b8a55ee8
Showing 21 changed files with 353 additions and 71 deletions
......@@ -14,12 +14,13 @@
* limitations under the License.
*/
package org.onosproject.store.primitives.resources.impl;
package org.onosproject.store.primitives;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import java.util.function.Function;
import com.google.common.base.MoreObjects;
/**
......@@ -126,6 +127,26 @@ public final class MapUpdate<K, V> {
return currentVersion;
}
/**
* Transforms this instance into an instance of different paramterized types.
*
* @param keyMapper transcoder for key type
* @param valueMapper transcoder to value type
* @return new instance
* @param <S> key type of returned instance
* @param <T> value type of returned instance
*/
public <S, T> MapUpdate<S, T> map(Function<K, S> keyMapper, Function<V, T> valueMapper) {
return MapUpdate.<S, T>newBuilder()
.withMapName(mapName)
.withType(type)
.withKey(keyMapper.apply(key))
.withValue(value == null ? null : valueMapper.apply(value))
.withCurrentValue(currentValue == null ? null : valueMapper.apply(currentValue))
.withCurrentVersion(currentVersion)
.build();
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
......@@ -180,17 +201,16 @@ public final class MapUpdate<K, V> {
}
public Builder<K, V> withCurrentValue(V value) {
update.currentValue = checkNotNull(value, "currentValue cannot be null");
update.currentValue = value;
return this;
}
public Builder<K, V> withValue(V value) {
update.value = checkNotNull(value, "value cannot be null");
update.value = value;
return this;
}
public Builder<K, V> withCurrentVersion(long version) {
checkArgument(version >= 0, "version cannot be negative");
update.currentVersion = version;
return this;
}
......
......@@ -26,6 +26,7 @@ import java.util.function.Function;
import java.util.function.Predicate;
import org.onosproject.store.primitives.DefaultConsistentMap;
import org.onosproject.store.primitives.TransactionId;
/**
* A distributed, strongly consistent map whose methods are all executed asynchronously.
......@@ -319,6 +320,37 @@ public interface AsyncConsistentMap<K, V> extends DistributedPrimitive {
CompletableFuture<Void> removeListener(MapEventListener<K, V> listener);
/**
* Prepares a transaction for commitment.
* @param transaction transaction
* @return {@code true} if prepare is successful and transaction is ready to be committed;
* {@code false} otherwise
*/
CompletableFuture<Boolean> prepare(MapTransaction<K, V> transaction);
/**
* Commits a previously prepared transaction.
* @param transactionId transaction identifier
* @return future that will be completed when the operation finishes
*/
CompletableFuture<Void> commit(TransactionId transactionId);
/**
* Aborts a previously prepared transaction.
* @param transactionId transaction identifier
* @return future that will be completed when the operation finishes
*/
CompletableFuture<Void> rollback(TransactionId transactionId);
/**
* Returns a new {@link ConsistentMap} that is backed by this instance.
*
* @return new {@code ConsistentMap} instance
*/
default ConsistentMap<K, V> asConsistentMap() {
return asConsistentMap(DistributedPrimitive.DEFAULT_OPERTATION_TIMEOUT_MILLIS);
}
/**
* Returns a new {@link ConsistentMap} that is backed by this instance.
*
* @param timeoutMillis timeout duration for the returned ConsistentMap operations
......
/*
* Copyright 2016 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.service;
import java.util.List;
import java.util.function.Function;
import org.onosproject.store.primitives.MapUpdate;
import org.onosproject.store.primitives.TransactionId;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
/**
* Collection of map updates to be committed atomically.
*
* @param <K> key type
* @param <V> value type
*/
public class MapTransaction<K, V> {
private final TransactionId transactionId;
private final List<MapUpdate<K, V>> updates;
public MapTransaction(TransactionId transactionId, List<MapUpdate<K, V>> updates) {
this.transactionId = transactionId;
this.updates = ImmutableList.copyOf(updates);
}
/**
* Returns the transaction identifier.
*
* @return transaction id
*/
public TransactionId transactionId() {
return transactionId;
}
/**
* Returns the list of map updates.
*
* @return map updates
*/
public List<MapUpdate<K, V>> updates() {
return updates;
}
/**
* Maps this instance to another {@code MapTransaction} with different key and value types.
*
* @param keyMapper function for mapping key types
* @param valueMapper function for mapping value types
* @return newly typed instance
*
* @param <S> key type of returned instance
* @param <T> value type of returned instance
*/
public <S, T> MapTransaction<S, T> map(Function<K, S> keyMapper, Function<V, T> valueMapper) {
return new MapTransaction<>(transactionId, Lists.transform(updates, u -> u.map(keyMapper, valueMapper)));
}
}
......@@ -13,9 +13,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.resources.impl;
package org.onosproject.store.primitives;
import com.google.common.testing.EqualsTester;
import org.junit.Test;
import static org.hamcrest.MatcherAssert.assertThat;
......
......@@ -32,17 +32,18 @@ import java.util.Scanner;
import org.onlab.util.Match;
import org.onosproject.cluster.NodeId;
import org.onosproject.event.Change;
import org.onosproject.store.primitives.MapUpdate;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapState;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands;
import org.onosproject.store.primitives.resources.impl.CommitResult;
import org.onosproject.store.primitives.resources.impl.MapEntryUpdateResult;
import org.onosproject.store.primitives.resources.impl.MapUpdate;
import org.onosproject.store.primitives.resources.impl.PrepareResult;
import org.onosproject.store.primitives.resources.impl.RollbackResult;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapTransaction;
import org.onosproject.store.service.Versioned;
import com.google.common.base.Throwables;
......@@ -65,7 +66,7 @@ public final class CatalystSerializers {
MapEntryUpdateResult.Status.class,
MapUpdate.class,
MapUpdate.Type.class,
Transaction.class,
MapTransaction.class,
Transaction.State.class,
TransactionId.class,
PrepareResult.class,
......@@ -99,7 +100,7 @@ public final class CatalystSerializers {
serializer.register(Match.class, factory);
serializer.register(MapEntryUpdateResult.class, factory);
serializer.register(MapEntryUpdateResult.Status.class, factory);
serializer.register(Transaction.class, factory);
serializer.register(MapTransaction.class, factory);
serializer.register(Transaction.State.class, factory);
serializer.register(PrepareResult.class, factory);
serializer.register(CommitResult.class, factory);
......
......@@ -65,8 +65,8 @@ import org.onosproject.cluster.PartitionId;
import org.onosproject.core.ApplicationId;
import org.onosproject.persistence.PersistenceService;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.primitives.MapUpdate;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.primitives.resources.impl.MapUpdate;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.AtomicCounterBuilder;
......
......@@ -21,8 +21,8 @@ import java.nio.ByteBuffer;
import org.onlab.util.KryoNamespace;
import org.onlab.util.Match;
import org.onosproject.cluster.NodeId;
import org.onosproject.store.primitives.MapUpdate;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.primitives.resources.impl.MapUpdate;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
import org.onosproject.store.service.Versioned;
......
......@@ -40,11 +40,13 @@ import org.onlab.util.Match;
import org.onlab.util.SharedExecutors;
import org.onlab.util.Tools;
import org.onosproject.core.ApplicationId;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.ConsistentMapException;
import org.onosproject.store.service.ConsistentMapException.ConcurrentModification;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.MapTransaction;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.Versioned;
import org.onosproject.utils.MeteringAgent;
......@@ -491,6 +493,21 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V
return CompletableFuture.completedFuture(null);
}
@Override
public CompletableFuture<Boolean> prepare(MapTransaction<K, V> transaction) {
return Tools.exceptionalFuture(new UnsupportedOperationException());
}
@Override
public CompletableFuture<Void> commit(TransactionId transactionId) {
return Tools.exceptionalFuture(new UnsupportedOperationException());
}
@Override
public CompletableFuture<Void> rollback(TransactionId transactionId) {
return Tools.exceptionalFuture(new UnsupportedOperationException());
}
protected void notifyListeners(MapEvent<K, V> event) {
if (event == null) {
return;
......
......@@ -26,8 +26,8 @@ import net.kuujo.copycat.state.Initializer;
import net.kuujo.copycat.state.StateContext;
import org.onlab.util.Match;
import org.onosproject.store.primitives.MapUpdate;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.primitives.resources.impl.MapUpdate;
import org.onosproject.store.service.Versioned;
import java.util.Arrays;
......
......@@ -24,9 +24,9 @@ import java.util.function.Supplier;
import static com.google.common.base.Preconditions.*;
import org.onosproject.store.primitives.MapUpdate;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.primitives.resources.impl.CommitResult;
import org.onosproject.store.primitives.resources.impl.MapUpdate;
import org.onosproject.store.service.ConsistentMapBuilder;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.TransactionContext;
......@@ -84,7 +84,10 @@ public class DefaultTransactionContext implements TransactionContext {
checkNotNull(serializer);
return txMaps.computeIfAbsent(mapName, name -> new DefaultTransactionalMap<>(
name,
mapBuilderSupplier.get().withName(name).withSerializer(serializer).build(),
mapBuilderSupplier.get()
.withName(name)
.withSerializer(serializer)
.buildAsyncMap(),
this,
serializer));
}
......@@ -113,7 +116,7 @@ public class DefaultTransactionContext implements TransactionContext {
public void abort() {
if (isOpen) {
try {
txMaps.values().forEach(m -> m.rollback());
txMaps.values().forEach(m -> m.abort());
} finally {
isOpen = false;
}
......
......@@ -19,10 +19,13 @@ package org.onosproject.store.primitives.impl;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.onlab.util.HexString;
import org.onosproject.store.primitives.resources.impl.MapUpdate;
import org.onosproject.store.primitives.MapUpdate;
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.MapTransaction;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.TransactionContext;
import org.onosproject.store.service.TransactionalMap;
......@@ -46,11 +49,12 @@ import com.google.common.collect.Sets;
* @param <K> key type
* @param <V> value type.
*/
public class DefaultTransactionalMap<K, V> implements TransactionalMap<K, V> {
public class DefaultTransactionalMap<K, V> implements TransactionalMap<K, V>, TransactionParticipant {
private final TransactionContext txContext;
private static final String TX_CLOSED_ERROR = "Transaction is closed";
private final ConsistentMap<K, V> backingMap;
private final AsyncConsistentMap<K, V> backingMap;
private final ConsistentMap<K, V> backingConsitentMap;
private final String name;
private final Serializer serializer;
private final Map<K, Versioned<V>> readCache = Maps.newConcurrentMap();
......@@ -76,11 +80,12 @@ public class DefaultTransactionalMap<K, V> implements TransactionalMap<K, V> {
public DefaultTransactionalMap(
String name,
ConsistentMap<K, V> backingMap,
AsyncConsistentMap<K, V> backingMap,
TransactionContext txContext,
Serializer serializer) {
this.name = name;
this.backingMap = backingMap;
this.backingConsitentMap = backingMap.asConsistentMap();
this.txContext = txContext;
this.serializer = serializer;
}
......@@ -96,7 +101,7 @@ public class DefaultTransactionalMap<K, V> implements TransactionalMap<K, V> {
if (latest != null) {
return latest;
} else {
Versioned<V> v = readCache.computeIfAbsent(key, k -> backingMap.get(k));
Versioned<V> v = readCache.computeIfAbsent(key, k -> backingConsitentMap.get(k));
return v != null ? v.value() : null;
}
}
......@@ -159,6 +164,62 @@ public class DefaultTransactionalMap<K, V> implements TransactionalMap<K, V> {
return latest;
}
@Override
public CompletableFuture<Boolean> prepare() {
return backingMap.prepare(new MapTransaction<>(txContext.transactionId(), updates()));
}
@Override
public CompletableFuture<Void> commit() {
return backingMap.commit(txContext.transactionId());
}
@Override
public CompletableFuture<Void> rollback() {
return backingMap.rollback(txContext.transactionId());
}
@Override
public boolean hasPendingUpdates() {
return updates().size() > 0;
}
protected List<MapUpdate<K, V>> updates() {
List<MapUpdate<K, V>> updates = Lists.newLinkedList();
deleteSet.forEach(key -> {
Versioned<V> original = readCache.get(key);
if (original != null) {
updates.add(MapUpdate.<K, V>newBuilder()
.withMapName(name)
.withType(MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
.withKey(key)
.withCurrentVersion(original.version())
.build());
}
});
writeCache.forEach((key, value) -> {
Versioned<V> original = readCache.get(key);
if (original == null) {
updates.add(MapUpdate.<K, V>newBuilder()
.withMapName(name)
.withType(MapUpdate.Type.PUT_IF_ABSENT)
.withKey(key)
.withValue(value)
.build());
} else {
updates.add(MapUpdate.<K, V>newBuilder()
.withMapName(name)
.withType(MapUpdate.Type.PUT_IF_VERSION_MATCH)
.withKey(key)
.withCurrentVersion(original.version())
.withValue(value)
.build());
}
});
return updates;
}
protected List<MapUpdate<String, byte[]>> toMapUpdates() {
List<MapUpdate<String, byte[]>> updates = Lists.newLinkedList();
deleteSet.forEach(key -> {
......@@ -194,19 +255,18 @@ public class DefaultTransactionalMap<K, V> implements TransactionalMap<K, V> {
return updates;
}
// TODO: build expected result Map processing DB updates?
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("backingMap", backingMap)
.add("updates", toMapUpdates())
.add("updates", updates())
.toString();
}
/**
* Discards all changes made to this transactional map.
*/
protected void rollback() {
protected void abort() {
readCache.clear();
writeCache.clear();
deleteSet.clear();
......
......@@ -27,8 +27,10 @@ import java.util.function.BiFunction;
import java.util.function.Predicate;
import org.onosproject.core.ApplicationId;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.MapTransaction;
import org.onosproject.store.service.Versioned;
import com.google.common.base.MoreObjects;
......@@ -161,6 +163,21 @@ public class DelegatingAsyncConsistentMap<K, V> implements AsyncConsistentMap<K,
}
@Override
public CompletableFuture<Boolean> prepare(MapTransaction<K, V> transaction) {
return delegateMap.prepare(transaction);
}
@Override
public CompletableFuture<Void> commit(TransactionId transactionId) {
return delegateMap.commit(transactionId);
}
@Override
public CompletableFuture<Void> rollback(TransactionId transactionId) {
return delegateMap.rollback(transactionId);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("delegateMap", delegateMap)
......
......@@ -28,10 +28,15 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.onlab.util.Tools;
import org.onosproject.cluster.PartitionId;
import org.onosproject.store.primitives.MapUpdate;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.MapTransaction;
import org.onosproject.store.service.Versioned;
import com.google.common.collect.Lists;
......@@ -198,6 +203,39 @@ public class PartitionedAsyncConsistentMap<K, V> implements AsyncConsistentMap<K
.toArray(CompletableFuture[]::new));
}
@Override
public CompletableFuture<Boolean> prepare(MapTransaction<K, V> transaction) {
Map<AsyncConsistentMap<K, V>, List<MapUpdate<K, V>>> updatesGroupedByMap = Maps.newIdentityHashMap();
transaction.updates().forEach(update -> {
AsyncConsistentMap<K, V> map = getMap(update.key());
updatesGroupedByMap.computeIfAbsent(map, k -> Lists.newLinkedList()).add(update);
});
Map<AsyncConsistentMap<K, V>, MapTransaction<K, V>> transactionsByMap =
Maps.transformValues(updatesGroupedByMap,
list -> new MapTransaction<>(transaction.transactionId(), list));
return Tools.allOf(transactionsByMap.entrySet()
.stream()
.map(e -> e.getKey().prepare(e.getValue()))
.collect(Collectors.toList()))
.thenApply(list -> list.stream().reduce(Boolean::logicalAnd).orElse(true));
}
@Override
public CompletableFuture<Void> commit(TransactionId transactionId) {
return CompletableFuture.allOf(getMaps().stream()
.map(e -> e.commit(transactionId))
.toArray(CompletableFuture[]::new));
}
@Override
public CompletableFuture<Void> rollback(TransactionId transactionId) {
return CompletableFuture.allOf(getMaps().stream()
.map(e -> e.rollback(transactionId))
.toArray(CompletableFuture[]::new));
}
/**
* Returns the map (partition) to which the specified key maps.
* @param key key
......
......@@ -26,8 +26,8 @@ import net.kuujo.copycat.cluster.Cluster;
import net.kuujo.copycat.resource.ResourceState;
import org.onlab.util.Match;
import org.onosproject.store.primitives.MapUpdate;
import org.onosproject.store.primitives.resources.impl.CommitResult;
import org.onosproject.store.primitives.resources.impl.MapUpdate;
import org.onosproject.store.service.Versioned;
import java.util.Collection;
......
......@@ -17,8 +17,8 @@ package org.onosproject.store.primitives.impl;
import java.util.List;
import org.onosproject.store.primitives.MapUpdate;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.primitives.resources.impl.MapUpdate;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;
......
......@@ -17,34 +17,32 @@ package org.onosproject.store.primitives.impl;
import java.util.concurrent.CompletableFuture;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.primitives.resources.impl.CommitResult;
import org.onosproject.store.primitives.resources.impl.PrepareResult;
import org.onosproject.store.primitives.resources.impl.RollbackResult;
/**
* Participant in a two-phase commit protocol.
*/
public interface TransactionParticipant {
/**
* Attempts to execute the prepare phase for the specified {@link Transaction transaction}.
* @param transaction transaction
* @return future for prepare result
* Returns if this participant has updates that need to be committed.
* @return {@code true} if yes; {@code false} otherwise
*/
boolean hasPendingUpdates();
/**
* Executes the prepare phase.
* @return {@code true} is successful; {@code false} otherwise
*/
CompletableFuture<PrepareResult> prepare(Transaction transaction);
CompletableFuture<Boolean> prepare();
/**
* Attempts to execute the commit phase for previously prepared transaction.
* @param transactionId transaction identifier
* @return future for commit result
* @return future that is completed when the operation completes
*/
CompletableFuture<CommitResult> commit(TransactionId transactionId);
CompletableFuture<Void> commit();
/**
* Attempts to execute the rollback phase for previously prepared transaction.
* @param transactionId transaction identifier
* @return future for rollback result
* @return future that is completed when the operation completes
*/
CompletableFuture<RollbackResult> rollback(TransactionId transactionId);
CompletableFuture<Void> rollback();
}
\ No newline at end of file
......
......@@ -26,9 +26,11 @@ import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.MapTransaction;
import org.onosproject.store.service.Versioned;
import com.google.common.collect.Maps;
......@@ -196,6 +198,21 @@ public class TranscodingAsyncConsistentMap<K1, V1, K2, V2> implements AsyncConsi
}
}
@Override
public CompletableFuture<Boolean> prepare(MapTransaction<K1, V1> transaction) {
return backingMap.prepare(transaction.map(keyEncoder, valueEncoder));
}
@Override
public CompletableFuture<Void> commit(TransactionId transactionId) {
return backingMap.commit(transactionId);
}
@Override
public CompletableFuture<Void> rollback(TransactionId transactionId) {
return backingMap.rollback(transactionId);
}
private class InternalBackingMapEventListener implements MapEventListener<K2, V2> {
private final MapEventListener<K1, V1> listener;
......
......@@ -32,11 +32,10 @@ import java.util.function.Predicate;
import org.onlab.util.Match;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.primitives.impl.Transaction;
import org.onosproject.store.primitives.impl.TransactionParticipant;
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.MapTransaction;
import org.onosproject.store.service.Versioned;
import com.google.common.collect.Sets;
......@@ -46,7 +45,7 @@ import com.google.common.collect.Sets;
*/
@ResourceTypeInfo(id = -151, stateMachine = AtomixConsistentMapState.class)
public class AtomixConsistentMap extends Resource<AtomixConsistentMap, Resource.Options>
implements AsyncConsistentMap<String, byte[]>, TransactionParticipant {
implements AsyncConsistentMap<String, byte[]> {
private final Set<MapEventListener<String, byte[]>> mapEventListeners = Sets.newCopyOnWriteArraySet();
......@@ -266,18 +265,21 @@ public class AtomixConsistentMap extends Resource<AtomixConsistentMap, Resource.
}
@Override
public CompletableFuture<PrepareResult> prepare(Transaction transaction) {
return submit(new AtomixConsistentMapCommands.TransactionPrepare(transaction));
public CompletableFuture<Boolean> prepare(MapTransaction<String, byte[]> transaction) {
return submit(new AtomixConsistentMapCommands.TransactionPrepare(transaction))
.thenApply(v -> v == PrepareResult.OK);
}
@Override
public CompletableFuture<CommitResult> commit(TransactionId transactionId) {
return submit(new AtomixConsistentMapCommands.TransactionCommit(transactionId));
public CompletableFuture<Void> commit(TransactionId transactionId) {
return submit(new AtomixConsistentMapCommands.TransactionCommit(transactionId))
.thenApply(v -> null);
}
@Override
public CompletableFuture<RollbackResult> rollback(TransactionId transactionId) {
return submit(new AtomixConsistentMapCommands.TransactionRollback(transactionId));
public CompletableFuture<Void> rollback(TransactionId transactionId) {
return submit(new AtomixConsistentMapCommands.TransactionRollback(transactionId))
.thenApply(v -> null);
}
/**
......
......@@ -29,7 +29,7 @@ import java.util.Set;
import org.onlab.util.Match;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.primitives.impl.Transaction;
import org.onosproject.store.service.MapTransaction;
import org.onosproject.store.service.Versioned;
import com.google.common.base.MoreObjects;
......@@ -209,35 +209,35 @@ public final class AtomixConsistentMapCommands {
*/
@SuppressWarnings("serial")
public static class TransactionPrepare extends MapCommand<PrepareResult> {
private Transaction transaction;
private MapTransaction<String, byte[]> mapTransaction;
public TransactionPrepare() {
}
public TransactionPrepare(Transaction transaction) {
this.transaction = transaction;
public TransactionPrepare(MapTransaction<String, byte[]> mapTransaction) {
this.mapTransaction = mapTransaction;
}
public Transaction transaction() {
return transaction;
public MapTransaction<String, byte[]> transaction() {
return mapTransaction;
}
@Override
public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
super.writeObject(buffer, serializer);
serializer.writeObject(transaction, buffer);
serializer.writeObject(mapTransaction, buffer);
}
@Override
public void readObject(BufferInput<?> buffer, Serializer serializer) {
super.readObject(buffer, serializer);
transaction = serializer.readObject(buffer);
mapTransaction = serializer.readObject(buffer);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("transaction", transaction)
.add("mapTransaction", mapTransaction)
.toString();
}
}
......
......@@ -37,10 +37,11 @@ import java.util.stream.Collectors;
import org.onlab.util.CountDownCompleter;
import org.onlab.util.Match;
import org.onosproject.store.primitives.MapUpdate;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.primitives.impl.Transaction;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionPrepare;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapTransaction;
import org.onosproject.store.service.Versioned;
import com.google.common.collect.Maps;
......@@ -384,7 +385,7 @@ public class AtomixConsistentMapState extends ResourceStateMachine implements
Commit<? extends AtomixConsistentMapCommands.TransactionPrepare> commit) {
boolean ok = false;
try {
Transaction transaction = commit.operation().transaction();
MapTransaction<String, byte[]> transaction = commit.operation().transaction();
for (MapUpdate<String, byte[]> update : transaction.updates()) {
String key = update.key();
if (preparedKeys.contains(key)) {
......@@ -404,7 +405,7 @@ public class AtomixConsistentMapState extends ResourceStateMachine implements
// No violations detected. Add to pendingTranctions and mark
// modified keys as
// currently locked to updates.
pendingTransactions.put(transaction.id(), commit);
pendingTransactions.put(transaction.transactionId(), commit);
transaction.updates().forEach(u -> preparedKeys.add(u.key()));
ok = true;
return PrepareResult.OK;
......@@ -430,7 +431,7 @@ public class AtomixConsistentMapState extends ResourceStateMachine implements
if (prepareCommit == null) {
return CommitResult.UNKNOWN_TRANSACTION_ID;
}
Transaction transaction = prepareCommit.operation().transaction();
MapTransaction<String, byte[]> transaction = prepareCommit.operation().transaction();
long totalReferencesToCommit = transaction
.updates()
.stream()
......@@ -610,7 +611,7 @@ public class AtomixConsistentMapState extends ResourceStateMachine implements
@Override
public byte[] value() {
Transaction transaction = completer.object().operation().transaction();
MapTransaction<String, byte[]> transaction = completer.object().operation().transaction();
return valueForKey(key, transaction);
}
......@@ -624,7 +625,7 @@ public class AtomixConsistentMapState extends ResourceStateMachine implements
completer.countDown();
}
private byte[] valueForKey(String key, Transaction transaction) {
private byte[] valueForKey(String key, MapTransaction<String, byte[]> transaction) {
MapUpdate<String, byte[]> update = transaction.updates()
.stream()
.filter(u -> u.key().equals(key))
......
......@@ -27,10 +27,11 @@ import java.util.stream.Collectors;
import org.junit.Ignore;
import org.junit.Test;
import org.onlab.util.Tools;
import org.onosproject.store.primitives.MapUpdate;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.primitives.impl.Transaction;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.MapTransaction;
import org.onosproject.store.service.Versioned;
import com.google.common.collect.Sets;
......@@ -353,10 +354,10 @@ public class AtomixConsistentMapTest extends AtomixTestBase {
.withValue(value1)
.build();
Transaction tx = new Transaction(TransactionId.from("tx1"), Arrays.asList(update1));
MapTransaction<String, byte[]> tx = new MapTransaction<>(TransactionId.from("tx1"), Arrays.asList(update1));
map.prepare(tx).thenAccept(result -> {
assertEquals(PrepareResult.OK, result);
assertEquals(true, result);
}).join();
assertNull(listener.event());
......@@ -377,7 +378,7 @@ public class AtomixConsistentMapTest extends AtomixTestBase {
assertNull(listener.event());
map.commit(tx.id()).join();
map.commit(tx.transactionId()).join();
assertNotNull(listener.event());
assertEquals(MapEvent.Type.INSERT, listener.event().type());
assertTrue(Arrays.equals(value1, listener.event().newValue().value()));
......@@ -407,13 +408,13 @@ public class AtomixConsistentMapTest extends AtomixTestBase {
.withKey("foo")
.withValue(value1)
.build();
Transaction tx = new Transaction(TransactionId.from("tx1"), Arrays.asList(update1));
MapTransaction<String, byte[]> tx = new MapTransaction<>(TransactionId.from("tx1"), Arrays.asList(update1));
map.prepare(tx).thenAccept(result -> {
assertEquals(PrepareResult.OK, result);
assertEquals(true, result);
}).join();
assertNull(listener.event());
map.rollback(tx.id()).join();
map.rollback(tx.transactionId()).join();
assertNull(listener.event());
map.get("foo").thenAccept(result -> {
......