Yuta HIGUCHI

FlowRule : handle Future failure and timeouts.

Change-Id: Ie945b7ee936ae48ec3205592c309baebe8538ce0
......@@ -21,6 +21,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
......@@ -54,6 +55,7 @@ import org.onlab.onos.net.provider.AbstractProviderRegistry;
import org.onlab.onos.net.provider.AbstractProviderService;
import org.slf4j.Logger;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
......@@ -389,15 +391,22 @@ public class FlowRuleManager
futureService.submit(new Runnable() {
@Override
public void run() {
CompletedBatchOperation res = null;
CompletedBatchOperation res;
try {
res = result.get(TIMEOUT, TimeUnit.MILLISECONDS);
store.batchOperationComplete(FlowRuleBatchEvent.completed(request, res));
} catch (TimeoutException | InterruptedException | ExecutionException e) {
log.warn("Something went wrong with the batch operation {}",
request.batchId());
request.batchId(), e);
Set<FlowRule> failures = new HashSet<>(batchOperation.size());
for (FlowRuleBatchEntry op : batchOperation.getOperations()) {
failures.add(op.getTarget());
}
res = new CompletedBatchOperation(false, failures);
store.batchOperationComplete(FlowRuleBatchEvent.completed(request, res));
}
}
});
break;
......
......@@ -85,6 +85,8 @@ import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
......@@ -132,8 +134,7 @@ public class DistributedFlowRuleStore
private Cache<Integer, SettableFuture<CompletedBatchOperation>> pendingFutures =
CacheBuilder.newBuilder()
.expireAfterWrite(pendingFutureTimeoutMinutes, TimeUnit.MINUTES)
// TODO Explicitly fail the future if expired?
//.removalListener(listener)
.removalListener(new TimeoutFuture())
.build();
// Cache of SMaps used for backup data. each SMap contain device flow table
......@@ -541,6 +542,17 @@ public class DistributedFlowRuleStore
log.debug("removedFromPrimary {}", removed);
}
private static final class TimeoutFuture
implements RemovalListener<Integer, SettableFuture<CompletedBatchOperation>> {
@Override
public void onRemoval(RemovalNotification<Integer, SettableFuture<CompletedBatchOperation>> notification) {
// wrapping in ExecutionException to support Future.get
notification.getValue()
.setException(new ExecutionException("Timed out",
new TimeoutException()));
}
}
private final class OnStoreBatch implements ClusterMessageHandler {
private final NodeId local;
......@@ -580,7 +592,18 @@ public class DistributedFlowRuleStore
@Override
public void run() {
CompletedBatchOperation result = Futures.getUnchecked(f);
CompletedBatchOperation result;
try {
result = f.get();
} catch (InterruptedException | ExecutionException e) {
log.error("Batch operation failed", e);
// create everything failed response
Set<FlowRule> failures = new HashSet<>(operation.size());
for (FlowRuleBatchEntry op : operation.getOperations()) {
failures.add(op.getTarget());
}
result = new CompletedBatchOperation(false, failures);
}
try {
message.respond(SERIALIZER.encode(result));
} catch (IOException e) {
......
......@@ -18,6 +18,8 @@ package org.onlab.onos.store.trivial.impl;
import com.google.common.base.Function;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.google.common.collect.FluentIterable;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.SettableFuture;
......@@ -53,8 +55,10 @@ import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
......@@ -86,8 +90,7 @@ public class SimpleFlowRuleStore
private Cache<Integer, SettableFuture<CompletedBatchOperation>> pendingFutures =
CacheBuilder.newBuilder()
.expireAfterWrite(pendingFutureTimeoutMinutes, TimeUnit.MINUTES)
// TODO Explicitly fail the future if expired?
//.removalListener(listener)
.removalListener(new TimeoutFuture())
.build();
@Activate
......@@ -303,4 +306,15 @@ public class SimpleFlowRuleStore
}
notifyDelegate(event);
}
private static final class TimeoutFuture
implements RemovalListener<Integer, SettableFuture<CompletedBatchOperation>> {
@Override
public void onRemoval(RemovalNotification<Integer, SettableFuture<CompletedBatchOperation>> notification) {
// wrapping in ExecutionException to support Future.get
notification.getValue()
.setException(new ExecutionException("Timed out",
new TimeoutException()));
}
}
}
......