Sho SHIMIZU
Committed by Gerrit Code Review

Refactor: Inline invocations and remove methods

Change-Id: I004cab89f9d65cf9acc6721ef5865a8cb66dc61d
...@@ -54,7 +54,6 @@ import java.util.Optional; ...@@ -54,7 +54,6 @@ import java.util.Optional;
54 import java.util.concurrent.CompletableFuture; 54 import java.util.concurrent.CompletableFuture;
55 import java.util.concurrent.ExecutorService; 55 import java.util.concurrent.ExecutorService;
56 import java.util.stream.Collectors; 56 import java.util.stream.Collectors;
57 -import java.util.stream.Stream;
58 57
59 import static com.google.common.base.Preconditions.checkNotNull; 58 import static com.google.common.base.Preconditions.checkNotNull;
60 import static java.util.concurrent.Executors.newFixedThreadPool; 59 import static java.util.concurrent.Executors.newFixedThreadPool;
...@@ -323,7 +322,17 @@ public class IntentManager ...@@ -323,7 +322,17 @@ public class IntentManager
323 3. accumulate results and submit batch write of IntentData to store 322 3. accumulate results and submit batch write of IntentData to store
324 (we can also try to update these individually) 323 (we can also try to update these individually)
325 */ 324 */
326 - submitUpdates(waitForFutures(createIntentUpdates(operations))); 325 + store.batchWrite(operations.stream()
326 + .map(IntentManager.this::submitIntentData)
327 + .map(x -> x.exceptionally(e -> {
328 + //FIXME
329 + log.warn("Future failed: {}", e);
330 + return null;
331 + }))
332 + .map(CompletableFuture::join)
333 + .filter(Objects::nonNull)
334 + .map(FinalIntentProcessPhase::data)
335 + .collect(Collectors.toList()));
327 } catch (Exception e) { 336 } catch (Exception e) {
328 log.error("Error submitting batches:", e); 337 log.error("Error submitting batches:", e);
329 // FIXME incomplete Intents should be cleaned up 338 // FIXME incomplete Intents should be cleaned up
...@@ -340,28 +349,6 @@ public class IntentManager ...@@ -340,28 +349,6 @@ public class IntentManager
340 } 349 }
341 } 350 }
342 351
343 - private Stream<CompletableFuture<FinalIntentProcessPhase>> createIntentUpdates(Collection<IntentData> data) {
344 - return data.stream()
345 - .map(IntentManager.this::submitIntentData);
346 - }
347 -
348 - private Stream<FinalIntentProcessPhase> waitForFutures(Stream<CompletableFuture<FinalIntentProcessPhase>> futures) {
349 - return futures
350 - .map(x -> x.exceptionally(e -> {
351 - //FIXME
352 - log.warn("Future failed: {}", e);
353 - return null;
354 - }))
355 - .map(CompletableFuture::join)
356 - .filter(Objects::nonNull);
357 - }
358 -
359 - private void submitUpdates(Stream<FinalIntentProcessPhase> updates) {
360 - store.batchWrite(updates
361 - .map(FinalIntentProcessPhase::data)
362 - .collect(Collectors.toList()));
363 - }
364 -
365 private class InternalIntentProcessor implements IntentProcessor { 352 private class InternalIntentProcessor implements IntentProcessor {
366 @Override 353 @Override
367 public List<Intent> compile(Intent intent, List<Intent> previousInstallables) { 354 public List<Intent> compile(Intent intent, List<Intent> previousInstallables) {
......