Committed by
Gerrit Code Review
Refactor: remove an extra inner class, IntentBatchProcess
IntentBatchProcess wraps a collection of IntentData to just behave as Runnable. It can be removed by lambda expression Change-Id: Ic6e1ae840311faff6314a8ce9db9a5277d8595cd
Showing
1 changed file
with
40 additions
and
52 deletions
... | @@ -301,73 +301,61 @@ public class IntentManager | ... | @@ -301,73 +301,61 @@ public class IntentManager |
301 | }); | 301 | }); |
302 | } | 302 | } |
303 | 303 | ||
304 | - private class IntentBatchProcess implements Runnable { | 304 | + private class InternalBatchDelegate implements IntentBatchDelegate { |
305 | - | ||
306 | - protected final Collection<IntentData> data; | ||
307 | - | ||
308 | - IntentBatchProcess(Collection<IntentData> data) { | ||
309 | - this.data = checkNotNull(data); | ||
310 | - } | ||
311 | - | ||
312 | @Override | 305 | @Override |
313 | - public void run() { | 306 | + public void execute(Collection<IntentData> operations) { |
314 | - try { | 307 | + log.debug("Execute {} operation(s).", operations.size()); |
308 | + log.trace("Execute operations: {}", operations); | ||
309 | + | ||
310 | + // batchExecutor is single-threaded, so only one batch is in flight at a time | ||
311 | + batchExecutor.execute(() -> { | ||
312 | + try { | ||
315 | /* | 313 | /* |
316 | 1. wrap each intentdata in a runnable and submit | 314 | 1. wrap each intentdata in a runnable and submit |
317 | 2. wait for completion of all the work | 315 | 2. wait for completion of all the work |
318 | 3. accumulate results and submit batch write of IntentData to store | 316 | 3. accumulate results and submit batch write of IntentData to store |
319 | (we can also try to update these individually) | 317 | (we can also try to update these individually) |
320 | */ | 318 | */ |
321 | - submitUpdates(waitForFutures(createIntentUpdates())); | 319 | + submitUpdates(waitForFutures(createIntentUpdates(operations))); |
322 | - } catch (Exception e) { | 320 | + } catch (Exception e) { |
323 | - log.error("Error submitting batches:", e); | 321 | + log.error("Error submitting batches:", e); |
324 | - // FIXME incomplete Intents should be cleaned up | 322 | + // FIXME incomplete Intents should be cleaned up |
325 | - // (transition to FAILED, etc.) | 323 | + // (transition to FAILED, etc.) |
326 | - | 324 | + |
327 | - // the batch has failed | 325 | + // the batch has failed |
328 | - // TODO: maybe we should do more? | 326 | + // TODO: maybe we should do more? |
329 | - log.error("Walk the plank, matey..."); | 327 | + log.error("Walk the plank, matey..."); |
330 | - //FIXME | 328 | + //FIXME |
331 | // batchService.removeIntentOperations(data); | 329 | // batchService.removeIntentOperations(data); |
332 | - } | 330 | + } |
333 | - accumulator.ready(); | 331 | + accumulator.ready(); |
332 | + }); | ||
334 | } | 333 | } |
334 | + } | ||
335 | 335 | ||
336 | - private List<Future<FinalIntentProcessPhase>> createIntentUpdates() { | 336 | + private List<Future<FinalIntentProcessPhase>> createIntentUpdates(Collection<IntentData> data) { |
337 | - return data.stream() | 337 | + return data.stream() |
338 | - .map(IntentManager.this::submitIntentData) | 338 | + .map(IntentManager.this::submitIntentData) |
339 | - .collect(Collectors.toList()); | 339 | + .collect(Collectors.toList()); |
340 | - } | 340 | + } |
341 | 341 | ||
342 | - private List<FinalIntentProcessPhase> waitForFutures(List<Future<FinalIntentProcessPhase>> futures) { | 342 | + private List<FinalIntentProcessPhase> waitForFutures(List<Future<FinalIntentProcessPhase>> futures) { |
343 | - ImmutableList.Builder<FinalIntentProcessPhase> updateBuilder = ImmutableList.builder(); | 343 | + ImmutableList.Builder<FinalIntentProcessPhase> updateBuilder = ImmutableList.builder(); |
344 | - for (Future<FinalIntentProcessPhase> future : futures) { | 344 | + for (Future<FinalIntentProcessPhase> future : futures) { |
345 | - try { | 345 | + try { |
346 | - updateBuilder.add(future.get()); | 346 | + updateBuilder.add(future.get()); |
347 | - } catch (InterruptedException | ExecutionException e) { | 347 | + } catch (InterruptedException | ExecutionException e) { |
348 | - //FIXME | 348 | + //FIXME |
349 | - log.warn("Future failed: {}", e); | 349 | + log.warn("Future failed: {}", e); |
350 | - } | ||
351 | } | 350 | } |
352 | - return updateBuilder.build(); | ||
353 | - } | ||
354 | - | ||
355 | - private void submitUpdates(List<FinalIntentProcessPhase> updates) { | ||
356 | - store.batchWrite(updates.stream() | ||
357 | - .map(FinalIntentProcessPhase::data) | ||
358 | - .collect(Collectors.toList())); | ||
359 | } | 351 | } |
352 | + return updateBuilder.build(); | ||
360 | } | 353 | } |
361 | 354 | ||
362 | - private class InternalBatchDelegate implements IntentBatchDelegate { | 355 | + private void submitUpdates(List<FinalIntentProcessPhase> updates) { |
363 | - @Override | 356 | + store.batchWrite(updates.stream() |
364 | - public void execute(Collection<IntentData> operations) { | 357 | + .map(FinalIntentProcessPhase::data) |
365 | - log.debug("Execute {} operation(s).", operations.size()); | 358 | + .collect(Collectors.toList())); |
366 | - log.trace("Execute operations: {}", operations); | ||
367 | - | ||
368 | - // batchExecutor is single-threaded, so only one batch is in flight at a time | ||
369 | - batchExecutor.execute(new IntentBatchProcess(operations)); | ||
370 | - } | ||
371 | } | 359 | } |
372 | 360 | ||
373 | private class InternalIntentProcessor implements IntentProcessor { | 361 | private class InternalIntentProcessor implements IntentProcessor { | ... | ... |
-
Please register or login to post a comment