Madan Jampani

Never process incoming messages on the netty event loop thread pool.

Currently in a lot of places we are deserializing incoming messages on this threadpool and that could be significantly limiting throughput.

Change-Id: I83eb7e91004cea4addb28bc28f27e50de10028fe
Showing 18 changed files with 325 additions and 259 deletions
...@@ -16,10 +16,12 @@ ...@@ -16,10 +16,12 @@
16 package org.onosproject.store.cluster.messaging; 16 package org.onosproject.store.cluster.messaging;
17 17
18 import com.google.common.util.concurrent.ListenableFuture; 18 import com.google.common.util.concurrent.ListenableFuture;
19 +
19 import org.onosproject.cluster.NodeId; 20 import org.onosproject.cluster.NodeId;
20 21
21 import java.io.IOException; 22 import java.io.IOException;
22 import java.util.Set; 23 import java.util.Set;
24 +import java.util.concurrent.ExecutorService;
23 25
24 // TODO: remove IOExceptions? 26 // TODO: remove IOExceptions?
25 /** 27 /**
...@@ -77,9 +79,19 @@ public interface ClusterCommunicationService { ...@@ -77,9 +79,19 @@ public interface ClusterCommunicationService {
77 * @param subject message subject 79 * @param subject message subject
78 * @param subscriber message subscriber 80 * @param subscriber message subscriber
79 */ 81 */
82 + @Deprecated
80 void addSubscriber(MessageSubject subject, ClusterMessageHandler subscriber); 83 void addSubscriber(MessageSubject subject, ClusterMessageHandler subscriber);
81 84
82 /** 85 /**
86 + * Adds a new subscriber for the specified message subject.
87 + *
88 + * @param subject message subject
89 + * @param subscriber message subscriber
90 + * @param executor executor to use for running handler.
91 + */
92 + void addSubscriber(MessageSubject subject, ClusterMessageHandler subscriber, ExecutorService executor);
93 +
94 + /**
83 * Removes a subscriber for the specified message subject. 95 * Removes a subscriber for the specified message subject.
84 * 96 *
85 * @param subject message subject 97 * @param subject message subject
......
...@@ -18,6 +18,7 @@ package org.onosproject.store.app; ...@@ -18,6 +18,7 @@ package org.onosproject.store.app;
18 import com.google.common.base.Charsets; 18 import com.google.common.base.Charsets;
19 import com.google.common.collect.ImmutableSet; 19 import com.google.common.collect.ImmutableSet;
20 import com.google.common.util.concurrent.ListenableFuture; 20 import com.google.common.util.concurrent.ListenableFuture;
21 +
21 import org.apache.felix.scr.annotations.Activate; 22 import org.apache.felix.scr.annotations.Activate;
22 import org.apache.felix.scr.annotations.Component; 23 import org.apache.felix.scr.annotations.Component;
23 import org.apache.felix.scr.annotations.Deactivate; 24 import org.apache.felix.scr.annotations.Deactivate;
...@@ -55,6 +56,7 @@ import java.io.IOException; ...@@ -55,6 +56,7 @@ import java.io.IOException;
55 import java.io.InputStream; 56 import java.io.InputStream;
56 import java.util.Set; 57 import java.util.Set;
57 import java.util.concurrent.CountDownLatch; 58 import java.util.concurrent.CountDownLatch;
59 +import java.util.concurrent.ExecutorService;
58 import java.util.concurrent.Executors; 60 import java.util.concurrent.Executors;
59 import java.util.concurrent.ScheduledExecutorService; 61 import java.util.concurrent.ScheduledExecutorService;
60 62
...@@ -90,6 +92,8 @@ public class GossipApplicationStore extends ApplicationArchive ...@@ -90,6 +92,8 @@ public class GossipApplicationStore extends ApplicationArchive
90 private final ScheduledExecutorService executor = 92 private final ScheduledExecutorService executor =
91 Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/app", "store")); 93 Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/app", "store"));
92 94
95 + private ExecutorService messageHandlingExecutor;
96 +
93 private EventuallyConsistentMap<ApplicationId, Application> apps; 97 private EventuallyConsistentMap<ApplicationId, Application> apps;
94 private EventuallyConsistentMap<Application, InternalState> states; 98 private EventuallyConsistentMap<Application, InternalState> states;
95 private EventuallyConsistentMap<Application, Set<Permission>> permissions; 99 private EventuallyConsistentMap<Application, Set<Permission>> permissions;
...@@ -109,7 +113,10 @@ public class GossipApplicationStore extends ApplicationArchive ...@@ -109,7 +113,10 @@ public class GossipApplicationStore extends ApplicationArchive
109 .register(KryoNamespaces.API) 113 .register(KryoNamespaces.API)
110 .register(InternalState.class); 114 .register(InternalState.class);
111 115
112 - clusterCommunicator.addSubscriber(APP_BITS_REQUEST, new InternalBitServer()); 116 + messageHandlingExecutor = Executors.newSingleThreadExecutor(
117 + groupedThreads("onos/store/app", "message-handler"));
118 +
119 + clusterCommunicator.addSubscriber(APP_BITS_REQUEST, new InternalBitServer(), messageHandlingExecutor);
113 120
114 apps = new EventuallyConsistentMapImpl<>("apps", clusterService, 121 apps = new EventuallyConsistentMapImpl<>("apps", clusterService,
115 clusterCommunicator, 122 clusterCommunicator,
...@@ -145,6 +152,8 @@ public class GossipApplicationStore extends ApplicationArchive ...@@ -145,6 +152,8 @@ public class GossipApplicationStore extends ApplicationArchive
145 152
146 @Deactivate 153 @Deactivate
147 public void deactivate() { 154 public void deactivate() {
155 + clusterCommunicator.removeSubscriber(APP_BITS_REQUEST);
156 + messageHandlingExecutor.shutdown();
148 apps.destroy(); 157 apps.destroy();
149 states.destroy(); 158 states.destroy();
150 permissions.destroy(); 159 permissions.destroy();
......
...@@ -18,6 +18,7 @@ package org.onosproject.store.cluster.impl; ...@@ -18,6 +18,7 @@ package org.onosproject.store.cluster.impl;
18 import com.google.common.collect.Maps; 18 import com.google.common.collect.Maps;
19 import com.hazelcast.config.TopicConfig; 19 import com.hazelcast.config.TopicConfig;
20 import com.hazelcast.core.IAtomicLong; 20 import com.hazelcast.core.IAtomicLong;
21 +
21 import org.apache.felix.scr.annotations.Activate; 22 import org.apache.felix.scr.annotations.Activate;
22 import org.apache.felix.scr.annotations.Component; 23 import org.apache.felix.scr.annotations.Component;
23 import org.apache.felix.scr.annotations.Deactivate; 24 import org.apache.felix.scr.annotations.Deactivate;
...@@ -113,6 +114,8 @@ public class HazelcastLeadershipService implements LeadershipService { ...@@ -113,6 +114,8 @@ public class HazelcastLeadershipService implements LeadershipService {
113 private static final MessageSubject LEADERSHIP_EVENT_MESSAGE_SUBJECT = 114 private static final MessageSubject LEADERSHIP_EVENT_MESSAGE_SUBJECT =
114 new MessageSubject("hz-leadership-events"); 115 new MessageSubject("hz-leadership-events");
115 116
117 + private ExecutorService messageHandlingExecutor;
118 +
116 @Activate 119 @Activate
117 protected void activate() { 120 protected void activate() {
118 localNodeId = clusterService.getLocalNode().id(); 121 localNodeId = clusterService.getLocalNode().id();
...@@ -124,7 +127,13 @@ public class HazelcastLeadershipService implements LeadershipService { ...@@ -124,7 +127,13 @@ public class HazelcastLeadershipService implements LeadershipService {
124 topicConfig.setName(TOPIC_HZ_ID); 127 topicConfig.setName(TOPIC_HZ_ID);
125 storeService.getHazelcastInstance().getConfig().addTopicConfig(topicConfig); 128 storeService.getHazelcastInstance().getConfig().addTopicConfig(topicConfig);
126 129
127 - clusterCommunicator.addSubscriber(LEADERSHIP_EVENT_MESSAGE_SUBJECT, new InternalLeadershipEventListener()); 130 + messageHandlingExecutor = Executors.newSingleThreadExecutor(
131 + groupedThreads("onos/store/leadership", "message-handler"));
132 +
133 + clusterCommunicator.addSubscriber(
134 + LEADERSHIP_EVENT_MESSAGE_SUBJECT,
135 + new InternalLeadershipEventListener(),
136 + messageHandlingExecutor);
128 137
129 log.info("Hazelcast Leadership Service started"); 138 log.info("Hazelcast Leadership Service started");
130 } 139 }
...@@ -132,6 +141,7 @@ public class HazelcastLeadershipService implements LeadershipService { ...@@ -132,6 +141,7 @@ public class HazelcastLeadershipService implements LeadershipService {
132 @Deactivate 141 @Deactivate
133 protected void deactivate() { 142 protected void deactivate() {
134 eventDispatcher.removeSink(LeadershipEvent.class); 143 eventDispatcher.removeSink(LeadershipEvent.class);
144 + messageHandlingExecutor.shutdown();
135 clusterCommunicator.removeSubscriber(LEADERSHIP_EVENT_MESSAGE_SUBJECT); 145 clusterCommunicator.removeSubscriber(LEADERSHIP_EVENT_MESSAGE_SUBJECT);
136 146
137 for (Topic topic : topics.values()) { 147 for (Topic topic : topics.values()) {
......
...@@ -18,6 +18,7 @@ package org.onosproject.store.cluster.impl; ...@@ -18,6 +18,7 @@ package org.onosproject.store.cluster.impl;
18 import com.google.common.collect.ImmutableMap; 18 import com.google.common.collect.ImmutableMap;
19 import com.google.common.collect.Maps; 19 import com.google.common.collect.Maps;
20 import com.google.common.collect.Sets; 20 import com.google.common.collect.Sets;
21 +
21 import org.apache.felix.scr.annotations.Activate; 22 import org.apache.felix.scr.annotations.Activate;
22 import org.apache.felix.scr.annotations.Component; 23 import org.apache.felix.scr.annotations.Component;
23 import org.apache.felix.scr.annotations.Deactivate; 24 import org.apache.felix.scr.annotations.Deactivate;
...@@ -43,6 +44,7 @@ import org.slf4j.Logger; ...@@ -43,6 +44,7 @@ import org.slf4j.Logger;
43 44
44 import java.util.Map; 45 import java.util.Map;
45 import java.util.Set; 46 import java.util.Set;
47 +import java.util.concurrent.ExecutorService;
46 import java.util.concurrent.Executors; 48 import java.util.concurrent.Executors;
47 import java.util.concurrent.ScheduledExecutorService; 49 import java.util.concurrent.ScheduledExecutorService;
48 import java.util.concurrent.TimeUnit; 50 import java.util.concurrent.TimeUnit;
...@@ -92,6 +94,8 @@ public class LeadershipManager implements LeadershipService { ...@@ -92,6 +94,8 @@ public class LeadershipManager implements LeadershipService {
92 private final LeadershipEventListener peerAdvertiser = new PeerAdvertiser(); 94 private final LeadershipEventListener peerAdvertiser = new PeerAdvertiser();
93 private final LeadershipEventListener leaderBoardUpdater = new LeaderBoardUpdater(); 95 private final LeadershipEventListener leaderBoardUpdater = new LeaderBoardUpdater();
94 96
97 + private ExecutorService messageHandlingExecutor;
98 +
95 public static final KryoSerializer SERIALIZER = new KryoSerializer() { 99 public static final KryoSerializer SERIALIZER = new KryoSerializer() {
96 @Override 100 @Override
97 protected void setupKryoPool() { 101 protected void setupKryoPool() {
...@@ -109,9 +113,14 @@ public class LeadershipManager implements LeadershipService { ...@@ -109,9 +113,14 @@ public class LeadershipManager implements LeadershipService {
109 addListener(peerAdvertiser); 113 addListener(peerAdvertiser);
110 addListener(leaderBoardUpdater); 114 addListener(leaderBoardUpdater);
111 115
116 + messageHandlingExecutor = Executors.newSingleThreadExecutor(
117 + groupedThreads("onos/store/leadership",
118 + "peer-advertisement-handler"));
119 +
112 clusterCommunicator.addSubscriber( 120 clusterCommunicator.addSubscriber(
113 LEADERSHIP_UPDATES, 121 LEADERSHIP_UPDATES,
114 - new PeerAdvertisementHandler()); 122 + new PeerAdvertisementHandler(),
123 + messageHandlingExecutor);
115 124
116 log.info("Started."); 125 log.info("Started.");
117 } 126 }
...@@ -123,6 +132,7 @@ public class LeadershipManager implements LeadershipService { ...@@ -123,6 +132,7 @@ public class LeadershipManager implements LeadershipService {
123 132
124 clusterCommunicator.removeSubscriber(LEADERSHIP_UPDATES); 133 clusterCommunicator.removeSubscriber(LEADERSHIP_UPDATES);
125 134
135 + messageHandlingExecutor.shutdown();
126 threadPool.shutdown(); 136 threadPool.shutdown();
127 137
128 log.info("Stopped."); 138 log.info("Stopped.");
......
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
16 package org.onosproject.store.cluster.messaging.impl; 16 package org.onosproject.store.cluster.messaging.impl;
17 17
18 import com.google.common.util.concurrent.ListenableFuture; 18 import com.google.common.util.concurrent.ListenableFuture;
19 +
19 import org.apache.felix.scr.annotations.Activate; 20 import org.apache.felix.scr.annotations.Activate;
20 import org.apache.felix.scr.annotations.Component; 21 import org.apache.felix.scr.annotations.Component;
21 import org.apache.felix.scr.annotations.Deactivate; 22 import org.apache.felix.scr.annotations.Deactivate;
...@@ -44,6 +45,7 @@ import org.slf4j.LoggerFactory; ...@@ -44,6 +45,7 @@ import org.slf4j.LoggerFactory;
44 45
45 import java.io.IOException; 46 import java.io.IOException;
46 import java.util.Set; 47 import java.util.Set;
48 +import java.util.concurrent.ExecutorService;
47 49
48 import static com.google.common.base.Preconditions.checkArgument; 50 import static com.google.common.base.Preconditions.checkArgument;
49 51
...@@ -183,6 +185,13 @@ public class ClusterCommunicationManager ...@@ -183,6 +185,13 @@ public class ClusterCommunicationManager
183 } 185 }
184 186
185 @Override 187 @Override
188 + public void addSubscriber(MessageSubject subject,
189 + ClusterMessageHandler subscriber,
190 + ExecutorService executor) {
191 + messagingService.registerHandler(subject.value(), new InternalClusterMessageHandler(subscriber), executor);
192 + }
193 +
194 + @Override
186 public void removeSubscriber(MessageSubject subject) { 195 public void removeSubscriber(MessageSubject subject) {
187 messagingService.unregisterHandler(subject.value()); 196 messagingService.unregisterHandler(subject.value());
188 } 197 }
......
...@@ -176,28 +176,35 @@ public class GossipDeviceStore ...@@ -176,28 +176,35 @@ public class GossipDeviceStore
176 176
177 @Activate 177 @Activate
178 public void activate() { 178 public void activate() {
179 +
180 + executor = Executors.newCachedThreadPool(groupedThreads("onos/device", "fg-%d"));
181 +
182 + backgroundExecutor =
183 + newSingleThreadScheduledExecutor(minPriority(groupedThreads("onos/device", "bg-%d")));
184 +
179 clusterCommunicator.addSubscriber( 185 clusterCommunicator.addSubscriber(
180 - GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, new InternalDeviceEventListener()); 186 + GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, new InternalDeviceEventListener(), executor);
181 clusterCommunicator.addSubscriber( 187 clusterCommunicator.addSubscriber(
182 - GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, new InternalDeviceOfflineEventListener()); 188 + GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE,
183 - clusterCommunicator.addSubscriber(DEVICE_REMOVE_REQ, new InternalRemoveRequestListener()); 189 + new InternalDeviceOfflineEventListener(),
190 + executor);
191 + clusterCommunicator.addSubscriber(DEVICE_REMOVE_REQ,
192 + new InternalRemoveRequestListener(),
193 + executor);
184 clusterCommunicator.addSubscriber( 194 clusterCommunicator.addSubscriber(
185 - GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, new InternalDeviceRemovedEventListener()); 195 + GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, new InternalDeviceRemovedEventListener(), executor);
186 clusterCommunicator.addSubscriber( 196 clusterCommunicator.addSubscriber(
187 - GossipDeviceStoreMessageSubjects.PORT_UPDATE, new InternalPortEventListener()); 197 + GossipDeviceStoreMessageSubjects.PORT_UPDATE, new InternalPortEventListener(), executor);
188 clusterCommunicator.addSubscriber( 198 clusterCommunicator.addSubscriber(
189 - GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, new InternalPortStatusEventListener()); 199 + GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, new InternalPortStatusEventListener(), executor);
190 clusterCommunicator.addSubscriber( 200 clusterCommunicator.addSubscriber(
191 - GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE, new InternalDeviceAdvertisementListener()); 201 + GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE,
202 + new InternalDeviceAdvertisementListener(),
203 + backgroundExecutor);
192 clusterCommunicator.addSubscriber( 204 clusterCommunicator.addSubscriber(
193 - GossipDeviceStoreMessageSubjects.DEVICE_INJECTED, new DeviceInjectedEventListener()); 205 + GossipDeviceStoreMessageSubjects.DEVICE_INJECTED, new DeviceInjectedEventListener(), executor);
194 clusterCommunicator.addSubscriber( 206 clusterCommunicator.addSubscriber(
195 - GossipDeviceStoreMessageSubjects.PORT_INJECTED, new PortInjectedEventListener()); 207 + GossipDeviceStoreMessageSubjects.PORT_INJECTED, new PortInjectedEventListener(), executor);
196 -
197 - executor = Executors.newCachedThreadPool(groupedThreads("onos/device", "fg-%d"));
198 -
199 - backgroundExecutor =
200 - newSingleThreadScheduledExecutor(minPriority(groupedThreads("onos/device", "bg-%d")));
201 208
202 // start anti-entropy thread 209 // start anti-entropy thread
203 backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(), 210 backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
...@@ -1325,17 +1332,11 @@ public class GossipDeviceStore ...@@ -1325,17 +1332,11 @@ public class GossipDeviceStore
1325 DeviceId deviceId = event.deviceId(); 1332 DeviceId deviceId = event.deviceId();
1326 Timestamped<DeviceDescription> deviceDescription = event.deviceDescription(); 1333 Timestamped<DeviceDescription> deviceDescription = event.deviceDescription();
1327 1334
1328 - executor.submit(new Runnable() { 1335 + try {
1329 - 1336 + notifyDelegateIfNotNull(createOrUpdateDeviceInternal(providerId, deviceId, deviceDescription));
1330 - @Override 1337 + } catch (Exception e) {
1331 - public void run() { 1338 + log.warn("Exception thrown handling device update", e);
1332 - try { 1339 + }
1333 - notifyDelegateIfNotNull(createOrUpdateDeviceInternal(providerId, deviceId, deviceDescription));
1334 - } catch (Exception e) {
1335 - log.warn("Exception thrown handling device update", e);
1336 - }
1337 - }
1338 - });
1339 } 1340 }
1340 } 1341 }
1341 1342
...@@ -1350,17 +1351,11 @@ public class GossipDeviceStore ...@@ -1350,17 +1351,11 @@ public class GossipDeviceStore
1350 DeviceId deviceId = event.deviceId(); 1351 DeviceId deviceId = event.deviceId();
1351 Timestamp timestamp = event.timestamp(); 1352 Timestamp timestamp = event.timestamp();
1352 1353
1353 - executor.submit(new Runnable() { 1354 + try {
1354 - 1355 + notifyDelegateIfNotNull(markOfflineInternal(deviceId, timestamp));
1355 - @Override 1356 + } catch (Exception e) {
1356 - public void run() { 1357 + log.warn("Exception thrown handling device offline", e);
1357 - try { 1358 + }
1358 - notifyDelegateIfNotNull(markOfflineInternal(deviceId, timestamp));
1359 - } catch (Exception e) {
1360 - log.warn("Exception thrown handling device offline", e);
1361 - }
1362 - }
1363 - });
1364 } 1359 }
1365 } 1360 }
1366 1361
...@@ -1371,17 +1366,11 @@ public class GossipDeviceStore ...@@ -1371,17 +1366,11 @@ public class GossipDeviceStore
1371 log.debug("Received device remove request from peer: {}", message.sender()); 1366 log.debug("Received device remove request from peer: {}", message.sender());
1372 DeviceId did = SERIALIZER.decode(message.payload()); 1367 DeviceId did = SERIALIZER.decode(message.payload());
1373 1368
1374 - executor.submit(new Runnable() { 1369 + try {
1375 - 1370 + removeDevice(did);
1376 - @Override 1371 + } catch (Exception e) {
1377 - public void run() { 1372 + log.warn("Exception thrown handling device remove", e);
1378 - try { 1373 + }
1379 - removeDevice(did);
1380 - } catch (Exception e) {
1381 - log.warn("Exception thrown handling device remove", e);
1382 - }
1383 - }
1384 - });
1385 } 1374 }
1386 } 1375 }
1387 1376
...@@ -1396,17 +1385,11 @@ public class GossipDeviceStore ...@@ -1396,17 +1385,11 @@ public class GossipDeviceStore
1396 DeviceId deviceId = event.deviceId(); 1385 DeviceId deviceId = event.deviceId();
1397 Timestamp timestamp = event.timestamp(); 1386 Timestamp timestamp = event.timestamp();
1398 1387
1399 - executor.submit(new Runnable() { 1388 + try {
1400 - 1389 + notifyDelegateIfNotNull(removeDeviceInternal(deviceId, timestamp));
1401 - @Override 1390 + } catch (Exception e) {
1402 - public void run() { 1391 + log.warn("Exception thrown handling device removed", e);
1403 - try { 1392 + }
1404 - notifyDelegateIfNotNull(removeDeviceInternal(deviceId, timestamp));
1405 - } catch (Exception e) {
1406 - log.warn("Exception thrown handling device removed", e);
1407 - }
1408 - }
1409 - });
1410 } 1393 }
1411 } 1394 }
1412 1395
...@@ -1428,17 +1411,11 @@ public class GossipDeviceStore ...@@ -1428,17 +1411,11 @@ public class GossipDeviceStore
1428 return; 1411 return;
1429 } 1412 }
1430 1413
1431 - executor.submit(new Runnable() { 1414 + try {
1432 - 1415 + notifyDelegate(updatePortsInternal(providerId, deviceId, portDescriptions));
1433 - @Override 1416 + } catch (Exception e) {
1434 - public void run() { 1417 + log.warn("Exception thrown handling port update", e);
1435 - try { 1418 + }
1436 - notifyDelegate(updatePortsInternal(providerId, deviceId, portDescriptions));
1437 - } catch (Exception e) {
1438 - log.warn("Exception thrown handling port update", e);
1439 - }
1440 - }
1441 - });
1442 } 1419 }
1443 } 1420 }
1444 1421
...@@ -1460,17 +1437,11 @@ public class GossipDeviceStore ...@@ -1460,17 +1437,11 @@ public class GossipDeviceStore
1460 return; 1437 return;
1461 } 1438 }
1462 1439
1463 - executor.submit(new Runnable() { 1440 + try {
1464 - 1441 + notifyDelegateIfNotNull(updatePortStatusInternal(providerId, deviceId, portDescription));
1465 - @Override 1442 + } catch (Exception e) {
1466 - public void run() { 1443 + log.warn("Exception thrown handling port update", e);
1467 - try { 1444 + }
1468 - notifyDelegateIfNotNull(updatePortStatusInternal(providerId, deviceId, portDescription));
1469 - } catch (Exception e) {
1470 - log.warn("Exception thrown handling port update", e);
1471 - }
1472 - }
1473 - });
1474 } 1445 }
1475 } 1446 }
1476 1447
...@@ -1481,17 +1452,11 @@ public class GossipDeviceStore ...@@ -1481,17 +1452,11 @@ public class GossipDeviceStore
1481 public void handle(ClusterMessage message) { 1452 public void handle(ClusterMessage message) {
1482 log.trace("Received Device Anti-Entropy advertisement from peer: {}", message.sender()); 1453 log.trace("Received Device Anti-Entropy advertisement from peer: {}", message.sender());
1483 DeviceAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload()); 1454 DeviceAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
1484 - backgroundExecutor.submit(new Runnable() { 1455 + try {
1485 - 1456 + handleAdvertisement(advertisement);
1486 - @Override 1457 + } catch (Exception e) {
1487 - public void run() { 1458 + log.warn("Exception thrown handling Device advertisements.", e);
1488 - try { 1459 + }
1489 - handleAdvertisement(advertisement);
1490 - } catch (Exception e) {
1491 - log.warn("Exception thrown handling Device advertisements.", e);
1492 - }
1493 - }
1494 - });
1495 } 1460 }
1496 } 1461 }
1497 1462
...@@ -1507,13 +1472,11 @@ public class GossipDeviceStore ...@@ -1507,13 +1472,11 @@ public class GossipDeviceStore
1507 DeviceId deviceId = event.deviceId(); 1472 DeviceId deviceId = event.deviceId();
1508 DeviceDescription deviceDescription = event.deviceDescription(); 1473 DeviceDescription deviceDescription = event.deviceDescription();
1509 1474
1510 - executor.submit(new Runnable() { 1475 + try {
1511 - 1476 + createOrUpdateDevice(providerId, deviceId, deviceDescription);
1512 - @Override 1477 + } catch (Exception e) {
1513 - public void run() { 1478 + log.warn("Exception thrown handling device injected event.", e);
1514 - createOrUpdateDevice(providerId, deviceId, deviceDescription); 1479 + }
1515 - }
1516 - });
1517 } 1480 }
1518 } 1481 }
1519 1482
...@@ -1529,13 +1492,11 @@ public class GossipDeviceStore ...@@ -1529,13 +1492,11 @@ public class GossipDeviceStore
1529 DeviceId deviceId = event.deviceId(); 1492 DeviceId deviceId = event.deviceId();
1530 List<PortDescription> portDescriptions = event.portDescriptions(); 1493 List<PortDescription> portDescriptions = event.portDescriptions();
1531 1494
1532 - executor.submit(new Runnable() { 1495 + try {
1533 - 1496 + updatePorts(providerId, deviceId, portDescriptions);
1534 - @Override 1497 + } catch (Exception e) {
1535 - public void run() { 1498 + log.warn("Exception thrown handling port injected event.", e);
1536 - updatePorts(providerId, deviceId, portDescriptions); 1499 + }
1537 - }
1538 - });
1539 } 1500 }
1540 } 1501 }
1541 } 1502 }
......
...@@ -162,13 +162,13 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -162,13 +162,13 @@ public class EventuallyConsistentMapImpl<K, V>
162 162
163 updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update"); 163 updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update");
164 clusterCommunicator.addSubscriber(updateMessageSubject, 164 clusterCommunicator.addSubscriber(updateMessageSubject,
165 - new InternalPutEventListener()); 165 + new InternalPutEventListener(), executor);
166 removeMessageSubject = new MessageSubject("ecm-" + mapName + "-remove"); 166 removeMessageSubject = new MessageSubject("ecm-" + mapName + "-remove");
167 clusterCommunicator.addSubscriber(removeMessageSubject, 167 clusterCommunicator.addSubscriber(removeMessageSubject,
168 - new InternalRemoveEventListener()); 168 + new InternalRemoveEventListener(), executor);
169 antiEntropyAdvertisementSubject = new MessageSubject("ecm-" + mapName + "-anti-entropy"); 169 antiEntropyAdvertisementSubject = new MessageSubject("ecm-" + mapName + "-anti-entropy");
170 clusterCommunicator.addSubscriber(antiEntropyAdvertisementSubject, 170 clusterCommunicator.addSubscriber(antiEntropyAdvertisementSubject,
171 - new InternalAntiEntropyListener()); 171 + new InternalAntiEntropyListener(), backgroundExecutor);
172 } 172 }
173 173
174 private KryoSerializer createSerializer(KryoNamespace.Builder builder) { 174 private KryoSerializer createSerializer(KryoNamespace.Builder builder) {
...@@ -728,13 +728,11 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -728,13 +728,11 @@ public class EventuallyConsistentMapImpl<K, V>
728 log.trace("Received anti-entropy advertisement from peer: {}", 728 log.trace("Received anti-entropy advertisement from peer: {}",
729 message.sender()); 729 message.sender());
730 AntiEntropyAdvertisement<K> advertisement = serializer.decode(message.payload()); 730 AntiEntropyAdvertisement<K> advertisement = serializer.decode(message.payload());
731 - backgroundExecutor.submit(() -> { 731 + try {
732 - try { 732 + handleAntiEntropyAdvertisement(advertisement);
733 - handleAntiEntropyAdvertisement(advertisement); 733 + } catch (Exception e) {
734 - } catch (Exception e) { 734 + log.warn("Exception thrown handling advertisements", e);
735 - log.warn("Exception thrown handling advertisements", e); 735 + }
736 - }
737 - });
738 } 736 }
739 } 737 }
740 738
...@@ -745,25 +743,23 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -745,25 +743,23 @@ public class EventuallyConsistentMapImpl<K, V>
745 log.debug("Received put event from peer: {}", message.sender()); 743 log.debug("Received put event from peer: {}", message.sender());
746 InternalPutEvent<K, V> event = serializer.decode(message.payload()); 744 InternalPutEvent<K, V> event = serializer.decode(message.payload());
747 745
748 - executor.submit(() -> { 746 + try {
749 - try { 747 + for (PutEntry<K, V> entry : event.entries()) {
750 - for (PutEntry<K, V> entry : event.entries()) { 748 + K key = entry.key();
751 - K key = entry.key(); 749 + V value = entry.value();
752 - V value = entry.value(); 750 + Timestamp timestamp = entry.timestamp();
753 - Timestamp timestamp = entry.timestamp(); 751 +
754 - 752 + if (putInternal(key, value, timestamp)) {
755 - if (putInternal(key, value, timestamp)) { 753 + EventuallyConsistentMapEvent<K, V> externalEvent =
756 - EventuallyConsistentMapEvent<K, V> externalEvent = 754 + new EventuallyConsistentMapEvent<>(
757 - new EventuallyConsistentMapEvent<>( 755 + EventuallyConsistentMapEvent.Type.PUT, key,
758 - EventuallyConsistentMapEvent.Type.PUT, key, 756 + value);
759 - value); 757 + notifyListeners(externalEvent);
760 - notifyListeners(externalEvent);
761 - }
762 } 758 }
763 - } catch (Exception e) {
764 - log.warn("Exception thrown handling put", e);
765 } 759 }
766 - }); 760 + } catch (Exception e) {
761 + log.warn("Exception thrown handling put", e);
762 + }
767 } 763 }
768 } 764 }
769 765
...@@ -773,25 +769,22 @@ public class EventuallyConsistentMapImpl<K, V> ...@@ -773,25 +769,22 @@ public class EventuallyConsistentMapImpl<K, V>
773 public void handle(ClusterMessage message) { 769 public void handle(ClusterMessage message) {
774 log.debug("Received remove event from peer: {}", message.sender()); 770 log.debug("Received remove event from peer: {}", message.sender());
775 InternalRemoveEvent<K> event = serializer.decode(message.payload()); 771 InternalRemoveEvent<K> event = serializer.decode(message.payload());
772 + try {
773 + for (RemoveEntry<K> entry : event.entries()) {
774 + K key = entry.key();
775 + Timestamp timestamp = entry.timestamp();
776 776
777 - executor.submit(() -> { 777 + if (removeInternal(key, timestamp)) {
778 - try { 778 + EventuallyConsistentMapEvent<K, V> externalEvent
779 - for (RemoveEntry<K> entry : event.entries()) { 779 + = new EventuallyConsistentMapEvent<>(
780 - K key = entry.key(); 780 + EventuallyConsistentMapEvent.Type.REMOVE,
781 - Timestamp timestamp = entry.timestamp(); 781 + key, null);
782 - 782 + notifyListeners(externalEvent);
783 - if (removeInternal(key, timestamp)) {
784 - EventuallyConsistentMapEvent<K, V> externalEvent
785 - = new EventuallyConsistentMapEvent<>(
786 - EventuallyConsistentMapEvent.Type.REMOVE,
787 - key, null);
788 - notifyListeners(externalEvent);
789 - }
790 } 783 }
791 - } catch (Exception e) {
792 - log.warn("Exception thrown handling remove", e);
793 } 784 }
794 - }); 785 + } catch (Exception e) {
786 + log.warn("Exception thrown handling remove", e);
787 + }
795 } 788 }
796 } 789 }
797 790
......
...@@ -107,6 +107,9 @@ public class DistributedFlowRuleStore ...@@ -107,6 +107,9 @@ public class DistributedFlowRuleStore
107 107
108 private final Logger log = getLogger(getClass()); 108 private final Logger log = getLogger(getClass());
109 109
110 + // TODO: Make configurable.
111 + private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8;
112 +
110 private InternalFlowTable flowTable = new InternalFlowTable(); 113 private InternalFlowTable flowTable = new InternalFlowTable();
111 114
112 /*private final ConcurrentMap<DeviceId, ConcurrentMap<FlowId, Set<StoredFlowEntry>>> 115 /*private final ConcurrentMap<DeviceId, ConcurrentMap<FlowId, Set<StoredFlowEntry>>>
...@@ -132,6 +135,7 @@ public class DistributedFlowRuleStore ...@@ -132,6 +135,7 @@ public class DistributedFlowRuleStore
132 // Cache of SMaps used for backup data. each SMap contain device flow table 135 // Cache of SMaps used for backup data. each SMap contain device flow table
133 private LoadingCache<DeviceId, SMap<FlowId, ImmutableList<StoredFlowEntry>>> smaps; 136 private LoadingCache<DeviceId, SMap<FlowId, ImmutableList<StoredFlowEntry>>> smaps;
134 137
138 + private ExecutorService messageHandlingExecutor;
135 139
136 private final ExecutorService backupExecutors = 140 private final ExecutorService backupExecutors =
137 Executors.newSingleThreadExecutor(groupedThreads("onos/flow", "async-backups")); 141 Executors.newSingleThreadExecutor(groupedThreads("onos/flow", "async-backups"));
...@@ -172,7 +176,11 @@ public class DistributedFlowRuleStore ...@@ -172,7 +176,11 @@ public class DistributedFlowRuleStore
172 176
173 final NodeId local = clusterService.getLocalNode().id(); 177 final NodeId local = clusterService.getLocalNode().id();
174 178
175 - clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new OnStoreBatch(local)); 179 + messageHandlingExecutor = Executors.newFixedThreadPool(
180 + MESSAGE_HANDLER_THREAD_POOL_SIZE,
181 + groupedThreads("onos/flow", "message-handlers"));
182 +
183 + clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new OnStoreBatch(local), messageHandlingExecutor);
176 184
177 clusterCommunicator.addSubscriber(REMOTE_APPLY_COMPLETED, new ClusterMessageHandler() { 185 clusterCommunicator.addSubscriber(REMOTE_APPLY_COMPLETED, new ClusterMessageHandler() {
178 @Override 186 @Override
...@@ -181,7 +189,7 @@ public class DistributedFlowRuleStore ...@@ -181,7 +189,7 @@ public class DistributedFlowRuleStore
181 log.trace("received completed notification for {}", event); 189 log.trace("received completed notification for {}", event);
182 notifyDelegate(event); 190 notifyDelegate(event);
183 } 191 }
184 - }); 192 + }, messageHandlingExecutor);
185 193
186 clusterCommunicator.addSubscriber(GET_FLOW_ENTRY, new ClusterMessageHandler() { 194 clusterCommunicator.addSubscriber(GET_FLOW_ENTRY, new ClusterMessageHandler() {
187 195
...@@ -196,7 +204,7 @@ public class DistributedFlowRuleStore ...@@ -196,7 +204,7 @@ public class DistributedFlowRuleStore
196 log.error("Failed to respond back", e); 204 log.error("Failed to respond back", e);
197 } 205 }
198 } 206 }
199 - }); 207 + }, messageHandlingExecutor);
200 208
201 clusterCommunicator.addSubscriber(GET_DEVICE_FLOW_ENTRIES, new ClusterMessageHandler() { 209 clusterCommunicator.addSubscriber(GET_DEVICE_FLOW_ENTRIES, new ClusterMessageHandler() {
202 210
...@@ -211,7 +219,7 @@ public class DistributedFlowRuleStore ...@@ -211,7 +219,7 @@ public class DistributedFlowRuleStore
211 log.error("Failed to respond to peer's getFlowEntries request", e); 219 log.error("Failed to respond to peer's getFlowEntries request", e);
212 } 220 }
213 } 221 }
214 - }); 222 + }, messageHandlingExecutor);
215 223
216 clusterCommunicator.addSubscriber(REMOVE_FLOW_ENTRY, new ClusterMessageHandler() { 224 clusterCommunicator.addSubscriber(REMOVE_FLOW_ENTRY, new ClusterMessageHandler() {
217 225
...@@ -226,7 +234,7 @@ public class DistributedFlowRuleStore ...@@ -226,7 +234,7 @@ public class DistributedFlowRuleStore
226 log.error("Failed to respond back", e); 234 log.error("Failed to respond back", e);
227 } 235 }
228 } 236 }
229 - }); 237 + }, messageHandlingExecutor);
230 238
231 replicaInfoEventListener = new InternalReplicaInfoEventListener(); 239 replicaInfoEventListener = new InternalReplicaInfoEventListener();
232 240
...@@ -242,6 +250,7 @@ public class DistributedFlowRuleStore ...@@ -242,6 +250,7 @@ public class DistributedFlowRuleStore
242 clusterCommunicator.removeSubscriber(GET_FLOW_ENTRY); 250 clusterCommunicator.removeSubscriber(GET_FLOW_ENTRY);
243 clusterCommunicator.removeSubscriber(APPLY_BATCH_FLOWS); 251 clusterCommunicator.removeSubscriber(APPLY_BATCH_FLOWS);
244 clusterCommunicator.removeSubscriber(REMOTE_APPLY_COMPLETED); 252 clusterCommunicator.removeSubscriber(REMOTE_APPLY_COMPLETED);
253 + messageHandlingExecutor.shutdown();
245 replicaInfoManager.removeListener(replicaInfoEventListener); 254 replicaInfoManager.removeListener(replicaInfoEventListener);
246 log.info("Stopped"); 255 log.info("Stopped");
247 } 256 }
...@@ -421,7 +430,7 @@ public class DistributedFlowRuleStore ...@@ -421,7 +430,7 @@ public class DistributedFlowRuleStore
421 switch (op.operator()) { 430 switch (op.operator()) {
422 case ADD: 431 case ADD:
423 entry = new DefaultFlowEntry(op.target()); 432 entry = new DefaultFlowEntry(op.target());
424 - // always add requested FlowRule 433 + // always add requested FlowRule
425 // Note: 2 equal FlowEntry may have different treatment 434 // Note: 2 equal FlowEntry may have different treatment
426 flowTable.remove(entry.deviceId(), entry); 435 flowTable.remove(entry.deviceId(), entry);
427 flowTable.add(entry); 436 flowTable.add(entry);
......
...@@ -78,6 +78,9 @@ public class DefaultFlowRuleExtRouter ...@@ -78,6 +78,9 @@ public class DefaultFlowRuleExtRouter
78 78
79 private final Logger log = getLogger(getClass()); 79 private final Logger log = getLogger(getClass());
80 80
81 + // TODO: Make configurable.
82 + private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
83 +
81 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) 84 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
82 protected ReplicaInfoService replicaInfoManager; 85 protected ReplicaInfoService replicaInfoManager;
83 86
...@@ -102,6 +105,8 @@ public class DefaultFlowRuleExtRouter ...@@ -102,6 +105,8 @@ public class DefaultFlowRuleExtRouter
102 private final ExecutorService futureListeners = Executors 105 private final ExecutorService futureListeners = Executors
103 .newCachedThreadPool(groupedThreads("onos/flow", "store-peer-responders")); 106 .newCachedThreadPool(groupedThreads("onos/flow", "store-peer-responders"));
104 107
108 + private ExecutorService messageHandlingExecutor;
109 +
105 protected static final StoreSerializer SERIALIZER = new KryoSerializer() { 110 protected static final StoreSerializer SERIALIZER = new KryoSerializer() {
106 @Override 111 @Override
107 protected void setupKryoPool() { 112 protected void setupKryoPool() {
...@@ -120,6 +125,11 @@ public class DefaultFlowRuleExtRouter ...@@ -120,6 +125,11 @@ public class DefaultFlowRuleExtRouter
120 125
121 @Activate 126 @Activate
122 public void activate() { 127 public void activate() {
128 +
129 + messageHandlingExecutor = Executors.newFixedThreadPool(
130 + MESSAGE_HANDLER_THREAD_POOL_SIZE,
131 + groupedThreads("onos/flow", "message-handlers"));
132 +
123 clusterCommunicator.addSubscriber(APPLY_EXTEND_FLOWS, 133 clusterCommunicator.addSubscriber(APPLY_EXTEND_FLOWS,
124 new ClusterMessageHandler() { 134 new ClusterMessageHandler() {
125 135
...@@ -141,7 +151,7 @@ public class DefaultFlowRuleExtRouter ...@@ -141,7 +151,7 @@ public class DefaultFlowRuleExtRouter
141 } 151 }
142 }, futureListeners); 152 }, futureListeners);
143 } 153 }
144 - }); 154 + }, messageHandlingExecutor);
145 155
146 replicaInfoManager.addListener(replicaInfoEventListener); 156 replicaInfoManager.addListener(replicaInfoEventListener);
147 157
...@@ -151,6 +161,7 @@ public class DefaultFlowRuleExtRouter ...@@ -151,6 +161,7 @@ public class DefaultFlowRuleExtRouter
151 @Deactivate 161 @Deactivate
152 public void deactivate() { 162 public void deactivate() {
153 clusterCommunicator.removeSubscriber(APPLY_EXTEND_FLOWS); 163 clusterCommunicator.removeSubscriber(APPLY_EXTEND_FLOWS);
164 + messageHandlingExecutor.shutdown();
154 replicaInfoManager.removeListener(replicaInfoEventListener); 165 replicaInfoManager.removeListener(replicaInfoEventListener);
155 log.info("Stopped"); 166 log.info("Stopped");
156 } 167 }
......
...@@ -154,20 +154,21 @@ public class GossipHostStore ...@@ -154,20 +154,21 @@ public class GossipHostStore
154 154
155 @Activate 155 @Activate
156 public void activate() { 156 public void activate() {
157 +
158 + executor = newCachedThreadPool(groupedThreads("onos/host", "fg-%d"));
159 +
160 + backgroundExecutor =
161 + newSingleThreadScheduledExecutor(minPriority(groupedThreads("onos/host", "bg-%d")));
162 +
157 clusterCommunicator.addSubscriber( 163 clusterCommunicator.addSubscriber(
158 HOST_UPDATED_MSG, 164 HOST_UPDATED_MSG,
159 - new InternalHostEventListener()); 165 + new InternalHostEventListener(), executor);
160 clusterCommunicator.addSubscriber( 166 clusterCommunicator.addSubscriber(
161 HOST_REMOVED_MSG, 167 HOST_REMOVED_MSG,
162 - new InternalHostRemovedEventListener()); 168 + new InternalHostRemovedEventListener(), executor);
163 clusterCommunicator.addSubscriber( 169 clusterCommunicator.addSubscriber(
164 HOST_ANTI_ENTROPY_ADVERTISEMENT, 170 HOST_ANTI_ENTROPY_ADVERTISEMENT,
165 - new InternalHostAntiEntropyAdvertisementListener()); 171 + new InternalHostAntiEntropyAdvertisementListener(), backgroundExecutor);
166 -
167 - executor = newCachedThreadPool(groupedThreads("onos/host", "fg-%d"));
168 -
169 - backgroundExecutor =
170 - newSingleThreadScheduledExecutor(minPriority(groupedThreads("onos/host", "bg-%d")));
171 172
172 // start anti-entropy thread 173 // start anti-entropy thread
173 backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(), 174 backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
...@@ -512,20 +513,14 @@ public class GossipHostStore ...@@ -512,20 +513,14 @@ public class GossipHostStore
512 HostDescription hostDescription = event.hostDescription(); 513 HostDescription hostDescription = event.hostDescription();
513 Timestamp timestamp = event.timestamp(); 514 Timestamp timestamp = event.timestamp();
514 515
515 - executor.submit(new Runnable() { 516 + try {
516 - 517 + notifyDelegateIfNotNull(createOrUpdateHostInternal(providerId,
517 - @Override 518 + hostId,
518 - public void run() { 519 + hostDescription,
519 - try { 520 + timestamp));
520 - notifyDelegateIfNotNull(createOrUpdateHostInternal(providerId, 521 + } catch (Exception e) {
521 - hostId, 522 + log.warn("Exception thrown handling host removed", e);
522 - hostDescription, 523 + }
523 - timestamp));
524 - } catch (Exception e) {
525 - log.warn("Exception thrown handling host removed", e);
526 - }
527 - }
528 - });
529 } 524 }
530 } 525 }
531 526
...@@ -540,17 +535,11 @@ public class GossipHostStore ...@@ -540,17 +535,11 @@ public class GossipHostStore
540 HostId hostId = event.hostId(); 535 HostId hostId = event.hostId();
541 Timestamp timestamp = event.timestamp(); 536 Timestamp timestamp = event.timestamp();
542 537
543 - executor.submit(new Runnable() { 538 + try {
544 - 539 + notifyDelegateIfNotNull(removeHostInternal(hostId, timestamp));
545 - @Override 540 + } catch (Exception e) {
546 - public void run() { 541 + log.warn("Exception thrown handling host removed", e);
547 - try { 542 + }
548 - notifyDelegateIfNotNull(removeHostInternal(hostId, timestamp));
549 - } catch (Exception e) {
550 - log.warn("Exception thrown handling host removed", e);
551 - }
552 - }
553 - });
554 } 543 }
555 } 544 }
556 545
...@@ -720,17 +709,11 @@ public class GossipHostStore ...@@ -720,17 +709,11 @@ public class GossipHostStore
720 public void handle(ClusterMessage message) { 709 public void handle(ClusterMessage message) {
721 log.trace("Received Host Anti-Entropy advertisement from peer: {}", message.sender()); 710 log.trace("Received Host Anti-Entropy advertisement from peer: {}", message.sender());
722 HostAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload()); 711 HostAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
723 - backgroundExecutor.submit(new Runnable() { 712 + try {
724 - 713 + handleAntiEntropyAdvertisement(advertisement);
725 - @Override 714 + } catch (Exception e) {
726 - public void run() { 715 + log.warn("Exception thrown handling Host advertisements", e);
727 - try { 716 + }
728 - handleAntiEntropyAdvertisement(advertisement);
729 - } catch (Exception e) {
730 - log.warn("Exception thrown handling Host advertisements", e);
731 - }
732 - }
733 - });
734 } 717 }
735 } 718 }
736 } 719 }
......
...@@ -158,23 +158,23 @@ public class GossipLinkStore ...@@ -158,23 +158,23 @@ public class GossipLinkStore
158 @Activate 158 @Activate
159 public void activate() { 159 public void activate() {
160 160
161 + executor = Executors.newCachedThreadPool(groupedThreads("onos/link", "fg-%d"));
162 +
163 + backgroundExecutors =
164 + newSingleThreadScheduledExecutor(minPriority(groupedThreads("onos/link", "bg-%d")));
165 +
161 clusterCommunicator.addSubscriber( 166 clusterCommunicator.addSubscriber(
162 GossipLinkStoreMessageSubjects.LINK_UPDATE, 167 GossipLinkStoreMessageSubjects.LINK_UPDATE,
163 - new InternalLinkEventListener()); 168 + new InternalLinkEventListener(), executor);
164 clusterCommunicator.addSubscriber( 169 clusterCommunicator.addSubscriber(
165 GossipLinkStoreMessageSubjects.LINK_REMOVED, 170 GossipLinkStoreMessageSubjects.LINK_REMOVED,
166 - new InternalLinkRemovedEventListener()); 171 + new InternalLinkRemovedEventListener(), executor);
167 clusterCommunicator.addSubscriber( 172 clusterCommunicator.addSubscriber(
168 GossipLinkStoreMessageSubjects.LINK_ANTI_ENTROPY_ADVERTISEMENT, 173 GossipLinkStoreMessageSubjects.LINK_ANTI_ENTROPY_ADVERTISEMENT,
169 - new InternalLinkAntiEntropyAdvertisementListener()); 174 + new InternalLinkAntiEntropyAdvertisementListener(), backgroundExecutors);
170 clusterCommunicator.addSubscriber( 175 clusterCommunicator.addSubscriber(
171 GossipLinkStoreMessageSubjects.LINK_INJECTED, 176 GossipLinkStoreMessageSubjects.LINK_INJECTED,
172 - new LinkInjectedEventListener()); 177 + new LinkInjectedEventListener(), executor);
173 -
174 - executor = Executors.newCachedThreadPool(groupedThreads("onos/link", "fg-%d"));
175 -
176 - backgroundExecutors =
177 - newSingleThreadScheduledExecutor(minPriority(groupedThreads("onos/link", "bg-%d")));
178 178
179 long initialDelaySec = 5; 179 long initialDelaySec = 5;
180 long periodSec = 5; 180 long periodSec = 5;
...@@ -822,17 +822,11 @@ public class GossipLinkStore ...@@ -822,17 +822,11 @@ public class GossipLinkStore
822 ProviderId providerId = event.providerId(); 822 ProviderId providerId = event.providerId();
823 Timestamped<LinkDescription> linkDescription = event.linkDescription(); 823 Timestamped<LinkDescription> linkDescription = event.linkDescription();
824 824
825 - executor.submit(new Runnable() { 825 + try {
826 - 826 + notifyDelegateIfNotNull(createOrUpdateLinkInternal(providerId, linkDescription));
827 - @Override 827 + } catch (Exception e) {
828 - public void run() { 828 + log.warn("Exception thrown handling link event", e);
829 - try { 829 + }
830 - notifyDelegateIfNotNull(createOrUpdateLinkInternal(providerId, linkDescription));
831 - } catch (Exception e) {
832 - log.warn("Exception thrown handling link event", e);
833 - }
834 - }
835 - });
836 } 830 }
837 } 831 }
838 832
...@@ -847,17 +841,11 @@ public class GossipLinkStore ...@@ -847,17 +841,11 @@ public class GossipLinkStore
847 LinkKey linkKey = event.linkKey(); 841 LinkKey linkKey = event.linkKey();
848 Timestamp timestamp = event.timestamp(); 842 Timestamp timestamp = event.timestamp();
849 843
850 - executor.submit(new Runnable() { 844 + try {
851 - 845 + notifyDelegateIfNotNull(removeLinkInternal(linkKey, timestamp));
852 - @Override 846 + } catch (Exception e) {
853 - public void run() { 847 + log.warn("Exception thrown handling link removed", e);
854 - try { 848 + }
855 - notifyDelegateIfNotNull(removeLinkInternal(linkKey, timestamp));
856 - } catch (Exception e) {
857 - log.warn("Exception thrown handling link removed", e);
858 - }
859 - }
860 - });
861 } 849 }
862 } 850 }
863 851
...@@ -868,18 +856,12 @@ public class GossipLinkStore ...@@ -868,18 +856,12 @@ public class GossipLinkStore
868 public void handle(ClusterMessage message) { 856 public void handle(ClusterMessage message) {
869 log.trace("Received Link Anti-Entropy advertisement from peer: {}", message.sender()); 857 log.trace("Received Link Anti-Entropy advertisement from peer: {}", message.sender());
870 LinkAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload()); 858 LinkAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
871 - backgroundExecutors.submit(new Runnable() { 859 + try {
872 - 860 + handleAntiEntropyAdvertisement(advertisement);
873 - @Override 861 + } catch (Exception e) {
874 - public void run() { 862 + log.warn("Exception thrown while handling Link advertisements", e);
875 - try { 863 + throw e;
876 - handleAntiEntropyAdvertisement(advertisement); 864 + }
877 - } catch (Exception e) {
878 - log.warn("Exception thrown while handling Link advertisements", e);
879 - throw e;
880 - }
881 - }
882 - });
883 } 865 }
884 } 866 }
885 867
...@@ -894,13 +876,11 @@ public class GossipLinkStore ...@@ -894,13 +876,11 @@ public class GossipLinkStore
894 ProviderId providerId = linkInjectedEvent.providerId(); 876 ProviderId providerId = linkInjectedEvent.providerId();
895 LinkDescription linkDescription = linkInjectedEvent.linkDescription(); 877 LinkDescription linkDescription = linkInjectedEvent.linkDescription();
896 878
897 - executor.submit(new Runnable() { 879 + try {
898 - 880 + createOrUpdateLink(providerId, linkDescription);
899 - @Override 881 + } catch (Exception e) {
900 - public void run() { 882 + log.warn("Exception thrown while handling link injected event", e);
901 - createOrUpdateLink(providerId, linkDescription); 883 + }
902 - }
903 - });
904 } 884 }
905 } 885 }
906 } 886 }
......
...@@ -15,9 +15,12 @@ ...@@ -15,9 +15,12 @@
15 */ 15 */
16 package org.onosproject.store.packet.impl; 16 package org.onosproject.store.packet.impl;
17 17
18 +import static org.onlab.util.Tools.groupedThreads;
18 import static org.slf4j.LoggerFactory.getLogger; 19 import static org.slf4j.LoggerFactory.getLogger;
19 20
20 import java.io.IOException; 21 import java.io.IOException;
22 +import java.util.concurrent.ExecutorService;
23 +import java.util.concurrent.Executors;
21 24
22 import org.apache.felix.scr.annotations.Activate; 25 import org.apache.felix.scr.annotations.Activate;
23 import org.apache.felix.scr.annotations.Component; 26 import org.apache.felix.scr.annotations.Component;
...@@ -55,6 +58,9 @@ public class DistributedPacketStore ...@@ -55,6 +58,9 @@ public class DistributedPacketStore
55 58
56 private final Logger log = getLogger(getClass()); 59 private final Logger log = getLogger(getClass());
57 60
61 + // TODO: make this configurable.
62 + private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
63 +
58 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) 64 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
59 private MastershipService mastershipService; 65 private MastershipService mastershipService;
60 66
...@@ -77,16 +83,24 @@ public class DistributedPacketStore ...@@ -77,16 +83,24 @@ public class DistributedPacketStore
77 } 83 }
78 }; 84 };
79 85
86 + private ExecutorService messageHandlingExecutor;
87 +
80 @Activate 88 @Activate
81 public void activate() { 89 public void activate() {
82 - log.info("Started"); 90 + messageHandlingExecutor = Executors.newFixedThreadPool(
91 + MESSAGE_HANDLER_THREAD_POOL_SIZE,
92 + groupedThreads("onos/flow", "message-handlers"));
83 93
84 communicationService.addSubscriber( 94 communicationService.addSubscriber(
85 - PACKET_OUT_SUBJECT, new InternalClusterMessageHandler()); 95 + PACKET_OUT_SUBJECT, new InternalClusterMessageHandler(), messageHandlingExecutor);
96 +
97 + log.info("Started");
86 } 98 }
87 99
88 @Deactivate 100 @Deactivate
89 public void deactivate() { 101 public void deactivate() {
102 + communicationService.removeSubscriber(PACKET_OUT_SUBJECT);
103 + messageHandlingExecutor.shutdown();
90 log.info("Stopped"); 104 log.info("Stopped");
91 } 105 }
92 106
......
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
16 package org.onosproject.store.statistic.impl; 16 package org.onosproject.store.statistic.impl;
17 17
18 import com.google.common.collect.Sets; 18 import com.google.common.collect.Sets;
19 +
19 import org.apache.felix.scr.annotations.Activate; 20 import org.apache.felix.scr.annotations.Activate;
20 import org.apache.felix.scr.annotations.Component; 21 import org.apache.felix.scr.annotations.Component;
21 import org.apache.felix.scr.annotations.Deactivate; 22 import org.apache.felix.scr.annotations.Deactivate;
...@@ -48,11 +49,14 @@ import java.util.Map; ...@@ -48,11 +49,14 @@ import java.util.Map;
48 import java.util.Set; 49 import java.util.Set;
49 import java.util.concurrent.ConcurrentHashMap; 50 import java.util.concurrent.ConcurrentHashMap;
50 import java.util.concurrent.ExecutionException; 51 import java.util.concurrent.ExecutionException;
52 +import java.util.concurrent.ExecutorService;
53 +import java.util.concurrent.Executors;
51 import java.util.concurrent.Future; 54 import java.util.concurrent.Future;
52 import java.util.concurrent.TimeUnit; 55 import java.util.concurrent.TimeUnit;
53 import java.util.concurrent.TimeoutException; 56 import java.util.concurrent.TimeoutException;
54 import java.util.concurrent.atomic.AtomicInteger; 57 import java.util.concurrent.atomic.AtomicInteger;
55 58
59 +import static org.onlab.util.Tools.groupedThreads;
56 import static org.onosproject.store.statistic.impl.StatisticStoreMessageSubjects.GET_CURRENT; 60 import static org.onosproject.store.statistic.impl.StatisticStoreMessageSubjects.GET_CURRENT;
57 import static org.onosproject.store.statistic.impl.StatisticStoreMessageSubjects.GET_PREVIOUS; 61 import static org.onosproject.store.statistic.impl.StatisticStoreMessageSubjects.GET_PREVIOUS;
58 import static org.slf4j.LoggerFactory.getLogger; 62 import static org.slf4j.LoggerFactory.getLogger;
...@@ -68,6 +72,9 @@ public class DistributedStatisticStore implements StatisticStore { ...@@ -68,6 +72,9 @@ public class DistributedStatisticStore implements StatisticStore {
68 72
69 private final Logger log = getLogger(getClass()); 73 private final Logger log = getLogger(getClass());
70 74
75 + // TODO: Make configurable.
76 + private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
77 +
71 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) 78 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
72 protected ReplicaInfoService replicaInfoManager; 79 protected ReplicaInfoService replicaInfoManager;
73 80
...@@ -97,10 +104,17 @@ public class DistributedStatisticStore implements StatisticStore { ...@@ -97,10 +104,17 @@ public class DistributedStatisticStore implements StatisticStore {
97 } 104 }
98 };; 105 };;
99 106
107 + private ExecutorService messageHandlingExecutor;
108 +
100 private static final long STATISTIC_STORE_TIMEOUT_MILLIS = 3000; 109 private static final long STATISTIC_STORE_TIMEOUT_MILLIS = 3000;
101 110
102 @Activate 111 @Activate
103 public void activate() { 112 public void activate() {
113 +
114 + messageHandlingExecutor = Executors.newFixedThreadPool(
115 + MESSAGE_HANDLER_THREAD_POOL_SIZE,
116 + groupedThreads("onos/store/statistic", "message-handlers"));
117 +
104 clusterCommunicator.addSubscriber(GET_CURRENT, new ClusterMessageHandler() { 118 clusterCommunicator.addSubscriber(GET_CURRENT, new ClusterMessageHandler() {
105 119
106 @Override 120 @Override
...@@ -112,7 +126,7 @@ public class DistributedStatisticStore implements StatisticStore { ...@@ -112,7 +126,7 @@ public class DistributedStatisticStore implements StatisticStore {
112 log.error("Failed to respond back", e); 126 log.error("Failed to respond back", e);
113 } 127 }
114 } 128 }
115 - }); 129 + }, messageHandlingExecutor);
116 130
117 clusterCommunicator.addSubscriber(GET_PREVIOUS, new ClusterMessageHandler() { 131 clusterCommunicator.addSubscriber(GET_PREVIOUS, new ClusterMessageHandler() {
118 132
...@@ -125,12 +139,15 @@ public class DistributedStatisticStore implements StatisticStore { ...@@ -125,12 +139,15 @@ public class DistributedStatisticStore implements StatisticStore {
125 log.error("Failed to respond back", e); 139 log.error("Failed to respond back", e);
126 } 140 }
127 } 141 }
128 - }); 142 + }, messageHandlingExecutor);
129 log.info("Started"); 143 log.info("Started");
130 } 144 }
131 145
132 @Deactivate 146 @Deactivate
133 public void deactivate() { 147 public void deactivate() {
148 + clusterCommunicator.removeSubscriber(GET_PREVIOUS);
149 + clusterCommunicator.removeSubscriber(GET_CURRENT);
150 + messageHandlingExecutor.shutdown();
134 log.info("Stopped"); 151 log.info("Stopped");
135 } 152 }
136 153
......
...@@ -17,6 +17,7 @@ package org.onosproject.store.device.impl; ...@@ -17,6 +17,7 @@ package org.onosproject.store.device.impl;
17 17
18 import com.google.common.collect.Iterables; 18 import com.google.common.collect.Iterables;
19 import com.google.common.collect.Sets; 19 import com.google.common.collect.Sets;
20 +
20 import org.easymock.Capture; 21 import org.easymock.Capture;
21 import org.junit.After; 22 import org.junit.After;
22 import org.junit.AfterClass; 23 import org.junit.AfterClass;
...@@ -62,6 +63,7 @@ import java.util.List; ...@@ -62,6 +63,7 @@ import java.util.List;
62 import java.util.Map; 63 import java.util.Map;
63 import java.util.Set; 64 import java.util.Set;
64 import java.util.concurrent.CountDownLatch; 65 import java.util.concurrent.CountDownLatch;
66 +import java.util.concurrent.ExecutorService;
65 import java.util.concurrent.TimeUnit; 67 import java.util.concurrent.TimeUnit;
66 68
67 import static java.util.Arrays.asList; 69 import static java.util.Arrays.asList;
...@@ -152,7 +154,7 @@ public class GossipDeviceStoreTest { ...@@ -152,7 +154,7 @@ public class GossipDeviceStoreTest {
152 154
153 clusterCommunicator = createNiceMock(ClusterCommunicationService.class); 155 clusterCommunicator = createNiceMock(ClusterCommunicationService.class);
154 clusterCommunicator.addSubscriber(anyObject(MessageSubject.class), 156 clusterCommunicator.addSubscriber(anyObject(MessageSubject.class),
155 - anyObject(ClusterMessageHandler.class)); 157 + anyObject(ClusterMessageHandler.class), anyObject(ExecutorService.class));
156 expectLastCall().anyTimes(); 158 expectLastCall().anyTimes();
157 replay(clusterCommunicator); 159 replay(clusterCommunicator);
158 ClusterService clusterService = new TestClusterService(); 160 ClusterService clusterService = new TestClusterService();
......
...@@ -46,6 +46,7 @@ import java.util.Map; ...@@ -46,6 +46,7 @@ import java.util.Map;
46 import java.util.Objects; 46 import java.util.Objects;
47 import java.util.Set; 47 import java.util.Set;
48 import java.util.concurrent.CountDownLatch; 48 import java.util.concurrent.CountDownLatch;
49 +import java.util.concurrent.ExecutorService;
49 import java.util.concurrent.TimeUnit; 50 import java.util.concurrent.TimeUnit;
50 import java.util.concurrent.atomic.AtomicLong; 51 import java.util.concurrent.atomic.AtomicLong;
51 52
...@@ -129,7 +130,7 @@ public class EventuallyConsistentMapImplTest { ...@@ -129,7 +130,7 @@ public class EventuallyConsistentMapImplTest {
129 // allows us to get a reference to the map's internal cluster message 130 // allows us to get a reference to the map's internal cluster message
130 // handlers so we can induce events coming in from a peer. 131 // handlers so we can induce events coming in from a peer.
131 clusterCommunicator.addSubscriber(anyObject(MessageSubject.class), 132 clusterCommunicator.addSubscriber(anyObject(MessageSubject.class),
132 - anyObject(ClusterMessageHandler.class)); 133 + anyObject(ClusterMessageHandler.class), anyObject(ExecutorService.class));
133 expectLastCall().andDelegateTo(new TestClusterCommunicationService()).times(3); 134 expectLastCall().andDelegateTo(new TestClusterCommunicationService()).times(3);
134 135
135 replay(clusterCommunicator); 136 replay(clusterCommunicator);
...@@ -731,6 +732,21 @@ public class EventuallyConsistentMapImplTest { ...@@ -731,6 +732,21 @@ public class EventuallyConsistentMapImplTest {
731 } 732 }
732 733
733 @Override 734 @Override
735 + public void addSubscriber(MessageSubject subject,
736 + ClusterMessageHandler subscriber,
737 + ExecutorService executor) {
738 + if (subject.equals(PUT_MESSAGE_SUBJECT)) {
739 + putHandler = subscriber;
740 + } else if (subject.equals(REMOVE_MESSAGE_SUBJECT)) {
741 + removeHandler = subscriber;
742 + } else if (subject.equals(ANTI_ENTROPY_MESSAGE_SUBJECT)) {
743 + antiEntropyHandler = subscriber;
744 + } else {
745 + throw new RuntimeException("Unexpected message subject " + subject.toString());
746 + }
747 + }
748 +
749 + @Override
734 public void removeSubscriber(MessageSubject subject) {} 750 public void removeSubscriber(MessageSubject subject) {}
735 } 751 }
736 752
......
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
16 package org.onosproject.store.link.impl; 16 package org.onosproject.store.link.impl;
17 17
18 import com.google.common.collect.Iterables; 18 import com.google.common.collect.Iterables;
19 +
19 import org.easymock.Capture; 20 import org.easymock.Capture;
20 import org.junit.After; 21 import org.junit.After;
21 import org.junit.AfterClass; 22 import org.junit.AfterClass;
...@@ -56,6 +57,7 @@ import java.util.HashMap; ...@@ -56,6 +57,7 @@ import java.util.HashMap;
56 import java.util.Map; 57 import java.util.Map;
57 import java.util.Set; 58 import java.util.Set;
58 import java.util.concurrent.CountDownLatch; 59 import java.util.concurrent.CountDownLatch;
60 +import java.util.concurrent.ExecutorService;
59 import java.util.concurrent.TimeUnit; 61 import java.util.concurrent.TimeUnit;
60 62
61 import static org.easymock.EasyMock.*; 63 import static org.easymock.EasyMock.*;
...@@ -140,7 +142,8 @@ public class GossipLinkStoreTest { ...@@ -140,7 +142,8 @@ public class GossipLinkStoreTest {
140 // TODO mock clusterCommunicator 142 // TODO mock clusterCommunicator
141 clusterCommunicator = createNiceMock(ClusterCommunicationService.class); 143 clusterCommunicator = createNiceMock(ClusterCommunicationService.class);
142 clusterCommunicator.addSubscriber(anyObject(MessageSubject.class), 144 clusterCommunicator.addSubscriber(anyObject(MessageSubject.class),
143 - anyObject(ClusterMessageHandler.class)); 145 + anyObject(ClusterMessageHandler.class),
146 + anyObject(ExecutorService.class));
144 expectLastCall().anyTimes(); 147 expectLastCall().anyTimes();
145 replay(clusterCommunicator); 148 replay(clusterCommunicator);
146 149
......
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
16 package org.onlab.netty; 16 package org.onlab.netty;
17 17
18 import java.io.IOException; 18 import java.io.IOException;
19 +import java.util.concurrent.ExecutorService;
19 20
20 import com.google.common.util.concurrent.ListenableFuture; 21 import com.google.common.util.concurrent.ListenableFuture;
21 22
...@@ -47,7 +48,16 @@ public interface MessagingService { ...@@ -47,7 +48,16 @@ public interface MessagingService {
47 * Registers a new message handler for message type. 48 * Registers a new message handler for message type.
48 * @param type message type. 49 * @param type message type.
49 * @param handler message handler 50 * @param handler message handler
51 + * @param executor executor to use for running message handler logic.
50 */ 52 */
53 + public void registerHandler(String type, MessageHandler handler, ExecutorService executor);
54 +
55 + /**
56 + * Registers a new message handler for message type.
57 + * @param type message type.
58 + * @param handler message handler
59 + */
60 + @Deprecated
51 public void registerHandler(String type, MessageHandler handler); 61 public void registerHandler(String type, MessageHandler handler);
52 62
53 /** 63 /**
......
...@@ -41,6 +41,7 @@ import java.net.InetAddress; ...@@ -41,6 +41,7 @@ import java.net.InetAddress;
41 import java.net.UnknownHostException; 41 import java.net.UnknownHostException;
42 import java.util.concurrent.ConcurrentHashMap; 42 import java.util.concurrent.ConcurrentHashMap;
43 import java.util.concurrent.ConcurrentMap; 43 import java.util.concurrent.ConcurrentMap;
44 +import java.util.concurrent.ExecutorService;
44 import java.util.concurrent.TimeUnit; 45 import java.util.concurrent.TimeUnit;
45 import java.util.concurrent.TimeoutException; 46 import java.util.concurrent.TimeoutException;
46 import java.util.concurrent.atomic.AtomicLong; 47 import java.util.concurrent.atomic.AtomicLong;
...@@ -213,6 +214,22 @@ public class NettyMessagingService implements MessagingService { ...@@ -213,6 +214,22 @@ public class NettyMessagingService implements MessagingService {
213 } 214 }
214 215
215 @Override 216 @Override
217 + public void registerHandler(String type, MessageHandler handler, ExecutorService executor) {
218 + handlers.putIfAbsent(hashToLong(type), new MessageHandler() {
219 + @Override
220 + public void handle(Message message) throws IOException {
221 + executor.submit(() -> {
222 + try {
223 + handler.handle(message);
224 + } catch (Exception e) {
225 + log.warn("Failed to process message of type {}", type, e);
226 + }
227 + });
228 + }
229 + });
230 + }
231 +
232 + @Override
216 public void unregisterHandler(String type) { 233 public void unregisterHandler(String type) {
217 handlers.remove(type); 234 handlers.remove(type);
218 } 235 }
......