Madan Jampani
Committed by Gerrit Code Review

Utilities for composing future results

Change-Id: Ie2ecfdedb69638fe7131879caa3b3708c4746006
...@@ -25,13 +25,12 @@ import java.util.Set; ...@@ -25,13 +25,12 @@ import java.util.Set;
25 import java.util.TreeMap; 25 import java.util.TreeMap;
26 import java.util.concurrent.CompletableFuture; 26 import java.util.concurrent.CompletableFuture;
27 import java.util.concurrent.Executor; 27 import java.util.concurrent.Executor;
28 -import java.util.concurrent.atomic.AtomicBoolean;
29 -import java.util.concurrent.atomic.AtomicInteger;
30 import java.util.function.BiFunction; 28 import java.util.function.BiFunction;
31 import java.util.function.Consumer; 29 import java.util.function.Consumer;
32 import java.util.function.Predicate; 30 import java.util.function.Predicate;
33 import java.util.stream.Collectors; 31 import java.util.stream.Collectors;
34 32
33 +import org.onlab.util.Match;
35 import org.onlab.util.Tools; 34 import org.onlab.util.Tools;
36 import org.onosproject.cluster.PartitionId; 35 import org.onosproject.cluster.PartitionId;
37 import org.onosproject.store.primitives.MapUpdate; 36 import org.onosproject.store.primitives.MapUpdate;
...@@ -41,9 +40,10 @@ import org.onosproject.store.service.MapEventListener; ...@@ -41,9 +40,10 @@ import org.onosproject.store.service.MapEventListener;
41 import org.onosproject.store.service.MapTransaction; 40 import org.onosproject.store.service.MapTransaction;
42 import org.onosproject.store.service.Versioned; 41 import org.onosproject.store.service.Versioned;
43 42
43 +import com.google.common.collect.ImmutableList;
44 +import com.google.common.collect.ImmutableSet;
44 import com.google.common.collect.Lists; 45 import com.google.common.collect.Lists;
45 import com.google.common.collect.Maps; 46 import com.google.common.collect.Maps;
46 -import com.google.common.collect.Sets;
47 47
48 /** 48 /**
49 * {@link AsyncConsistentMap} that has its entries partitioned horizontally across 49 * {@link AsyncConsistentMap} that has its entries partitioned horizontally across
...@@ -73,12 +73,9 @@ public class PartitionedAsyncConsistentMap<K, V> implements AsyncConsistentMap<K ...@@ -73,12 +73,9 @@ public class PartitionedAsyncConsistentMap<K, V> implements AsyncConsistentMap<K
73 73
74 @Override 74 @Override
75 public CompletableFuture<Integer> size() { 75 public CompletableFuture<Integer> size() {
76 - AtomicInteger totalSize = new AtomicInteger(0); 76 + return Tools.allOf(getMaps().stream().map(m -> m.size()).collect(Collectors.toList()),
77 - return CompletableFuture.allOf(getMaps() 77 + Math::addExact,
78 - .stream() 78 + 0);
79 - .map(map -> map.size().thenAccept(totalSize::addAndGet))
80 - .toArray(CompletableFuture[]::new))
81 - .thenApply(v -> totalSize.get());
82 } 79 }
83 80
84 @Override 81 @Override
...@@ -93,12 +90,9 @@ public class PartitionedAsyncConsistentMap<K, V> implements AsyncConsistentMap<K ...@@ -93,12 +90,9 @@ public class PartitionedAsyncConsistentMap<K, V> implements AsyncConsistentMap<K
93 90
94 @Override 91 @Override
95 public CompletableFuture<Boolean> containsValue(V value) { 92 public CompletableFuture<Boolean> containsValue(V value) {
96 - AtomicBoolean contains = new AtomicBoolean(false); 93 + return Tools.firstOf(getMaps().stream().map(m -> m.containsValue(value)).collect(Collectors.toList()),
97 - return CompletableFuture.allOf(getMaps().stream() 94 + Match.ifValue(true),
98 - .map(map -> map.containsValue(value) 95 + false);
99 - .thenAccept(v -> contains.set(contains.get() || v)))
100 - .toArray(CompletableFuture[]::new))
101 - .thenApply(v -> contains.get());
102 } 96 }
103 @Override 97 @Override
104 public CompletableFuture<Versioned<V>> get(K key) { 98 public CompletableFuture<Versioned<V>> get(K key) {
...@@ -136,29 +130,23 @@ public class PartitionedAsyncConsistentMap<K, V> implements AsyncConsistentMap<K ...@@ -136,29 +130,23 @@ public class PartitionedAsyncConsistentMap<K, V> implements AsyncConsistentMap<K
136 130
137 @Override 131 @Override
138 public CompletableFuture<Set<K>> keySet() { 132 public CompletableFuture<Set<K>> keySet() {
139 - Set<K> allKeys = Sets.newConcurrentHashSet(); 133 + return Tools.allOf(getMaps().stream().map(m -> m.keySet()).collect(Collectors.toList()),
140 - return CompletableFuture.allOf(getMaps().stream() 134 + (s1, s2) -> ImmutableSet.<K>builder().addAll(s1).addAll(s2).build(),
141 - .map(map -> map.keySet().thenAccept(allKeys::addAll)) 135 + ImmutableSet.of());
142 - .toArray(CompletableFuture[]::new))
143 - .thenApply(v -> allKeys);
144 } 136 }
145 137
146 @Override 138 @Override
147 public CompletableFuture<Collection<Versioned<V>>> values() { 139 public CompletableFuture<Collection<Versioned<V>>> values() {
148 - List<Versioned<V>> allValues = Lists.newCopyOnWriteArrayList(); 140 + return Tools.allOf(getMaps().stream().map(m -> m.values()).collect(Collectors.toList()),
149 - return CompletableFuture.allOf(getMaps().stream() 141 + (c1, c2) -> ImmutableList.<Versioned<V>>builder().addAll(c1).addAll(c2).build(),
150 - .map(map -> map.values().thenAccept(allValues::addAll)) 142 + ImmutableList.of());
151 - .toArray(CompletableFuture[]::new))
152 - .thenApply(v -> allValues);
153 } 143 }
154 144
155 @Override 145 @Override
156 public CompletableFuture<Set<Entry<K, Versioned<V>>>> entrySet() { 146 public CompletableFuture<Set<Entry<K, Versioned<V>>>> entrySet() {
157 - Set<Entry<K, Versioned<V>>> allEntries = Sets.newConcurrentHashSet(); 147 + return Tools.allOf(getMaps().stream().map(m -> m.entrySet()).collect(Collectors.toList()),
158 - return CompletableFuture.allOf(getMaps().stream() 148 + (s1, s2) -> ImmutableSet.<Entry<K, Versioned<V>>>builder().addAll(s1).addAll(s2).build(),
159 - .map(map -> map.entrySet().thenAccept(allEntries::addAll)) 149 + ImmutableSet.of());
160 - .toArray(CompletableFuture[]::new))
161 - .thenApply(v -> allEntries);
162 } 150 }
163 151
164 @Override 152 @Override
......
...@@ -15,11 +15,10 @@ ...@@ -15,11 +15,10 @@
15 */ 15 */
16 package org.onlab.util; 16 package org.onlab.util;
17 17
18 -import com.google.common.base.Charsets; 18 +import static java.nio.file.Files.delete;
19 -import com.google.common.base.Strings; 19 +import static java.nio.file.Files.walkFileTree;
20 -import com.google.common.primitives.UnsignedLongs; 20 +import static org.onlab.util.GroupedThreadFactory.groupedThreadFactory;
21 -import com.google.common.util.concurrent.ThreadFactoryBuilder; 21 +import static org.slf4j.LoggerFactory.getLogger;
22 -import org.slf4j.Logger;
23 22
24 import java.io.File; 23 import java.io.File;
25 import java.io.IOException; 24 import java.io.IOException;
...@@ -44,16 +43,20 @@ import java.util.concurrent.Future; ...@@ -44,16 +43,20 @@ import java.util.concurrent.Future;
44 import java.util.concurrent.ThreadFactory; 43 import java.util.concurrent.ThreadFactory;
45 import java.util.concurrent.TimeUnit; 44 import java.util.concurrent.TimeUnit;
46 import java.util.concurrent.TimeoutException; 45 import java.util.concurrent.TimeoutException;
46 +import java.util.function.BinaryOperator;
47 import java.util.function.Function; 47 import java.util.function.Function;
48 import java.util.function.Supplier; 48 import java.util.function.Supplier;
49 import java.util.stream.Collectors; 49 import java.util.stream.Collectors;
50 import java.util.stream.Stream; 50 import java.util.stream.Stream;
51 import java.util.stream.StreamSupport; 51 import java.util.stream.StreamSupport;
52 52
53 -import static java.nio.file.Files.delete; 53 +import org.slf4j.Logger;
54 -import static java.nio.file.Files.walkFileTree; 54 +
55 -import static org.onlab.util.GroupedThreadFactory.groupedThreadFactory; 55 +import com.google.common.base.Charsets;
56 -import static org.slf4j.LoggerFactory.getLogger; 56 +import com.google.common.base.Strings;
57 +import com.google.common.collect.Lists;
58 +import com.google.common.primitives.UnsignedLongs;
59 +import com.google.common.util.concurrent.ThreadFactoryBuilder;
57 60
58 /** 61 /**
59 * Miscellaneous utility methods. 62 * Miscellaneous utility methods.
...@@ -622,6 +625,53 @@ public abstract class Tools { ...@@ -622,6 +625,53 @@ public abstract class Tools {
622 } 625 }
623 626
624 /** 627 /**
628 + * Returns a new CompletableFuture completed by reducing a list of computed values
629 + * when all of the given CompletableFuture complete.
630 + *
631 + * @param futures the CompletableFutures
632 + * @param reducer reducer for computing the result
633 + * @param emptyValue zero value to be returned if the input future list is empty
634 + * @param <T> value type of CompletableFuture
635 + * @return a new CompletableFuture that is completed when all of the given CompletableFutures complete
636 + */
637 + public static <T> CompletableFuture<T> allOf(List<CompletableFuture<T>> futures,
638 + BinaryOperator<T> reducer,
639 + T emptyValue) {
640 + return Tools.allOf(futures)
641 + .thenApply(resultList -> resultList.stream().reduce(reducer).orElse(emptyValue));
642 + }
643 +
644 + /**
645 + * Returns a new CompletableFuture completed by with the first positive result from a list of
646 + * input CompletableFutures.
647 + *
648 + * @param futures the input list of CompletableFutures
649 + * @param positiveResultMatcher matcher to identify a positive result
650 + * @param negativeResult value to complete with if none of the futures complete with a positive result
651 + * @param <T> value type of CompletableFuture
652 + * @return a new CompletableFuture
653 + */
654 + public static <T> CompletableFuture<T> firstOf(List<CompletableFuture<T>> futures,
655 + Match<T> positiveResultMatcher,
656 + T negativeResult) {
657 + CompletableFuture<T> responseFuture = new CompletableFuture<>();
658 + Tools.allOf(Lists.transform(futures, future -> future.thenAccept(r -> {
659 + if (positiveResultMatcher.matches(r)) {
660 + responseFuture.complete(r);
661 + }
662 + }))).whenComplete((r, e) -> {
663 + if (!responseFuture.isDone()) {
664 + if (e != null) {
665 + responseFuture.completeExceptionally(e);
666 + } else {
667 + responseFuture.complete(negativeResult);
668 + }
669 + }
670 + });
671 + return responseFuture;
672 + }
673 +
674 + /**
625 * Returns the contents of {@code ByteBuffer} as byte array. 675 * Returns the contents of {@code ByteBuffer} as byte array.
626 * <p> 676 * <p>
627 * WARNING: There is a performance cost due to array copy 677 * WARNING: There is a performance cost due to array copy
......