Pavlin Radoslavov
Committed by Gerrit Code Review

Use two thread pools for handling the OpenFlow STATS and BARRIER messages.

This fixes a problem where a large number of incoming STATS messages
is practically using all available threds from the pool (16), and
there are no available threads to handle the BARRIER messages.

Change-Id: I1130eb8f3b5a17d5d3a7825f32da68eacb99569a

fixing other threadpool issues, ie. not using cachedThreadPool

Change-Id: I40ef10e1f704aef779b2a23c0497dfb7992520eb
...@@ -103,7 +103,8 @@ public class FlowRuleManager ...@@ -103,7 +103,8 @@ public class FlowRuleManager
103 103
104 @Activate 104 @Activate
105 public void activate() { 105 public void activate() {
106 - futureService = Executors.newCachedThreadPool(namedThreads("provider-future-listeners-%d")); 106 + futureService =
107 + Executors.newFixedThreadPool(32, namedThreads("provider-future-listeners-%d"));
107 store.setDelegate(delegate); 108 store.setDelegate(delegate);
108 eventDispatcher.addSink(FlowRuleEvent.class, listenerRegistry); 109 eventDispatcher.addSink(FlowRuleEvent.class, listenerRegistry);
109 log.info("Started"); 110 log.info("Started");
......
...@@ -67,8 +67,13 @@ public class OpenFlowControllerImpl implements OpenFlowController { ...@@ -67,8 +67,13 @@ public class OpenFlowControllerImpl implements OpenFlowController {
67 private static final Logger log = 67 private static final Logger log =
68 LoggerFactory.getLogger(OpenFlowControllerImpl.class); 68 LoggerFactory.getLogger(OpenFlowControllerImpl.class);
69 69
70 - private final ExecutorService executor = Executors.newFixedThreadPool(16, 70 + private final ExecutorService executorMsgs =
71 - namedThreads("of-event-%d")); 71 + Executors.newFixedThreadPool(32,
72 + namedThreads("of-event-stats-%d"));
73 +
74 + private final ExecutorService executorBarrier =
75 + Executors.newFixedThreadPool(4,
76 + namedThreads("of-event-barrier-%d"));
72 77
73 protected ConcurrentHashMap<Dpid, OpenFlowSwitch> connectedSwitches = 78 protected ConcurrentHashMap<Dpid, OpenFlowSwitch> connectedSwitches =
74 new ConcurrentHashMap<Dpid, OpenFlowSwitch>(); 79 new ConcurrentHashMap<Dpid, OpenFlowSwitch>();
...@@ -189,6 +194,12 @@ public class OpenFlowControllerImpl implements OpenFlowController { ...@@ -189,6 +194,12 @@ public class OpenFlowControllerImpl implements OpenFlowController {
189 p.handlePacket(pktCtx); 194 p.handlePacket(pktCtx);
190 } 195 }
191 break; 196 break;
197 + // TODO: Consider using separate threadpool for sensitive messages.
198 + // ie. Back to back error could cause us to starve.
199 + case FLOW_REMOVED:
200 + case ERROR:
201 + executorMsgs.submit(new OFMessageHandler(dpid, msg));
202 + break;
192 case STATS_REPLY: 203 case STATS_REPLY:
193 OFStatsReply reply = (OFStatsReply) msg; 204 OFStatsReply reply = (OFStatsReply) msg;
194 if (reply.getStatsType().equals(OFStatsType.PORT_DESC)) { 205 if (reply.getStatsType().equals(OFStatsType.PORT_DESC)) {
...@@ -201,14 +212,11 @@ public class OpenFlowControllerImpl implements OpenFlowController { ...@@ -201,14 +212,11 @@ public class OpenFlowControllerImpl implements OpenFlowController {
201 OFFlowStatsReply.Builder rep = 212 OFFlowStatsReply.Builder rep =
202 OFFactories.getFactory(msg.getVersion()).buildFlowStatsReply(); 213 OFFactories.getFactory(msg.getVersion()).buildFlowStatsReply();
203 rep.setEntries(Lists.newLinkedList(stats)); 214 rep.setEntries(Lists.newLinkedList(stats));
204 - executor.submit(new OFMessageHandler(dpid, rep.build())); 215 + executorMsgs.submit(new OFMessageHandler(dpid, rep.build()));
205 } 216 }
206 break; 217 break;
207 -
208 - case FLOW_REMOVED:
209 - case ERROR:
210 case BARRIER_REPLY: 218 case BARRIER_REPLY:
211 - executor.submit(new OFMessageHandler(dpid, msg)); 219 + executorBarrier.submit(new OFMessageHandler(dpid, msg));
212 break; 220 break;
213 case EXPERIMENTER: 221 case EXPERIMENTER:
214 // Handle optical port stats 222 // Handle optical port stats
......
...@@ -329,7 +329,7 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr ...@@ -329,7 +329,7 @@ public class OpenFlowRuleProvider extends AbstractProvider implements FlowRulePr
329 // Do nothing here for now. 329 // Do nothing here for now.
330 } 330 }
331 331
332 - private synchronized void pushFlowMetrics(Dpid dpid, OFStatsReply stats) { 332 + private void pushFlowMetrics(Dpid dpid, OFStatsReply stats) {
333 333
334 DeviceId did = DeviceId.deviceId(Dpid.uri(dpid)); 334 DeviceId did = DeviceId.deviceId(Dpid.uri(dpid));
335 final OFFlowStatsReply replies = (OFFlowStatsReply) stats; 335 final OFFlowStatsReply replies = (OFFlowStatsReply) stats;
......