Jian Li
Committed by Gerrit Code Review

Add SharedScheduledExecutors for ease of using ScheduledExecutors

- LogScheduledExecutorService -> SharedScheduledExecutorService
- Add a utility classs for SharedScheduledExecutorService
- Add unit test for SharedScheduledExecutors
- Revise the control message provider to use
  SharedScheduledExecutorService

Change-Id: Ia4dea245543b4751e6edcce1aaab4991d897cc77
......@@ -22,6 +22,7 @@ import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onlab.metrics.MetricsService;
import org.onlab.util.SharedScheduledExecutors;
import org.onosproject.cpman.message.ControlMessageProvider;
import org.onosproject.cpman.message.ControlMessageProviderRegistry;
import org.onosproject.cpman.message.ControlMessageProviderService;
......@@ -43,9 +44,6 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static org.onlab.util.Tools.groupedThreads;
import static org.onlab.util.Tools.loggableScheduledExecutor;
import static org.onosproject.net.DeviceId.deviceId;
import static org.onosproject.openflow.controller.Dpid.uri;
import static org.slf4j.LoggerFactory.getLogger;
......@@ -106,9 +104,7 @@ public class OpenFlowControlMessageProvider extends AbstractProvider
// listens all OpenFlow outgoing message events
controller.getSwitches().forEach(sw -> sw.addEventListener(outMsgListener));
executor = loggableScheduledExecutor(
newSingleThreadScheduledExecutor(groupedThreads("onos/provider",
"aggregator")));
executor = SharedScheduledExecutors.getSingleThreadExecutor();
connectInitialDevices();
log.info("Started");
......
......@@ -16,6 +16,7 @@
package org.onlab.util;
import com.google.common.base.Throwables;
import org.slf4j.Logger;
import java.util.Collection;
......@@ -33,7 +34,7 @@ import static org.slf4j.LoggerFactory.getLogger;
/**
* A new scheduled executor service that does not eat exception.
*/
class LogScheduledExecutorService implements ScheduledExecutorService {
class SharedScheduledExecutorService implements ScheduledExecutorService {
private static final String NOT_ALLOWED = "Shutdown of scheduled executor is not allowed";
private final Logger log = getLogger(getClass());
......@@ -45,7 +46,7 @@ class LogScheduledExecutorService implements ScheduledExecutorService {
*
* @param executor executor service to wrap
*/
LogScheduledExecutorService(ScheduledExecutorService executor) {
SharedScheduledExecutorService(ScheduledExecutorService executor) {
this.executor = executor;
}
......@@ -191,7 +192,7 @@ class LogScheduledExecutorService implements ScheduledExecutorService {
runnable.run();
} catch (Exception e) {
log.error("Uncaught exception on " + runnable.getClass().getSimpleName(), e);
throw new RuntimeException(e);
throw Throwables.propagate(e);
}
}
}
......
/*
* Copyright 2016 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onlab.util;
import java.util.concurrent.ScheduledExecutorService;
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.concurrent.Executors.newScheduledThreadPool;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static org.onlab.util.Tools.groupedThreads;
/**
* Utility for managing a set of shared execution resources, such as a single
* thread scheduled executor and thread pool scheduled executor for use by
* various parts of the platform or by applications.
* <p>
* Whenever possible, use of these shared resources is encouraged over creating
* separate ones.
* </p>
*/
public final class SharedScheduledExecutors {
public static final int DEFAULT_POOL_SIZE = 30;
private static SharedScheduledExecutorService singleThreadExecutor =
new SharedScheduledExecutorService(
newSingleThreadScheduledExecutor(
groupedThreads("onos/shared/scheduled",
"onos-single-executor")));
private static SharedScheduledExecutorService poolThreadExecutor =
new SharedScheduledExecutorService(
newScheduledThreadPool(DEFAULT_POOL_SIZE,
groupedThreads("onos/shared/scheduled",
"onos-pool-executor-%d")));
// Ban public construction
private SharedScheduledExecutors() {
}
/**
* Returns the shared scheduled single thread executor.
*
* @return shared scheduled single thread executor
*/
public static ScheduledExecutorService getSingleThreadExecutor() {
return singleThreadExecutor;
}
/**
* Returns the shared scheduled thread pool executor.
*
* @return shared scheduled executor pool
*/
public static ScheduledExecutorService getPoolThreadExecutor() {
return poolThreadExecutor;
}
/**
* Configures the shared scheduled thread pool size.
*
* @param poolSize new pool size
*/
public static void setPoolSize(int poolSize) {
checkArgument(poolSize > 0, "Shared pool size size must be greater than 0");
poolThreadExecutor.setBackingExecutor(
newScheduledThreadPool(poolSize, groupedThreads("onos/shared/scheduled",
"onos-pool-executor-%d")));
}
/**
* Shuts down all shared scheduled executors.
* This is not intended to be called by application directly.
*/
public static void shutdown() {
singleThreadExecutor.backingExecutor().shutdown();
poolThreadExecutor.backingExecutor().shutdown();
}
}
......@@ -15,10 +15,11 @@
*/
package org.onlab.util;
import static java.nio.file.Files.delete;
import static java.nio.file.Files.walkFileTree;
import static org.onlab.util.GroupedThreadFactory.groupedThreadFactory;
import static org.slf4j.LoggerFactory.getLogger;
import com.google.common.base.Charsets;
import com.google.common.base.Strings;
import com.google.common.primitives.UnsignedLongs;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import java.io.BufferedReader;
import java.io.File;
......@@ -45,7 +46,6 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
......@@ -55,12 +55,10 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.slf4j.Logger;
import com.google.common.base.Charsets;
import com.google.common.base.Strings;
import com.google.common.primitives.UnsignedLongs;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import static java.nio.file.Files.delete;
import static java.nio.file.Files.walkFileTree;
import static org.onlab.util.GroupedThreadFactory.groupedThreadFactory;
import static org.slf4j.LoggerFactory.getLogger;
/**
* Miscellaneous utility methods.
......@@ -130,17 +128,6 @@ public abstract class Tools {
}
/**
* Returns a loggable scheduled executor service that allows to capture and
* log any exceptions if the scheduled tasks are failed during execution.
*
* @param executor scheduled executor service
* @return loggable scheduled executor service
*/
public static ScheduledExecutorService loggableScheduledExecutor(ScheduledExecutorService executor) {
return new LogScheduledExecutorService(executor);
}
/**
* Returns a thread factory that produces threads with MIN_PRIORITY.
*
* @param factory backing ThreadFactory
......
/*
* Copyright 2016 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onlab.util;
import org.junit.Test;
import java.util.concurrent.ScheduledExecutorService;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertSame;
/**
* Tests of the SharedScheduledExecutors.
*/
public class SharedScheduledExecutorsTest {
@Test
public void singleThread() {
ScheduledExecutorService a = SharedScheduledExecutors.getSingleThreadExecutor();
assertNotNull("ScheduledExecutorService must not be null", a);
ScheduledExecutorService b = SharedScheduledExecutors.getSingleThreadExecutor();
assertSame("factories should be same", a, b);
}
@Test
public void poolThread() {
ScheduledExecutorService a = SharedScheduledExecutors.getPoolThreadExecutor();
assertNotNull("ScheduledExecutorService must not be null", a);
ScheduledExecutorService b = SharedScheduledExecutors.getPoolThreadExecutor();
assertSame("factories should be same", a, b);
}
}