Jian Li
Committed by Gerrit Code Review

[ONOS-4142] Restart failed tasks when using SharedScheduledExecutor

With the current SharedScheduledExecutor, all failed tasks are simply
suspended in the background. This commit allows tasks that are
executed through SharedScheduledExecutor to be restarted even if
they encounter failures.

Change-Id: Ibe00c7f5920b8ae3fe5a433a6f9ec08684d88f36
...@@ -36,5 +36,10 @@ ...@@ -36,5 +36,10 @@
36 <artifactId>onos-app-cpman-api</artifactId> 36 <artifactId>onos-app-cpman-api</artifactId>
37 <version>${project.version}</version> 37 <version>${project.version}</version>
38 </dependency> 38 </dependency>
39 + <dependency>
40 + <groupId>org.onosproject</groupId>
41 + <artifactId>onlab-misc</artifactId>
42 + <version>${project.version}</version>
43 + </dependency>
39 </dependencies> 44 </dependencies>
40 </project> 45 </project>
......
...@@ -22,6 +22,7 @@ import org.apache.felix.scr.annotations.Deactivate; ...@@ -22,6 +22,7 @@ import org.apache.felix.scr.annotations.Deactivate;
22 import org.apache.felix.scr.annotations.Reference; 22 import org.apache.felix.scr.annotations.Reference;
23 import org.apache.felix.scr.annotations.ReferenceCardinality; 23 import org.apache.felix.scr.annotations.ReferenceCardinality;
24 import org.onlab.metrics.MetricsService; 24 import org.onlab.metrics.MetricsService;
25 +import org.onlab.util.SharedScheduledExecutorService;
25 import org.onlab.util.SharedScheduledExecutors; 26 import org.onlab.util.SharedScheduledExecutors;
26 import org.onosproject.cpman.message.ControlMessageProvider; 27 import org.onosproject.cpman.message.ControlMessageProvider;
27 import org.onosproject.cpman.message.ControlMessageProviderRegistry; 28 import org.onosproject.cpman.message.ControlMessageProviderRegistry;
...@@ -40,7 +41,6 @@ import org.projectfloodlight.openflow.protocol.OFPortStatus; ...@@ -40,7 +41,6 @@ import org.projectfloodlight.openflow.protocol.OFPortStatus;
40 import org.slf4j.Logger; 41 import org.slf4j.Logger;
41 42
42 import java.util.HashMap; 43 import java.util.HashMap;
43 -import java.util.concurrent.ScheduledExecutorService;
44 import java.util.concurrent.ScheduledFuture; 44 import java.util.concurrent.ScheduledFuture;
45 import java.util.concurrent.TimeUnit; 45 import java.util.concurrent.TimeUnit;
46 46
...@@ -77,7 +77,7 @@ public class OpenFlowControlMessageProvider extends AbstractProvider ...@@ -77,7 +77,7 @@ public class OpenFlowControlMessageProvider extends AbstractProvider
77 new InternalOutgoingMessageProvider(); 77 new InternalOutgoingMessageProvider();
78 78
79 private HashMap<Dpid, OpenFlowControlMessageAggregator> aggregators = Maps.newHashMap(); 79 private HashMap<Dpid, OpenFlowControlMessageAggregator> aggregators = Maps.newHashMap();
80 - private ScheduledExecutorService executor; 80 + private SharedScheduledExecutorService executor;
81 private static final int AGGR_INIT_DELAY = 1; 81 private static final int AGGR_INIT_DELAY = 1;
82 private static final int AGGR_PERIOD = 1; 82 private static final int AGGR_PERIOD = 1;
83 private static final TimeUnit AGGR_TIME_UNIT = TimeUnit.MINUTES; 83 private static final TimeUnit AGGR_TIME_UNIT = TimeUnit.MINUTES;
...@@ -159,7 +159,7 @@ public class OpenFlowControlMessageProvider extends AbstractProvider ...@@ -159,7 +159,7 @@ public class OpenFlowControlMessageProvider extends AbstractProvider
159 new OpenFlowControlMessageAggregator(metricsService, 159 new OpenFlowControlMessageAggregator(metricsService,
160 providerService, deviceId); 160 providerService, deviceId);
161 ScheduledFuture result = executor.scheduleAtFixedRate(ofcma, 161 ScheduledFuture result = executor.scheduleAtFixedRate(ofcma,
162 - AGGR_INIT_DELAY, AGGR_PERIOD, AGGR_TIME_UNIT); 162 + AGGR_INIT_DELAY, AGGR_PERIOD, AGGR_TIME_UNIT, true);
163 aggregators.put(dpid, ofcma); 163 aggregators.put(dpid, ofcma);
164 executorResults.put(dpid, result); 164 executorResults.put(dpid, result);
165 } 165 }
......
...@@ -34,7 +34,7 @@ import static org.slf4j.LoggerFactory.getLogger; ...@@ -34,7 +34,7 @@ import static org.slf4j.LoggerFactory.getLogger;
34 /** 34 /**
35 * A new scheduled executor service that does not eat exception. 35 * A new scheduled executor service that does not eat exception.
36 */ 36 */
37 -class SharedScheduledExecutorService implements ScheduledExecutorService { 37 +public class SharedScheduledExecutorService implements ScheduledExecutorService {
38 38
39 private static final String NOT_ALLOWED = "Shutdown of scheduled executor is not allowed"; 39 private static final String NOT_ALLOWED = "Shutdown of scheduled executor is not allowed";
40 private final Logger log = getLogger(getClass()); 40 private final Logger log = getLogger(getClass());
...@@ -62,17 +62,34 @@ class SharedScheduledExecutorService implements ScheduledExecutorService { ...@@ -62,17 +62,34 @@ class SharedScheduledExecutorService implements ScheduledExecutorService {
62 /** 62 /**
63 * Swaps the backing executor with a new one and shuts down the old one. 63 * Swaps the backing executor with a new one and shuts down the old one.
64 * 64 *
65 - * @param executor new scheduled executor service 65 + * @param executorService new scheduled executor service
66 */ 66 */
67 - void setBackingExecutor(ScheduledExecutorService executor) { 67 + void setBackingExecutor(ScheduledExecutorService executorService) {
68 ScheduledExecutorService oldExecutor = this.executor; 68 ScheduledExecutorService oldExecutor = this.executor;
69 - this.executor = executor; 69 + this.executor = executorService;
70 oldExecutor.shutdown(); 70 oldExecutor.shutdown();
71 } 71 }
72 72
73 + /**
74 + * Creates and executes a one-shot action that becomes enabled
75 + * after the given delay.
76 + *
77 + * @param command the task to execute
78 + * @param delay the time from now to delay execution
79 + * @param unit the time unit of the delay parameter
80 + * @param repeatFlag the flag to denote whether to restart a failed task
81 + * @return a ScheduledFuture representing pending completion of
82 + * the task and whose {@code get()} method will return
83 + * {@code null} upon completion
84 + */
85 + public ScheduledFuture<?> schedule(Runnable command, long delay,
86 + TimeUnit unit, boolean repeatFlag) {
87 + return executor.schedule(wrap(command, repeatFlag), delay, unit);
88 + }
89 +
73 @Override 90 @Override
74 public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { 91 public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
75 - return executor.schedule(wrap(command), delay, unit); 92 + return schedule(command, delay, unit, false);
76 } 93 }
77 94
78 @Override 95 @Override
...@@ -88,16 +105,76 @@ class SharedScheduledExecutorService implements ScheduledExecutorService { ...@@ -88,16 +105,76 @@ class SharedScheduledExecutorService implements ScheduledExecutorService {
88 }, delay, unit); 105 }, delay, unit);
89 } 106 }
90 107
108 + /**
109 + * Creates and executes a periodic action that becomes enabled first
110 + * after the given initial delay, and subsequently with the given
111 + * period; that is executions will commence after
112 + * {@code initialDelay} then {@code initialDelay+period}, then
113 + * {@code initialDelay + 2 * period}, and so on.
114 + * Depending on the repeat flag set by the user, failed tasks can be
115 + * either restarted or terminated. If the repeat flag is set to true and
116 + * any execution of the task encounters an exception, subsequent executions
117 + * are still permitted; otherwise, subsequent executions are suppressed.
118 + * If any execution of this task takes longer than its period, then
119 + * subsequent executions may start late, but will not concurrently execute.
120 + *
121 + * @param command the task to execute
122 + * @param initialDelay the time to delay first execution
123 + * @param period the period between successive executions
124 + * @param unit the time unit of the initialDelay and period parameters
125 + * @param repeatFlag the flag to denote whether to restart a failed task
126 + * @return a ScheduledFuture representing pending completion of
127 + * the task, and whose {@code get()} method will throw an
128 + * exception upon cancellation
129 + */
130 + public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
131 + long initialDelay,
132 + long period,
133 + TimeUnit unit,
134 + boolean repeatFlag) {
135 + return executor.scheduleAtFixedRate(wrap(command, repeatFlag),
136 + initialDelay, period, unit);
137 + }
138 +
91 @Override 139 @Override
92 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, 140 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay,
93 long period, TimeUnit unit) { 141 long period, TimeUnit unit) {
94 - return executor.scheduleAtFixedRate(wrap(command), initialDelay, period, unit); 142 + return scheduleAtFixedRate(command, initialDelay, period, unit, false);
143 + }
144 +
145 + /**
146 + * Creates and executes a periodic action that becomes enabled first
147 + * after the given initial delay, and subsequently with the
148 + * given delay between the termination of one execution and the
149 + * commencement of the next.
150 + * Depending on the repeat flag set by the user, failed tasks can be
151 + * either restarted or terminated. If the repeat flag is set to true and
152 + * any execution of the task encounters an exception, subsequent executions
153 + * are still permitted; otherwise, subsequent executions are suppressed.
154 + *
155 + * @param command the task to execute
156 + * @param initialDelay the time to delay first execution
157 + * @param delay the delay between the termination of one
158 + * execution and the commencement of the next
159 + * @param unit the time unit of the initialDelay and delay parameters
160 + * @param repeatFlag the flag to denote whether to restart a failed task
161 + * @return a ScheduledFuture representing pending completion of
162 + * the task, and whose {@code get()} method will throw an
163 + * exception upon cancellation
164 + */
165 + public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
166 + long initialDelay,
167 + long delay,
168 + TimeUnit unit,
169 + boolean repeatFlag) {
170 + return executor.scheduleWithFixedDelay(wrap(command, repeatFlag),
171 + initialDelay, delay, unit);
95 } 172 }
96 173
97 @Override 174 @Override
98 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, 175 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay,
99 long delay, TimeUnit unit) { 176 long delay, TimeUnit unit) {
100 - return executor.scheduleWithFixedDelay(wrap(command), initialDelay, delay, unit); 177 + return scheduleWithFixedDelay(command, initialDelay, delay, unit, false);
101 } 178 }
102 179
103 @Override 180 @Override
...@@ -171,8 +248,8 @@ class SharedScheduledExecutorService implements ScheduledExecutorService { ...@@ -171,8 +248,8 @@ class SharedScheduledExecutorService implements ScheduledExecutorService {
171 executor.execute(command); 248 executor.execute(command);
172 } 249 }
173 250
174 - private Runnable wrap(Runnable command) { 251 + private Runnable wrap(Runnable command, boolean repeatFlag) {
175 - return new LoggableRunnable(command); 252 + return new LoggableRunnable(command, repeatFlag);
176 } 253 }
177 254
178 /** 255 /**
...@@ -180,19 +257,31 @@ class SharedScheduledExecutorService implements ScheduledExecutorService { ...@@ -180,19 +257,31 @@ class SharedScheduledExecutorService implements ScheduledExecutorService {
180 */ 257 */
181 private class LoggableRunnable implements Runnable { 258 private class LoggableRunnable implements Runnable {
182 private Runnable runnable; 259 private Runnable runnable;
260 + private boolean repeatFlag;
183 261
184 - public LoggableRunnable(Runnable runnable) { 262 + public LoggableRunnable(Runnable runnable, boolean repeatFlag) {
185 super(); 263 super();
186 this.runnable = runnable; 264 this.runnable = runnable;
265 + this.repeatFlag = repeatFlag;
187 } 266 }
188 267
189 @Override 268 @Override
190 public void run() { 269 public void run() {
270 + if (Thread.currentThread().isInterrupted()) {
271 + log.info("Task interrupted, quitting");
272 + return;
273 + }
274 +
191 try { 275 try {
192 runnable.run(); 276 runnable.run();
193 } catch (Exception e) { 277 } catch (Exception e) {
194 log.error("Uncaught exception on " + runnable.getClass().getSimpleName(), e); 278 log.error("Uncaught exception on " + runnable.getClass().getSimpleName(), e);
195 - throw Throwables.propagate(e); 279 +
280 + // if the repeat flag is false, rethrow the exception so that
281 + // this task is terminated
282 + if (!repeatFlag) {
283 + throw Throwables.propagate(e);
284 + }
196 } 285 }
197 } 286 }
198 } 287 }
......
...@@ -15,8 +15,6 @@ ...@@ -15,8 +15,6 @@
15 */ 15 */
16 package org.onlab.util; 16 package org.onlab.util;
17 17
18 -import java.util.concurrent.ScheduledExecutorService;
19 -
20 import static com.google.common.base.Preconditions.checkArgument; 18 import static com.google.common.base.Preconditions.checkArgument;
21 import static java.util.concurrent.Executors.newScheduledThreadPool; 19 import static java.util.concurrent.Executors.newScheduledThreadPool;
22 import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor; 20 import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
...@@ -56,7 +54,7 @@ public final class SharedScheduledExecutors { ...@@ -56,7 +54,7 @@ public final class SharedScheduledExecutors {
56 * 54 *
57 * @return shared scheduled single thread executor 55 * @return shared scheduled single thread executor
58 */ 56 */
59 - public static ScheduledExecutorService getSingleThreadExecutor() { 57 + public static SharedScheduledExecutorService getSingleThreadExecutor() {
60 return singleThreadExecutor; 58 return singleThreadExecutor;
61 } 59 }
62 60
...@@ -65,7 +63,7 @@ public final class SharedScheduledExecutors { ...@@ -65,7 +63,7 @@ public final class SharedScheduledExecutors {
65 * 63 *
66 * @return shared scheduled executor pool 64 * @return shared scheduled executor pool
67 */ 65 */
68 - public static ScheduledExecutorService getPoolThreadExecutor() { 66 + public static SharedScheduledExecutorService getPoolThreadExecutor() {
69 return poolThreadExecutor; 67 return poolThreadExecutor;
70 } 68 }
71 69
......