Madan Jampani

WIP: Revamped transaction API. Introduces a transaction context for running bloc…

…ks of code that can be committed
atomically.

Change-Id: I6ba21050a2644a42f3c073fa04ff776ef2c5ff4c
...@@ -17,7 +17,6 @@ ...@@ -17,7 +17,6 @@
17 package org.onosproject.store.service; 17 package org.onosproject.store.service;
18 18
19 import java.util.Collection; 19 import java.util.Collection;
20 -import java.util.List;
21 import java.util.Set; 20 import java.util.Set;
22 import java.util.Map.Entry; 21 import java.util.Map.Entry;
23 22
...@@ -37,13 +36,6 @@ import java.util.Map.Entry; ...@@ -37,13 +36,6 @@ import java.util.Map.Entry;
37 * concurrency by allowing conditional updates that take into consideration 36 * concurrency by allowing conditional updates that take into consideration
38 * the version or value that was previously read. 37 * the version or value that was previously read.
39 * </p><p> 38 * </p><p>
40 - * The map also supports atomic batch updates (transactions). One can provide a list
41 - * of updates to be applied atomically if and only if all the operations are guaranteed
42 - * to succeed i.e. all their preconditions are met. For example, the precondition
43 - * for a putIfAbsent API call is absence of a mapping for the key. Similarly, the
44 - * precondition for a conditional replace operation is the presence of an expected
45 - * version or value
46 - * </p><p>
47 * This map does not allow null values. All methods can throw a ConsistentMapException 39 * This map does not allow null values. All methods can throw a ConsistentMapException
48 * (which extends RuntimeException) to indicate failures. 40 * (which extends RuntimeException) to indicate failures.
49 * 41 *
...@@ -202,15 +194,4 @@ public interface ConsistentMap<K, V> { ...@@ -202,15 +194,4 @@ public interface ConsistentMap<K, V> {
202 * @return true if the value was replaced 194 * @return true if the value was replaced
203 */ 195 */
204 boolean replace(K key, long oldVersion, V newValue); 196 boolean replace(K key, long oldVersion, V newValue);
205 -
206 - /**
207 - * Atomically apply the specified list of updates to the map.
208 - * If any of the updates cannot be applied due to a precondition
209 - * violation, none of the updates will be applied and the state of
210 - * the map remains unaltered.
211 - *
212 - * @param updates list of updates to apply atomically.
213 - * @return true if the map was updated.
214 - */
215 - boolean batchUpdate(List<UpdateOperation<K, V>> updates);
216 } 197 }
......
...@@ -39,5 +39,9 @@ public interface StorageService { ...@@ -39,5 +39,9 @@ public interface StorageService {
39 */ 39 */
40 <K, V> ConsistentMap<K , V> createConsistentMap(String name, Serializer serializer); 40 <K, V> ConsistentMap<K , V> createConsistentMap(String name, Serializer serializer);
41 41
42 - // TODO: add API for creating Eventually Consistent Map. 42 + /**
43 + * Creates a new transaction context.
44 + * @return transaction context
45 + */
46 + TransactionContext createTransactionContext();
43 } 47 }
...\ No newline at end of file ...\ No newline at end of file
......
1 +/*
2 + * Copyright 2015 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +
17 +package org.onosproject.store.service;
18 +
19 +/**
20 + * Provides a context for transactional operations.
21 + * <p>
22 + * A transaction context provides a boundary within which transactions
23 + * are run. It also is a place where all modifications made within a transaction
24 + * are cached until the point when the transaction commits or aborts. It thus ensures
25 + * isolation of work happening with in the transaction boundary.
26 + * <p>
27 + * A transaction context is a vehicle for grouping operations into a unit with the
28 + * properties of atomicity, isolation, and durability. Transactions also provide the
29 + * ability to maintain an application's invariants or integrity constraints,
30 + * supporting the property of consistency. Together these properties are known as ACID.
31 + */
32 +public interface TransactionContext {
33 +
34 + /**
35 + * Returns if this transaction context is open.
36 + * @return true if open, false otherwise.
37 + */
38 + boolean isOpen();
39 +
40 + /**
41 + * Starts a new transaction.
42 + */
43 + void begin();
44 +
45 + /**
46 + * Commits a transaction that was previously started thereby making its changes permanent
47 + * and externally visible.
48 + * @throws TransactionException if transaction fails to commit.
49 + */
50 + void commit();
51 +
52 + /**
53 + * Rolls back the current transaction, discarding all its changes.
54 + */
55 + void rollback();
56 +
57 + /**
58 + * Creates a new transactional map.
59 + * @param mapName name of the transactional map.
60 + * @param serializer serializer to use for encoding/decoding keys and vaulues.
61 + * @return new Transactional Map.
62 + */
63 + <K, V> TransactionalMap<K, V> createTransactionalMap(String mapName, Serializer serializer);
64 +}
...\ No newline at end of file ...\ No newline at end of file
1 +/*
2 + * Copyright 2015 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +
17 +package org.onosproject.store.service;
18 +
19 +/**
20 + * Top level exception for Transaction failures.
21 + */
22 +@SuppressWarnings("serial")
23 +public class TransactionException extends RuntimeException {
24 + public TransactionException() {
25 + }
26 +
27 + public TransactionException(Throwable t) {
28 + super(t);
29 + }
30 +
31 + /**
32 + * Transaction timeout.
33 + */
34 + public static class Timeout extends TransactionException {
35 + }
36 +
37 + /**
38 + * Transaction interrupted.
39 + */
40 + public static class Interrupted extends TransactionException {
41 + }
42 +
43 + /**
44 + * Transaction failure due to optimistic concurrency failure.
45 + */
46 + public static class OptimisticConcurrencyFailure extends TransactionException {
47 + }
48 +}
...\ No newline at end of file ...\ No newline at end of file
1 +/*
2 + * Copyright 2015 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +
17 +package org.onosproject.store.service;
18 +
19 +import java.util.Collection;
20 +import java.util.Set;
21 +import java.util.Map.Entry;
22 +
23 +/**
24 + * Transactional Map data structure.
25 + * <p>
26 + * A TransactionalMap is created by invoking {@link TransactionContext#createTransactionalMap createTransactionalMap}
27 + * method. All operations performed on this map with in a transaction boundary are invisible externally
28 + * until the point when the transaction commits. A commit usually succeeds in the absence of conflicts.
29 + *
30 + * @param <K> type of key.
31 + * @param <V> type of value.
32 + */
33 +public interface TransactionalMap<K, V> {
34 +
35 + /**
36 + * Returns the number of entries in the map.
37 + *
38 + * @return map size.
39 + */
40 + int size();
41 +
42 + /**
43 + * Returns true if the map is empty.
44 + *
45 + * @return true if map has no entries, false otherwise.
46 + */
47 + boolean isEmpty();
48 +
49 + /**
50 + * Returns true if this map contains a mapping for the specified key.
51 + *
52 + * @param key key
53 + * @return true if map contains key, false otherwise.
54 + */
55 + boolean containsKey(K key);
56 +
57 + /**
58 + * Returns true if this map contains the specified value.
59 + *
60 + * @param value value
61 + * @return true if map contains value, false otherwise.
62 + */
63 + boolean containsValue(V value);
64 +
65 + /**
66 + * Returns the value to which the specified key is mapped, or null if this
67 + * map contains no mapping for the key.
68 + *
69 + * @param key the key whose associated value is to be returned
70 + * @return the value to which the specified key is mapped, or null if
71 + * this map contains no mapping for the key
72 + */
73 + V get(K key);
74 +
75 + /**
76 + * Associates the specified value with the specified key in this map (optional operation).
77 + * If the map previously contained a mapping for the key, the old value is replaced by the
78 + * specified value.
79 + *
80 + * @param key key with which the specified value is to be associated
81 + * @param value value to be associated with the specified key
82 + * @return the previous value associated with key, or null if there was
83 + * no mapping for key.
84 + */
85 + V put(K key, V value);
86 +
87 + /**
88 + * Removes the mapping for a key from this map if it is present (optional operation).
89 + *
90 + * @param key key whose value is to be removed from the map
91 + * @return the value to which this map previously associated the key,
92 + * or null if the map contained no mapping for the key.
93 + */
94 + V remove(K key);
95 +
96 + /**
97 + * Removes all of the mappings from this map (optional operation).
98 + * The map will be empty after this call returns.
99 + */
100 + void clear();
101 +
102 + /**
103 + * Returns a Set view of the keys contained in this map.
104 + * This method differs from the behavior of java.util.Map.keySet() in that
105 + * what is returned is a unmodifiable snapshot view of the keys in the ConsistentMap.
106 + * Attempts to modify the returned set, whether direct or via its iterator,
107 + * result in an UnsupportedOperationException.
108 + *
109 + * @return a set of the keys contained in this map
110 + */
111 + Set<K> keySet();
112 +
113 + /**
114 + * Returns the collection of values contained in this map.
115 + * This method differs from the behavior of java.util.Map.values() in that
116 + * what is returned is a unmodifiable snapshot view of the values in the ConsistentMap.
117 + * Attempts to modify the returned collection, whether direct or via its iterator,
118 + * result in an UnsupportedOperationException.
119 + *
120 + * @return a collection of the values contained in this map
121 + */
122 + Collection<V> values();
123 +
124 + /**
125 + * Returns the set of entries contained in this map.
126 + * This method differs from the behavior of java.util.Map.entrySet() in that
127 + * what is returned is a unmodifiable snapshot view of the entries in the ConsistentMap.
128 + * Attempts to modify the returned set, whether direct or via its iterator,
129 + * result in an UnsupportedOperationException.
130 + *
131 + * @return set of entries contained in this map.
132 + */
133 + Set<Entry<K, V>> entrySet();
134 +
135 + /**
136 + * If the specified key is not already associated with a value
137 + * associates it with the given value and returns null, else returns the current value.
138 + *
139 + * @param key key with which the specified value is to be associated
140 + * @param value value to be associated with the specified key
141 + * @return the previous value associated with the specified key or null
142 + * if key does not already mapped to a value.
143 + */
144 + V putIfAbsent(K key, V value);
145 +
146 + /**
147 + * Removes the entry for the specified key only if it is currently
148 + * mapped to the specified value.
149 + *
150 + * @param key key with which the specified value is associated
151 + * @param value value expected to be associated with the specified key
152 + * @return true if the value was removed
153 + */
154 + boolean remove(K key, V value);
155 +
156 + /**
157 + * Replaces the entry for the specified key only if currently mapped
158 + * to the specified value.
159 + *
160 + * @param key key with which the specified value is associated
161 + * @param oldValue value expected to be associated with the specified key
162 + * @param newValue value to be associated with the specified key
163 + * @return true if the value was replaced
164 + */
165 + boolean replace(K key, V oldValue, V newValue);
166 +}
...\ No newline at end of file ...\ No newline at end of file
...@@ -20,7 +20,6 @@ import static com.google.common.base.Preconditions.*; ...@@ -20,7 +20,6 @@ import static com.google.common.base.Preconditions.*;
20 20
21 import java.util.Collection; 21 import java.util.Collection;
22 import java.util.Collections; 22 import java.util.Collections;
23 -import java.util.List;
24 import java.util.Map; 23 import java.util.Map;
25 import java.util.Map.Entry; 24 import java.util.Map.Entry;
26 import java.util.concurrent.CompletableFuture; 25 import java.util.concurrent.CompletableFuture;
...@@ -35,7 +34,6 @@ import org.onlab.util.HexString; ...@@ -35,7 +34,6 @@ import org.onlab.util.HexString;
35 import org.onosproject.store.service.ConsistentMap; 34 import org.onosproject.store.service.ConsistentMap;
36 import org.onosproject.store.service.ConsistentMapException; 35 import org.onosproject.store.service.ConsistentMapException;
37 import org.onosproject.store.service.Serializer; 36 import org.onosproject.store.service.Serializer;
38 -import org.onosproject.store.service.UpdateOperation;
39 import org.onosproject.store.service.Versioned; 37 import org.onosproject.store.service.Versioned;
40 38
41 import com.google.common.cache.CacheBuilder; 39 import com.google.common.cache.CacheBuilder;
...@@ -73,7 +71,7 @@ public class ConsistentMapImpl<K, V> implements ConsistentMap<K, V> { ...@@ -73,7 +71,7 @@ public class ConsistentMapImpl<K, V> implements ConsistentMap<K, V> {
73 return serializer.decode(HexString.fromHexString(key)); 71 return serializer.decode(HexString.fromHexString(key));
74 } 72 }
75 73
76 - ConsistentMapImpl(String name, 74 + public ConsistentMapImpl(String name,
77 DatabaseProxy<String, byte[]> proxy, 75 DatabaseProxy<String, byte[]> proxy,
78 Serializer serializer) { 76 Serializer serializer) {
79 this.name = checkNotNull(name, "map name cannot be null"); 77 this.name = checkNotNull(name, "map name cannot be null");
...@@ -196,15 +194,6 @@ public class ConsistentMapImpl<K, V> implements ConsistentMap<K, V> { ...@@ -196,15 +194,6 @@ public class ConsistentMapImpl<K, V> implements ConsistentMap<K, V> {
196 return complete(proxy.replace(name, keyCache.getUnchecked(key), oldVersion, serializer.encode(newValue))); 194 return complete(proxy.replace(name, keyCache.getUnchecked(key), oldVersion, serializer.encode(newValue)));
197 } 195 }
198 196
199 - @Override
200 - public boolean batchUpdate(List<UpdateOperation<K, V>> updates) {
201 - checkNotNull(updates, "updates cannot be null");
202 - return complete(proxy.atomicBatchUpdate(updates
203 - .stream()
204 - .map(this::toRawUpdateOperation)
205 - .collect(Collectors.toList())));
206 - }
207 -
208 private static <T> T complete(CompletableFuture<T> future) { 197 private static <T> T complete(CompletableFuture<T> future) {
209 try { 198 try {
210 return future.get(OPERATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS); 199 return future.get(OPERATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
...@@ -225,31 +214,4 @@ public class ConsistentMapImpl<K, V> implements ConsistentMap<K, V> { ...@@ -225,31 +214,4 @@ public class ConsistentMapImpl<K, V> implements ConsistentMap<K, V> {
225 serializer.decode(e.getValue().value()), 214 serializer.decode(e.getValue().value()),
226 e.getValue().version())); 215 e.getValue().version()));
227 } 216 }
228 -
229 - private UpdateOperation<String, byte[]> toRawUpdateOperation(UpdateOperation<K, V> update) {
230 -
231 - checkArgument(name.equals(update.tableName()), "Unexpected table name");
232 -
233 - UpdateOperation.Builder<String, byte[]> rawUpdate = UpdateOperation.<String, byte[]>newBuilder();
234 -
235 - rawUpdate = rawUpdate.withKey(keyCache.getUnchecked(update.key()))
236 - .withCurrentVersion(update.currentVersion())
237 - .withType(update.type());
238 -
239 - rawUpdate = rawUpdate.withTableName(update.tableName());
240 -
241 - if (update.value() != null) {
242 - rawUpdate = rawUpdate.withValue(serializer.encode(update.value()));
243 - } else {
244 - checkState(update.type() == UpdateOperation.Type.REMOVE
245 - || update.type() == UpdateOperation.Type.REMOVE_IF_VERSION_MATCH,
246 - ERROR_NULL_VALUE);
247 - }
248 -
249 - if (update.currentValue() != null) {
250 - rawUpdate = rawUpdate.withCurrentValue(serializer.encode(update.currentValue()));
251 - }
252 -
253 - return rawUpdate.build();
254 - }
255 } 217 }
...\ No newline at end of file ...\ No newline at end of file
......
...@@ -41,6 +41,7 @@ import org.onosproject.cluster.DefaultControllerNode; ...@@ -41,6 +41,7 @@ import org.onosproject.cluster.DefaultControllerNode;
41 import org.onosproject.store.service.ConsistentMap; 41 import org.onosproject.store.service.ConsistentMap;
42 import org.onosproject.store.service.Serializer; 42 import org.onosproject.store.service.Serializer;
43 import org.onosproject.store.service.StorageService; 43 import org.onosproject.store.service.StorageService;
44 +import org.onosproject.store.service.TransactionContext;
44 import org.slf4j.Logger; 45 import org.slf4j.Logger;
45 46
46 import com.google.common.collect.Sets; 47 import com.google.common.collect.Sets;
...@@ -154,4 +155,9 @@ public class DatabaseManager implements StorageService { ...@@ -154,4 +155,9 @@ public class DatabaseManager implements StorageService {
154 public <K, V> ConsistentMap<K , V> createConsistentMap(String name, Serializer serializer) { 155 public <K, V> ConsistentMap<K , V> createConsistentMap(String name, Serializer serializer) {
155 return new ConsistentMapImpl<K, V>(name, partitionedDatabase, serializer); 156 return new ConsistentMapImpl<K, V>(name, partitionedDatabase, serializer);
156 } 157 }
158 +
159 + @Override
160 + public TransactionContext createTransactionContext() {
161 + return new DefaultTransactionContext(partitionedDatabase);
162 + }
157 } 163 }
...\ No newline at end of file ...\ No newline at end of file
......
1 +/*
2 + * Copyright 2015 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +
17 +package org.onosproject.store.consistent.impl;
18 +
19 +import java.util.List;
20 +import java.util.Map;
21 +import java.util.concurrent.CompletableFuture;
22 +import java.util.concurrent.ExecutionException;
23 +import java.util.concurrent.TimeUnit;
24 +import java.util.concurrent.TimeoutException;
25 +
26 +import static com.google.common.base.Preconditions.*;
27 +
28 +import org.onosproject.store.service.ConsistentMap;
29 +import org.onosproject.store.service.Serializer;
30 +import org.onosproject.store.service.TransactionContext;
31 +import org.onosproject.store.service.TransactionException;
32 +import org.onosproject.store.service.TransactionalMap;
33 +import org.onosproject.store.service.UpdateOperation;
34 +
35 +import com.google.common.collect.Lists;
36 +import com.google.common.collect.Maps;
37 +
38 +/**
39 + * Default TransactionContext implementation.
40 + */
41 +public class DefaultTransactionContext implements TransactionContext {
42 +
43 + private final Map<String, DefaultTransactionalMap> txMaps = Maps.newHashMap();
44 + private boolean isOpen = false;
45 + DatabaseProxy<String, byte[]> databaseProxy;
46 + private static final String TX_NOT_OPEN_ERROR = "Transaction is not open";
47 + private static final int TRANSACTION_TIMEOUT_MILLIS = 2000;
48 +
49 + DefaultTransactionContext(DatabaseProxy<String, byte[]> proxy) {
50 + this.databaseProxy = proxy;
51 + }
52 +
53 + @Override
54 + public void begin() {
55 + isOpen = true;
56 + }
57 +
58 + @Override
59 + public <K, V> TransactionalMap<K, V> createTransactionalMap(String mapName,
60 + Serializer serializer) {
61 + checkNotNull(mapName, "map name is null");
62 + checkNotNull(serializer, "serializer is null");
63 + checkState(isOpen, TX_NOT_OPEN_ERROR);
64 + if (!txMaps.containsKey(mapName)) {
65 + ConsistentMap<K, V> backingMap = new ConsistentMapImpl<>(mapName, databaseProxy, serializer);
66 + DefaultTransactionalMap<K, V> txMap = new DefaultTransactionalMap<>(mapName, backingMap, this, serializer);
67 + txMaps.put(mapName, txMap);
68 + }
69 + return txMaps.get(mapName);
70 + }
71 +
72 + @Override
73 + public void commit() {
74 + checkState(isOpen, TX_NOT_OPEN_ERROR);
75 + List<UpdateOperation<String, byte[]>> allUpdates =
76 + Lists.newLinkedList();
77 + try {
78 + txMaps.values()
79 + .stream()
80 + .forEach(m -> {
81 + allUpdates.addAll(m.prepareDatabaseUpdates());
82 + });
83 +
84 + if (!complete(databaseProxy.atomicBatchUpdate(allUpdates))) {
85 + throw new TransactionException.OptimisticConcurrencyFailure();
86 + }
87 + } finally {
88 + isOpen = false;
89 + }
90 + }
91 +
92 + @Override
93 + public void rollback() {
94 + checkState(isOpen, TX_NOT_OPEN_ERROR);
95 + txMaps.values()
96 + .stream()
97 + .forEach(m -> m.rollback());
98 + }
99 +
100 + @Override
101 + public boolean isOpen() {
102 + return false;
103 + }
104 +
105 + private static <T> T complete(CompletableFuture<T> future) {
106 + try {
107 + return future.get(TRANSACTION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
108 + } catch (InterruptedException e) {
109 + Thread.currentThread().interrupt();
110 + throw new TransactionException.Interrupted();
111 + } catch (TimeoutException e) {
112 + throw new TransactionException.Timeout();
113 + } catch (ExecutionException e) {
114 + throw new TransactionException(e.getCause());
115 + }
116 + }
117 +}
1 +/*
2 + * Copyright 2015 Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +
17 +package org.onosproject.store.consistent.impl;
18 +
19 +import java.util.Collection;
20 +import java.util.List;
21 +import java.util.Map;
22 +import java.util.Map.Entry;
23 +import java.util.stream.Collectors;
24 +import java.util.Set;
25 +
26 +import org.onlab.util.HexString;
27 +import org.onosproject.store.service.ConsistentMap;
28 +import org.onosproject.store.service.Serializer;
29 +import org.onosproject.store.service.TransactionContext;
30 +import org.onosproject.store.service.TransactionalMap;
31 +import org.onosproject.store.service.UpdateOperation;
32 +import org.onosproject.store.service.Versioned;
33 +
34 +import static com.google.common.base.Preconditions.*;
35 +
36 +import com.google.common.collect.Lists;
37 +import com.google.common.collect.Maps;
38 +import com.google.common.collect.Sets;
39 +
40 +/**
41 + * Default Transactional Map implementation that provides a repeatable reads
42 + * transaction isolation level.
43 + *
44 + * @param <K> key type
45 + * @param <V> value type.
46 + */
47 +public class DefaultTransactionalMap<K, V> implements TransactionalMap<K, V> {
48 +
49 + private final TransactionContext txContext;
50 + private static final String TX_CLOSED_ERROR = "Transaction is closed";
51 + private final ConsistentMap<K, V> backingMap;
52 + private final String name;
53 + private final Serializer serializer;
54 + private final Map<K, Versioned<V>> readCache = Maps.newConcurrentMap();
55 + private final Map<K, V> writeCache = Maps.newConcurrentMap();
56 + private final Set<K> deleteSet = Sets.newConcurrentHashSet();
57 +
58 + public DefaultTransactionalMap(
59 + String name,
60 + ConsistentMap<K, V> backingMap,
61 + TransactionContext txContext,
62 + Serializer serializer) {
63 + this.name = name;
64 + this.backingMap = backingMap;
65 + this.txContext = txContext;
66 + this.serializer = serializer;
67 + }
68 +
69 + @Override
70 + public V get(K key) {
71 + checkState(txContext.isOpen(), TX_CLOSED_ERROR);
72 + if (deleteSet.contains(key)) {
73 + return null;
74 + } else if (writeCache.containsKey(key)) {
75 + return writeCache.get(key);
76 + } else {
77 + if (!readCache.containsKey(key)) {
78 + readCache.put(key, backingMap.get(key));
79 + }
80 + Versioned<V> v = readCache.get(key);
81 + return v != null ? v.value() : null;
82 + }
83 + }
84 +
85 + @Override
86 + public V put(K key, V value) {
87 + checkState(txContext.isOpen(), TX_CLOSED_ERROR);
88 + Versioned<V> original = readCache.get(key);
89 + V recentUpdate = writeCache.put(key, value);
90 + deleteSet.remove(key);
91 + return recentUpdate == null ? (original != null ? original.value() : null) : recentUpdate;
92 + }
93 +
94 + @Override
95 + public V remove(K key) {
96 + checkState(txContext.isOpen(), TX_CLOSED_ERROR);
97 + Versioned<V> original = readCache.get(key);
98 + V recentUpdate = writeCache.remove(key);
99 + deleteSet.add(key);
100 + return recentUpdate == null ? (original != null ? original.value() : null) : recentUpdate;
101 + }
102 +
103 + @Override
104 + public boolean remove(K key, V value) {
105 + V currentValue = get(key);
106 + if (value.equals(currentValue)) {
107 + remove(key);
108 + return true;
109 + }
110 + return false;
111 + }
112 +
113 + @Override
114 + public boolean replace(K key, V oldValue, V newValue) {
115 + V currentValue = get(key);
116 + if (oldValue.equals(currentValue)) {
117 + put(key, newValue);
118 + return true;
119 + }
120 + return false;
121 + }
122 +
123 + @Override
124 + public int size() {
125 + // TODO
126 + throw new UnsupportedOperationException();
127 + }
128 +
129 + @Override
130 + public boolean isEmpty() {
131 + return size() == 0;
132 + }
133 +
134 + @Override
135 + public boolean containsKey(K key) {
136 + return get(key) != null;
137 + }
138 +
139 + @Override
140 + public boolean containsValue(V value) {
141 + // TODO
142 + throw new UnsupportedOperationException();
143 + }
144 +
145 + @Override
146 + public void clear() {
147 + // TODO
148 + throw new UnsupportedOperationException();
149 + }
150 +
151 + @Override
152 + public Set<K> keySet() {
153 + // TODO
154 + throw new UnsupportedOperationException();
155 + }
156 +
157 + @Override
158 + public Collection<V> values() {
159 + // TODO
160 + throw new UnsupportedOperationException();
161 + }
162 +
163 + @Override
164 + public Set<Entry<K, V>> entrySet() {
165 + // TODO
166 + throw new UnsupportedOperationException();
167 + }
168 +
169 + @Override
170 + public V putIfAbsent(K key, V value) {
171 + V currentValue = get(key);
172 + if (currentValue == null) {
173 + put(key, value);
174 + return null;
175 + }
176 + return currentValue;
177 + }
178 +
179 + protected List<UpdateOperation<String, byte[]>> prepareDatabaseUpdates() {
180 + List<UpdateOperation<K, V>> updates = Lists.newLinkedList();
181 + deleteSet.forEach(key -> {
182 + Versioned<V> original = readCache.get(key);
183 + if (original != null) {
184 + updates.add(UpdateOperation.<K, V>newBuilder()
185 + .withTableName(name)
186 + .withType(UpdateOperation.Type.REMOVE_IF_VERSION_MATCH)
187 + .withKey(key)
188 + .withCurrentVersion(original.version())
189 + .build());
190 + }
191 + });
192 + writeCache.forEach((key, value) -> {
193 + Versioned<V> original = readCache.get(key);
194 + if (original == null) {
195 + updates.add(UpdateOperation.<K, V>newBuilder()
196 + .withTableName(name)
197 + .withType(UpdateOperation.Type.PUT_IF_ABSENT)
198 + .withKey(key)
199 + .withValue(value)
200 + .build());
201 + } else {
202 + updates.add(UpdateOperation.<K, V>newBuilder()
203 + .withTableName(name)
204 + .withType(UpdateOperation.Type.PUT_IF_VERSION_MATCH)
205 + .withKey(key)
206 + .withCurrentVersion(original.version())
207 + .withValue(value)
208 + .build());
209 + }
210 + });
211 + return updates.stream().map(this::toRawUpdateOperation).collect(Collectors.toList());
212 + }
213 +
214 + private UpdateOperation<String, byte[]> toRawUpdateOperation(UpdateOperation<K, V> update) {
215 +
216 + UpdateOperation.Builder<String, byte[]> rawUpdate = UpdateOperation.<String, byte[]>newBuilder();
217 +
218 + rawUpdate = rawUpdate.withKey(HexString.toHexString(serializer.encode(update.key())))
219 + .withCurrentVersion(update.currentVersion())
220 + .withType(update.type());
221 +
222 + rawUpdate = rawUpdate.withTableName(update.tableName());
223 +
224 + if (update.value() != null) {
225 + rawUpdate = rawUpdate.withValue(serializer.encode(update.value()));
226 + }
227 +
228 + if (update.currentValue() != null) {
229 + rawUpdate = rawUpdate.withCurrentValue(serializer.encode(update.currentValue()));
230 + }
231 +
232 + return rawUpdate.build();
233 + }
234 +
235 + /**
236 + * Discards all changes made to this transactional map.
237 + */
238 + protected void rollback() {
239 + readCache.clear();
240 + writeCache.clear();
241 + deleteSet.clear();
242 + }
243 +}
...\ No newline at end of file ...\ No newline at end of file