Jian Li
Committed by Gerrit Code Review

A new ScheduledExecutorService that captures and logs exception

The default ScheduledExecutorService does not provide the
capability to capture and log the exception during executing
scheduleAtFixedRate and scheduleWithFixedDelay methods. This
makes it difficult to debug the program when the scheudled
tasks are failed for some reasons.
A new ScheduledExecutorService allows the developers to capture
and log any exceptions if the tasks are failed during execution.

Change-Id: I549ba0f479b9e302f0e668482873b3032dfea147
......@@ -39,15 +39,16 @@ import org.projectfloodlight.openflow.protocol.OFPortStatus;
import org.slf4j.Logger;
import java.util.HashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static org.onlab.util.Tools.groupedThreads;
import static org.onlab.util.Tools.loggableScheduledExecutor;
import static org.onosproject.net.DeviceId.deviceId;
import static org.onosproject.openflow.controller.Dpid.uri;
import static org.slf4j.LoggerFactory.getLogger;
import static org.onlab.util.Tools.groupedThreads;
/**
* Provider which uses an OpenFlow controller to collect control message.
......@@ -105,8 +106,9 @@ public class OpenFlowControlMessageProvider extends AbstractProvider
// listens all OpenFlow outgoing message events
controller.getSwitches().forEach(sw -> sw.addEventListener(outMsgListener));
executor = Executors.newSingleThreadScheduledExecutor(
groupedThreads("onos/provider", "aggregator"));
executor = loggableScheduledExecutor(
newSingleThreadScheduledExecutor(groupedThreads("onos/provider",
"aggregator")));
connectInitialDevices();
log.info("Started");
......
/*
* Copyright 2016 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onlab.util;
import org.slf4j.Logger;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static org.slf4j.LoggerFactory.getLogger;
/**
* A new scheduled executor service that does not eat exception.
*/
class LogScheduledExecutorService implements ScheduledExecutorService {
private static final String NOT_ALLOWED = "Shutdown of scheduled executor is not allowed";
private final Logger log = getLogger(getClass());
private ScheduledExecutorService executor;
/**
* Creates a wrapper for the given scheduled executor service.
*
* @param executor executor service to wrap
*/
LogScheduledExecutorService(ScheduledExecutorService executor) {
this.executor = executor;
}
/**
* Returns the backing scheduled executor service.
*
* @return backing executor service
*/
ScheduledExecutorService backingExecutor() {
return executor;
}
/**
* Swaps the backing executor with a new one and shuts down the old one.
*
* @param executor new scheduled executor service
*/
void setBackingExecutor(ScheduledExecutorService executor) {
ScheduledExecutorService oldExecutor = this.executor;
this.executor = executor;
oldExecutor.shutdown();
}
@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
return executor.schedule(wrap(command), delay, unit);
}
@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
return executor.schedule(() -> {
V v = null;
try {
v = callable.call();
} catch (Exception e) {
log.error("Uncaught exception on " + callable.getClass(), e);
}
return v;
}, delay, unit);
}
@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay,
long period, TimeUnit unit) {
return executor.scheduleAtFixedRate(wrap(command), initialDelay, period, unit);
}
@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay,
long delay, TimeUnit unit) {
return executor.scheduleWithFixedDelay(wrap(command), initialDelay, delay, unit);
}
@Override
public void shutdown() {
throw new UnsupportedOperationException(NOT_ALLOWED);
}
@Override
public List<Runnable> shutdownNow() {
throw new UnsupportedOperationException(NOT_ALLOWED);
}
@Override
public boolean isShutdown() {
return executor.isShutdown();
}
@Override
public boolean isTerminated() {
return executor.isTerminated();
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return executor.awaitTermination(timeout, unit);
}
@Override
public <T> Future<T> submit(Callable<T> task) {
return executor.submit(task);
}
@Override
public <T> Future<T> submit(Runnable task, T result) {
return executor.submit(task, result);
}
@Override
public Future<?> submit(Runnable task) {
return executor.submit(task);
}
@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
return executor.invokeAll(tasks);
}
@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
return executor.invokeAll(tasks, timeout, unit);
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
return executor.invokeAny(tasks);
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit) throws InterruptedException,
ExecutionException, TimeoutException {
return executor.invokeAny(tasks, timeout, unit);
}
@Override
public void execute(Runnable command) {
executor.execute(command);
}
private Runnable wrap(Runnable command) {
return new LoggableRunnable(command);
}
/**
* A runnable class that allows to capture and log the exceptions.
*/
private class LoggableRunnable implements Runnable {
private Runnable runnable;
public LoggableRunnable(Runnable runnable) {
super();
this.runnable = runnable;
}
@Override
public void run() {
try {
runnable.run();
} catch (Exception e) {
log.error("Uncaught exception on " + runnable.getClass().getSimpleName(), e);
throw new RuntimeException(e);
}
}
}
}
......@@ -45,6 +45,7 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
......@@ -129,6 +130,17 @@ public abstract class Tools {
}
/**
* Returns a loggable scheduled executor service that allows to capture and
* log any exceptions if the scheduled tasks are failed during execution.
*
* @param executor scheduled executor service
* @return loggable scheduled executor service
*/
public static ScheduledExecutorService loggableScheduledExecutor(ScheduledExecutorService executor) {
return new LogScheduledExecutorService(executor);
}
/**
* Returns a thread factory that produces threads with MIN_PRIORITY.
*
* @param factory backing ThreadFactory
......