Yuta HIGUCHI

DistributedFlowRuleStore

- replace pendingFuture Map with Cache
- remove the future after setting the Future value

Change-Id: I152dfd586350c472dde74a28579536b44761680a
...@@ -10,7 +10,6 @@ import java.util.ArrayList; ...@@ -10,7 +10,6 @@ import java.util.ArrayList;
10 import java.util.Arrays; 10 import java.util.Arrays;
11 import java.util.Collection; 11 import java.util.Collection;
12 import java.util.Collections; 12 import java.util.Collections;
13 -import java.util.Map;
14 import java.util.Set; 13 import java.util.Set;
15 import java.util.concurrent.ExecutionException; 14 import java.util.concurrent.ExecutionException;
16 import java.util.concurrent.ExecutorService; 15 import java.util.concurrent.ExecutorService;
...@@ -58,10 +57,11 @@ import org.onlab.util.KryoNamespace; ...@@ -58,10 +57,11 @@ import org.onlab.util.KryoNamespace;
58 import org.slf4j.Logger; 57 import org.slf4j.Logger;
59 58
60 import com.google.common.base.Function; 59 import com.google.common.base.Function;
60 +import com.google.common.cache.Cache;
61 +import com.google.common.cache.CacheBuilder;
61 import com.google.common.collect.ArrayListMultimap; 62 import com.google.common.collect.ArrayListMultimap;
62 import com.google.common.collect.ImmutableSet; 63 import com.google.common.collect.ImmutableSet;
63 import com.google.common.collect.Iterables; 64 import com.google.common.collect.Iterables;
64 -import com.google.common.collect.Maps;
65 import com.google.common.collect.Multimap; 65 import com.google.common.collect.Multimap;
66 import com.google.common.util.concurrent.Futures; 66 import com.google.common.util.concurrent.Futures;
67 import com.google.common.util.concurrent.ListenableFuture; 67 import com.google.common.util.concurrent.ListenableFuture;
...@@ -99,9 +99,16 @@ public class DistributedFlowRuleStore ...@@ -99,9 +99,16 @@ public class DistributedFlowRuleStore
99 99
100 private final AtomicInteger localBatchIdGen = new AtomicInteger(); 100 private final AtomicInteger localBatchIdGen = new AtomicInteger();
101 101
102 + // TODO: make this configurable
103 + private int pendingFutureTimeoutMinutes = 5;
104 +
105 + private Cache<Integer, SettableFuture<CompletedBatchOperation>> pendingFutures =
106 + CacheBuilder.newBuilder()
107 + .expireAfterWrite(pendingFutureTimeoutMinutes, TimeUnit.MINUTES)
108 + // TODO Explicitly fail the future if expired?
109 + //.removalListener(listener)
110 + .build();
102 111
103 - // FIXME switch to expiraing map/Cache?
104 - private Map<Integer, SettableFuture<CompletedBatchOperation>> pendingFutures = Maps.newConcurrentMap();
105 112
106 private final ExecutorService futureListeners = 113 private final ExecutorService futureListeners =
107 Executors.newCachedThreadPool(namedThreads("flowstore-peer-responders")); 114 Executors.newCachedThreadPool(namedThreads("flowstore-peer-responders"));
...@@ -405,10 +412,12 @@ public class DistributedFlowRuleStore ...@@ -405,10 +412,12 @@ public class DistributedFlowRuleStore
405 412
406 @Override 413 @Override
407 public void batchOperationComplete(FlowRuleBatchEvent event) { 414 public void batchOperationComplete(FlowRuleBatchEvent event) {
415 + final Integer batchId = event.subject().batchId();
408 SettableFuture<CompletedBatchOperation> future 416 SettableFuture<CompletedBatchOperation> future
409 - = pendingFutures.get(event.subject().batchId()); 417 + = pendingFutures.getIfPresent(batchId);
410 if (future != null) { 418 if (future != null) {
411 future.set(event.result()); 419 future.set(event.result());
420 + pendingFutures.invalidate(batchId);
412 } 421 }
413 notifyDelegate(event); 422 notifyDelegate(event);
414 } 423 }
......