Madan Jampani
Committed by Gerrit Code Review

Make sure message handling executor references are appropriately managed when we…

… modify the service configuration

Change-Id: I56866dd8c3359de0fcf827fc247024c65a63c5c2
...@@ -180,6 +180,8 @@ public class DistributedFlowRuleStore ...@@ -180,6 +180,8 @@ public class DistributedFlowRuleStore
180 180
181 private IdGenerator idGenerator; 181 private IdGenerator idGenerator;
182 182
183 + private NodeId local;
184 +
183 @Activate 185 @Activate
184 public void activate(ComponentContext context) { 186 public void activate(ComponentContext context) {
185 configService.registerProperties(getClass()); 187 configService.registerProperties(getClass());
...@@ -188,17 +190,76 @@ public class DistributedFlowRuleStore ...@@ -188,17 +190,76 @@ public class DistributedFlowRuleStore
188 190
189 idGenerator = coreService.getIdGenerator(FlowRuleService.FLOW_OP_TOPIC); 191 idGenerator = coreService.getIdGenerator(FlowRuleService.FLOW_OP_TOPIC);
190 192
193 + local = clusterService.getLocalNode().id();
194 +
191 // Cache to create SMap on demand 195 // Cache to create SMap on demand
192 smaps = CacheBuilder.newBuilder() 196 smaps = CacheBuilder.newBuilder()
193 .softValues() 197 .softValues()
194 .build(new SMapLoader()); 198 .build(new SMapLoader());
195 199
196 - final NodeId local = clusterService.getLocalNode().id(); 200 + messageHandlingExecutor = Executors.newFixedThreadPool(
201 + msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers"));
202 +
203 + registerMessageHandlers(messageHandlingExecutor);
204 +
205 + replicaInfoEventListener = new InternalReplicaInfoEventListener();
206 +
207 + replicaInfoManager.addListener(replicaInfoEventListener);
208 +
209 + logConfig("Started");
210 + }
197 211
212 + @Deactivate
213 + public void deactivate(ComponentContext context) {
214 + configService.unregisterProperties(getClass(), false);
215 + unregisterMessageHandlers();
216 + messageHandlingExecutor.shutdownNow();
217 + replicaInfoManager.removeListener(replicaInfoEventListener);
218 + log.info("Stopped");
219 + }
220 +
221 + @Modified
222 + public void modified(ComponentContext context) {
223 + if (context == null) {
224 + backupEnabled = DEFAULT_BACKUP_ENABLED;
225 + logConfig("Default config");
226 + return;
227 + }
228 +
229 + Dictionary properties = context.getProperties();
230 + int newPoolSize;
231 + boolean newBackupEnabled;
232 + try {
233 + String s = get(properties, "msgHandlerPoolSize");
234 + newPoolSize = isNullOrEmpty(s) ? msgHandlerPoolSize : Integer.parseInt(s.trim());
235 +
236 + s = get(properties, "backupEnabled");
237 + newBackupEnabled = isNullOrEmpty(s) ? backupEnabled : Boolean.parseBoolean(s.trim());
238 +
239 + } catch (NumberFormatException | ClassCastException e) {
240 + newPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE;
241 + newBackupEnabled = DEFAULT_BACKUP_ENABLED;
242 + }
243 +
244 + if (newBackupEnabled != backupEnabled) {
245 + backupEnabled = newBackupEnabled;
246 + }
247 + if (newPoolSize != msgHandlerPoolSize) {
248 + msgHandlerPoolSize = newPoolSize;
249 + ExecutorService oldMsgHandler = messageHandlingExecutor;
198 messageHandlingExecutor = Executors.newFixedThreadPool( 250 messageHandlingExecutor = Executors.newFixedThreadPool(
199 msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers")); 251 msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers"));
200 252
201 - clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new OnStoreBatch(local), messageHandlingExecutor); 253 + // replace previously registered handlers.
254 + registerMessageHandlers(messageHandlingExecutor);
255 + oldMsgHandler.shutdown();
256 + }
257 + logConfig("Reconfigured");
258 + }
259 +
260 + private void registerMessageHandlers(ExecutorService executor) {
261 +
262 + clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new OnStoreBatch(local), executor);
202 263
203 clusterCommunicator.addSubscriber(REMOTE_APPLY_COMPLETED, new ClusterMessageHandler() { 264 clusterCommunicator.addSubscriber(REMOTE_APPLY_COMPLETED, new ClusterMessageHandler() {
204 @Override 265 @Override
...@@ -207,7 +268,7 @@ public class DistributedFlowRuleStore ...@@ -207,7 +268,7 @@ public class DistributedFlowRuleStore
207 log.trace("received completed notification for {}", event); 268 log.trace("received completed notification for {}", event);
208 notifyDelegate(event); 269 notifyDelegate(event);
209 } 270 }
210 - }, messageHandlingExecutor); 271 + }, executor);
211 272
212 clusterCommunicator.addSubscriber(GET_FLOW_ENTRY, new ClusterMessageHandler() { 273 clusterCommunicator.addSubscriber(GET_FLOW_ENTRY, new ClusterMessageHandler() {
213 274
...@@ -222,7 +283,7 @@ public class DistributedFlowRuleStore ...@@ -222,7 +283,7 @@ public class DistributedFlowRuleStore
222 log.error("Failed to respond back", e); 283 log.error("Failed to respond back", e);
223 } 284 }
224 } 285 }
225 - }, messageHandlingExecutor); 286 + }, executor);
226 287
227 clusterCommunicator.addSubscriber(GET_DEVICE_FLOW_ENTRIES, new ClusterMessageHandler() { 288 clusterCommunicator.addSubscriber(GET_DEVICE_FLOW_ENTRIES, new ClusterMessageHandler() {
228 289
...@@ -237,7 +298,7 @@ public class DistributedFlowRuleStore ...@@ -237,7 +298,7 @@ public class DistributedFlowRuleStore
237 log.error("Failed to respond to peer's getFlowEntries request", e); 298 log.error("Failed to respond to peer's getFlowEntries request", e);
238 } 299 }
239 } 300 }
240 - }, messageHandlingExecutor); 301 + }, executor);
241 302
242 clusterCommunicator.addSubscriber(REMOVE_FLOW_ENTRY, new ClusterMessageHandler() { 303 clusterCommunicator.addSubscriber(REMOVE_FLOW_ENTRY, new ClusterMessageHandler() {
243 304
...@@ -252,60 +313,15 @@ public class DistributedFlowRuleStore ...@@ -252,60 +313,15 @@ public class DistributedFlowRuleStore
252 log.error("Failed to respond back", e); 313 log.error("Failed to respond back", e);
253 } 314 }
254 } 315 }
255 - }, messageHandlingExecutor); 316 + }, executor);
256 -
257 - replicaInfoEventListener = new InternalReplicaInfoEventListener();
258 -
259 - replicaInfoManager.addListener(replicaInfoEventListener);
260 -
261 - logConfig("Started");
262 } 317 }
263 318
264 - @Deactivate 319 + private void unregisterMessageHandlers() {
265 - public void deactivate(ComponentContext context) {
266 - configService.unregisterProperties(getClass(), false);
267 clusterCommunicator.removeSubscriber(REMOVE_FLOW_ENTRY); 320 clusterCommunicator.removeSubscriber(REMOVE_FLOW_ENTRY);
268 clusterCommunicator.removeSubscriber(GET_DEVICE_FLOW_ENTRIES); 321 clusterCommunicator.removeSubscriber(GET_DEVICE_FLOW_ENTRIES);
269 clusterCommunicator.removeSubscriber(GET_FLOW_ENTRY); 322 clusterCommunicator.removeSubscriber(GET_FLOW_ENTRY);
270 clusterCommunicator.removeSubscriber(APPLY_BATCH_FLOWS); 323 clusterCommunicator.removeSubscriber(APPLY_BATCH_FLOWS);
271 clusterCommunicator.removeSubscriber(REMOTE_APPLY_COMPLETED); 324 clusterCommunicator.removeSubscriber(REMOTE_APPLY_COMPLETED);
272 - messageHandlingExecutor.shutdown();
273 - replicaInfoManager.removeListener(replicaInfoEventListener);
274 - log.info("Stopped");
275 - }
276 -
277 - @Modified
278 - public void modified(ComponentContext context) {
279 - if (context == null) {
280 - backupEnabled = DEFAULT_BACKUP_ENABLED;
281 - logConfig("Default config");
282 - return;
283 - }
284 -
285 - Dictionary properties = context.getProperties();
286 - int newPoolSize;
287 - boolean newBackupEnabled;
288 - try {
289 - String s = get(properties, "msgHandlerPoolSize");
290 - newPoolSize = isNullOrEmpty(s) ? msgHandlerPoolSize : Integer.parseInt(s.trim());
291 -
292 - s = get(properties, "backupEnabled");
293 - newBackupEnabled = isNullOrEmpty(s) ? backupEnabled : Boolean.parseBoolean(s.trim());
294 -
295 - } catch (NumberFormatException | ClassCastException e) {
296 - newPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE;
297 - newBackupEnabled = DEFAULT_BACKUP_ENABLED;
298 - }
299 -
300 - if (newPoolSize != msgHandlerPoolSize || newBackupEnabled != backupEnabled) {
301 - msgHandlerPoolSize = newPoolSize;
302 - backupEnabled = newBackupEnabled;
303 - ExecutorService oldMsgHandler = messageHandlingExecutor;
304 - messageHandlingExecutor = Executors.newFixedThreadPool(
305 - msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers"));
306 - oldMsgHandler.shutdown();
307 - logConfig("Reconfigured");
308 - }
309 } 325 }
310 326
311 private void logConfig(String prefix) { 327 private void logConfig(String prefix) {
......
...@@ -200,12 +200,12 @@ public class NettyMessagingService implements MessagingService { ...@@ -200,12 +200,12 @@ public class NettyMessagingService implements MessagingService {
200 200
201 @Override 201 @Override
202 public void registerHandler(String type, MessageHandler handler) { 202 public void registerHandler(String type, MessageHandler handler) {
203 - handlers.putIfAbsent(type, handler); 203 + handlers.put(type, handler);
204 } 204 }
205 205
206 @Override 206 @Override
207 public void registerHandler(String type, MessageHandler handler, ExecutorService executor) { 207 public void registerHandler(String type, MessageHandler handler, ExecutorService executor) {
208 - handlers.putIfAbsent(type, new MessageHandler() { 208 + handlers.put(type, new MessageHandler() {
209 @Override 209 @Override
210 public void handle(Message message) throws IOException { 210 public void handle(Message message) throws IOException {
211 executor.submit(() -> { 211 executor.submit(() -> {
......