Committed by
Gerrit Code Review
Move event handling to background thread
Change-Id: I8ccd1631fac14b1f753da4fb4b4ed01e5a045edf
Showing
3 changed files
with
30 additions
and
9 deletions
| ... | @@ -20,6 +20,7 @@ import com.google.common.collect.ImmutableList; | ... | @@ -20,6 +20,7 @@ import com.google.common.collect.ImmutableList; |
| 20 | import com.google.common.collect.LinkedListMultimap; | 20 | import com.google.common.collect.LinkedListMultimap; |
| 21 | import com.google.common.collect.Multimap; | 21 | import com.google.common.collect.Multimap; |
| 22 | import com.google.common.collect.Sets; | 22 | import com.google.common.collect.Sets; |
| 23 | + | ||
| 23 | import org.onosproject.core.ApplicationId; | 24 | import org.onosproject.core.ApplicationId; |
| 24 | import org.onosproject.mastership.MastershipService; | 25 | import org.onosproject.mastership.MastershipService; |
| 25 | import org.onosproject.net.Device; | 26 | import org.onosproject.net.Device; |
| ... | @@ -186,16 +187,19 @@ class FlowRuleDriverProvider extends AbstractProvider implements FlowRuleProvide | ... | @@ -186,16 +187,19 @@ class FlowRuleDriverProvider extends AbstractProvider implements FlowRuleProvide |
| 186 | 187 | ||
| 187 | @Override | 188 | @Override |
| 188 | public void event(DeviceEvent event) { | 189 | public void event(DeviceEvent event) { |
| 189 | - executor.schedule(() -> pollDeviceFlowEntries(event.subject()), 0, TimeUnit.SECONDS); | 190 | + executor.execute(() -> handleEvent(event)); |
| 190 | } | 191 | } |
| 191 | 192 | ||
| 192 | - @Override | 193 | + private void handleEvent(DeviceEvent event) { |
| 193 | - public boolean isRelevant(DeviceEvent event) { | ||
| 194 | Device device = event.subject(); | 194 | Device device = event.subject(); |
| 195 | - return mastershipService.isLocalMaster(device.id()) && device.is(FlowRuleProgrammable.class) && | 195 | + boolean isRelevant = mastershipService.isLocalMaster(device.id()) |
| 196 | - (event.type() == DEVICE_ADDED || | 196 | + && device.is(FlowRuleProgrammable.class) |
| 197 | - event.type() == DEVICE_UPDATED || | 197 | + && (event.type() == DEVICE_ADDED || |
| 198 | - (event.type() == DEVICE_AVAILABILITY_CHANGED && deviceService.isAvailable(device.id()))); | 198 | + event.type() == DEVICE_UPDATED || |
| 199 | + (event.type() == DEVICE_AVAILABILITY_CHANGED && deviceService.isAvailable(device.id()))); | ||
| 200 | + if (isRelevant) { | ||
| 201 | + pollDeviceFlowEntries(event.subject()); | ||
| 202 | + } | ||
| 199 | } | 203 | } |
| 200 | } | 204 | } |
| 201 | 205 | ... | ... |
| ... | @@ -163,6 +163,7 @@ public class DistributedFlowRuleStore | ... | @@ -163,6 +163,7 @@ public class DistributedFlowRuleStore |
| 163 | 163 | ||
| 164 | private Map<Long, NodeId> pendingResponses = Maps.newConcurrentMap(); | 164 | private Map<Long, NodeId> pendingResponses = Maps.newConcurrentMap(); |
| 165 | private ExecutorService messageHandlingExecutor; | 165 | private ExecutorService messageHandlingExecutor; |
| 166 | + private ExecutorService eventHandler; | ||
| 166 | 167 | ||
| 167 | private ScheduledFuture<?> backupTask; | 168 | private ScheduledFuture<?> backupTask; |
| 168 | private final ScheduledExecutorService backupSenderExecutor = | 169 | private final ScheduledExecutorService backupSenderExecutor = |
| ... | @@ -197,6 +198,8 @@ public class DistributedFlowRuleStore | ... | @@ -197,6 +198,8 @@ public class DistributedFlowRuleStore |
| 197 | 198 | ||
| 198 | local = clusterService.getLocalNode().id(); | 199 | local = clusterService.getLocalNode().id(); |
| 199 | 200 | ||
| 201 | + eventHandler = Executors.newSingleThreadExecutor( | ||
| 202 | + groupedThreads("onos/flow", "event-handler", log)); | ||
| 200 | messageHandlingExecutor = Executors.newFixedThreadPool( | 203 | messageHandlingExecutor = Executors.newFixedThreadPool( |
| 201 | msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers", log)); | 204 | msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers", log)); |
| 202 | 205 | ||
| ... | @@ -233,6 +236,7 @@ public class DistributedFlowRuleStore | ... | @@ -233,6 +236,7 @@ public class DistributedFlowRuleStore |
| 233 | unregisterMessageHandlers(); | 236 | unregisterMessageHandlers(); |
| 234 | deviceTableStats.removeListener(tableStatsListener); | 237 | deviceTableStats.removeListener(tableStatsListener); |
| 235 | deviceTableStats.destroy(); | 238 | deviceTableStats.destroy(); |
| 239 | + eventHandler.shutdownNow(); | ||
| 236 | messageHandlingExecutor.shutdownNow(); | 240 | messageHandlingExecutor.shutdownNow(); |
| 237 | backupSenderExecutor.shutdownNow(); | 241 | backupSenderExecutor.shutdownNow(); |
| 238 | log.info("Stopped"); | 242 | log.info("Stopped"); |
| ... | @@ -663,6 +667,10 @@ public class DistributedFlowRuleStore | ... | @@ -663,6 +667,10 @@ public class DistributedFlowRuleStore |
| 663 | 667 | ||
| 664 | @Override | 668 | @Override |
| 665 | public void event(ReplicaInfoEvent event) { | 669 | public void event(ReplicaInfoEvent event) { |
| 670 | + eventHandler.execute(() -> handleEvent(event)); | ||
| 671 | + } | ||
| 672 | + | ||
| 673 | + private void handleEvent(ReplicaInfoEvent event) { | ||
| 666 | if (!backupEnabled) { | 674 | if (!backupEnabled) { |
| 667 | return; | 675 | return; |
| 668 | } | 676 | } | ... | ... |
| ... | @@ -97,6 +97,7 @@ public class ConsistentDeviceMastershipStore | ... | @@ -97,6 +97,7 @@ public class ConsistentDeviceMastershipStore |
| 97 | private static final Pattern DEVICE_MASTERSHIP_TOPIC_PATTERN = | 97 | private static final Pattern DEVICE_MASTERSHIP_TOPIC_PATTERN = |
| 98 | Pattern.compile("device:(.*)"); | 98 | Pattern.compile("device:(.*)"); |
| 99 | 99 | ||
| 100 | + private ExecutorService eventHandler; | ||
| 100 | private ExecutorService messageHandlingExecutor; | 101 | private ExecutorService messageHandlingExecutor; |
| 101 | private ScheduledExecutorService transferExecutor; | 102 | private ScheduledExecutorService transferExecutor; |
| 102 | private final LeadershipEventListener leadershipEventListener = | 103 | private final LeadershipEventListener leadershipEventListener = |
| ... | @@ -116,6 +117,10 @@ public class ConsistentDeviceMastershipStore | ... | @@ -116,6 +117,10 @@ public class ConsistentDeviceMastershipStore |
| 116 | 117 | ||
| 117 | @Activate | 118 | @Activate |
| 118 | public void activate() { | 119 | public void activate() { |
| 120 | + | ||
| 121 | + eventHandler = Executors.newSingleThreadExecutor( | ||
| 122 | + groupedThreads("onos/store/device/mastership", "event-handler", log)); | ||
| 123 | + | ||
| 119 | messageHandlingExecutor = | 124 | messageHandlingExecutor = |
| 120 | Executors.newSingleThreadExecutor( | 125 | Executors.newSingleThreadExecutor( |
| 121 | groupedThreads("onos/store/device/mastership", "message-handler", log)); | 126 | groupedThreads("onos/store/device/mastership", "message-handler", log)); |
| ... | @@ -136,10 +141,10 @@ public class ConsistentDeviceMastershipStore | ... | @@ -136,10 +141,10 @@ public class ConsistentDeviceMastershipStore |
| 136 | @Deactivate | 141 | @Deactivate |
| 137 | public void deactivate() { | 142 | public void deactivate() { |
| 138 | clusterCommunicator.removeSubscriber(ROLE_RELINQUISH_SUBJECT); | 143 | clusterCommunicator.removeSubscriber(ROLE_RELINQUISH_SUBJECT); |
| 144 | + leadershipService.removeListener(leadershipEventListener); | ||
| 139 | messageHandlingExecutor.shutdown(); | 145 | messageHandlingExecutor.shutdown(); |
| 140 | transferExecutor.shutdown(); | 146 | transferExecutor.shutdown(); |
| 141 | - leadershipService.removeListener(leadershipEventListener); | 147 | + eventHandler.shutdown(); |
| 142 | - | ||
| 143 | log.info("Stopped"); | 148 | log.info("Stopped"); |
| 144 | } | 149 | } |
| 145 | 150 | ||
| ... | @@ -308,6 +313,10 @@ public class ConsistentDeviceMastershipStore | ... | @@ -308,6 +313,10 @@ public class ConsistentDeviceMastershipStore |
| 308 | 313 | ||
| 309 | @Override | 314 | @Override |
| 310 | public void event(LeadershipEvent event) { | 315 | public void event(LeadershipEvent event) { |
| 316 | + eventHandler.execute(() -> handleEvent(event)); | ||
| 317 | + } | ||
| 318 | + | ||
| 319 | + private void handleEvent(LeadershipEvent event) { | ||
| 311 | Leadership leadership = event.subject(); | 320 | Leadership leadership = event.subject(); |
| 312 | DeviceId deviceId = extractDeviceIdFromTopic(leadership.topic()); | 321 | DeviceId deviceId = extractDeviceIdFromTopic(leadership.topic()); |
| 313 | RoleInfo roleInfo = getNodes(deviceId); | 322 | RoleInfo roleInfo = getNodes(deviceId); | ... | ... |
-
Please register or login to post a comment