HIGUCHI Yuta
Committed by Gerrit Code Review

Attemp to improve hasPendingUpdates()

- Create update list using Stream, so that
  hasPendingUpdates() can short cut on first update found.
- Might improve performance when there is large number of updates in a Tx.

Change-Id: I20820b7212c642315a620d715b5c750e35d31dd0
...@@ -20,7 +20,9 @@ import java.util.List; ...@@ -20,7 +20,9 @@ import java.util.List;
20 import java.util.Map; 20 import java.util.Map;
21 import java.util.Set; 21 import java.util.Set;
22 import java.util.concurrent.CompletableFuture; 22 import java.util.concurrent.CompletableFuture;
23 - 23 +import java.util.stream.Collectors;
24 +import java.util.stream.Stream;
25 +import org.apache.commons.lang3.tuple.Pair;
24 import org.onlab.util.HexString; 26 import org.onlab.util.HexString;
25 import org.onosproject.store.primitives.MapUpdate; 27 import org.onosproject.store.primitives.MapUpdate;
26 import org.onosproject.store.service.AsyncConsistentMap; 28 import org.onosproject.store.service.AsyncConsistentMap;
...@@ -189,41 +191,49 @@ public class DefaultTransactionalMap<K, V> implements TransactionalMap<K, V>, Tr ...@@ -189,41 +191,49 @@ public class DefaultTransactionalMap<K, V> implements TransactionalMap<K, V>, Tr
189 return updates().size(); 191 return updates().size();
190 } 192 }
191 193
192 - protected List<MapUpdate<K, V>> updates() { 194 + @Override
193 - List<MapUpdate<K, V>> updates = Lists.newLinkedList(); 195 + public boolean hasPendingUpdates() {
194 - deleteSet.forEach(key -> { 196 + return updatesStream().findAny().isPresent();
195 - Versioned<V> original = readCache.get(key);
196 - if (original != null) {
197 - updates.add(MapUpdate.<K, V>newBuilder()
198 - .withMapName(name)
199 - .withType(MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
200 - .withKey(key)
201 - .withCurrentVersion(original.version())
202 - .build());
203 - }
204 - });
205 - writeCache.forEach((key, value) -> {
206 - Versioned<V> original = readCache.get(key);
207 - if (original == null) {
208 - updates.add(MapUpdate.<K, V>newBuilder()
209 - .withMapName(name)
210 - .withType(MapUpdate.Type.PUT_IF_ABSENT)
211 - .withKey(key)
212 - .withValue(value)
213 - .build());
214 - } else {
215 - updates.add(MapUpdate.<K, V>newBuilder()
216 - .withMapName(name)
217 - .withType(MapUpdate.Type.PUT_IF_VERSION_MATCH)
218 - .withKey(key)
219 - .withCurrentVersion(original.version())
220 - .withValue(value)
221 - .build());
222 - }
223 - });
224 - return updates;
225 } 197 }
226 198
199 + protected Stream<MapUpdate<K, V>> updatesStream() {
200 + return Stream.concat(
201 + // 1st stream: delete ops
202 + deleteSet.stream()
203 + .map(key -> Pair.of(key, readCache.get(key)))
204 + .filter(e -> e.getValue() != null)
205 + .map(e -> MapUpdate.<K, V>newBuilder()
206 + .withMapName(name)
207 + .withType(MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
208 + .withKey(e.getKey())
209 + .withCurrentVersion(e.getValue().version())
210 + .build()),
211 + // 2nd stream: write ops
212 + writeCache.entrySet().stream()
213 + .map(e -> {
214 + Versioned<V> original = readCache.get(e.getKey());
215 + if (original == null) {
216 + return MapUpdate.<K, V>newBuilder()
217 + .withMapName(name)
218 + .withType(MapUpdate.Type.PUT_IF_ABSENT)
219 + .withKey(e.getKey())
220 + .withValue(e.getValue())
221 + .build();
222 + } else {
223 + return MapUpdate.<K, V>newBuilder()
224 + .withMapName(name)
225 + .withType(MapUpdate.Type.PUT_IF_VERSION_MATCH)
226 + .withKey(e.getKey())
227 + .withCurrentVersion(original.version())
228 + .withValue(e.getValue())
229 + .build();
230 + }
231 + }));
232 + }
233 +
234 + protected List<MapUpdate<K, V>> updates() {
235 + return updatesStream().collect(Collectors.toList());
236 + }
227 237
228 protected List<MapUpdate<String, byte[]>> toMapUpdates() { 238 protected List<MapUpdate<String, byte[]>> toMapUpdates() {
229 List<MapUpdate<String, byte[]>> updates = Lists.newLinkedList(); 239 List<MapUpdate<String, byte[]>> updates = Lists.newLinkedList();
......