Committed by
Gerrit Code Review
cherry picking the fix for ONOS-4754 to onos-1.6 and master
Change-Id: I2c7da62479566f16034b598029df5f98a37cc99e
Showing
2 changed files
with
26 additions
and
5 deletions
... | @@ -120,6 +120,14 @@ public class OpenFlowControllerImpl implements OpenFlowController { | ... | @@ -120,6 +120,14 @@ public class OpenFlowControllerImpl implements OpenFlowController { |
120 | private final ExecutorService executorBarrier = | 120 | private final ExecutorService executorBarrier = |
121 | Executors.newFixedThreadPool(4, groupedThreads("onos/of", "event-barrier-%d", log)); | 121 | Executors.newFixedThreadPool(4, groupedThreads("onos/of", "event-barrier-%d", log)); |
122 | 122 | ||
123 | + //Separate executor thread for handling error messages and barrier replies for same failed | ||
124 | + // transactions to avoid context switching of thread | ||
125 | + protected ExecutorService executorErrorMsgs = | ||
126 | + Executors.newSingleThreadExecutor(groupedThreads("onos/of", "event-error-msg-%d", log)); | ||
127 | + | ||
128 | + //concurrent hashmap to track failed transactions | ||
129 | + protected ConcurrentMap<Long, Boolean> errorMsgs = | ||
130 | + new ConcurrentHashMap<>(); | ||
123 | protected ConcurrentMap<Dpid, OpenFlowSwitch> connectedSwitches = | 131 | protected ConcurrentMap<Dpid, OpenFlowSwitch> connectedSwitches = |
124 | new ConcurrentHashMap<>(); | 132 | new ConcurrentHashMap<>(); |
125 | protected ConcurrentMap<Dpid, OpenFlowSwitch> activeMasterSwitches = | 133 | protected ConcurrentMap<Dpid, OpenFlowSwitch> activeMasterSwitches = |
... | @@ -294,9 +302,13 @@ public class OpenFlowControllerImpl implements OpenFlowController { | ... | @@ -294,9 +302,13 @@ public class OpenFlowControllerImpl implements OpenFlowController { |
294 | // TODO: Consider using separate threadpool for sensitive messages. | 302 | // TODO: Consider using separate threadpool for sensitive messages. |
295 | // ie. Back to back error could cause us to starve. | 303 | // ie. Back to back error could cause us to starve. |
296 | case FLOW_REMOVED: | 304 | case FLOW_REMOVED: |
297 | - case ERROR: | ||
298 | executorMsgs.execute(new OFMessageHandler(dpid, msg)); | 305 | executorMsgs.execute(new OFMessageHandler(dpid, msg)); |
299 | break; | 306 | break; |
307 | + case ERROR: | ||
308 | + log.debug("Received error message from {}: {}", dpid, msg); | ||
309 | + errorMsgs.putIfAbsent(msg.getXid(), true); | ||
310 | + executorErrorMsgs.execute(new OFMessageHandler(dpid, msg)); | ||
311 | + break; | ||
300 | case STATS_REPLY: | 312 | case STATS_REPLY: |
301 | OFStatsReply reply = (OFStatsReply) msg; | 313 | OFStatsReply reply = (OFStatsReply) msg; |
302 | switch (reply.getStatsType()) { | 314 | switch (reply.getStatsType()) { |
... | @@ -403,7 +415,14 @@ public class OpenFlowControllerImpl implements OpenFlowController { | ... | @@ -403,7 +415,14 @@ public class OpenFlowControllerImpl implements OpenFlowController { |
403 | } | 415 | } |
404 | break; | 416 | break; |
405 | case BARRIER_REPLY: | 417 | case BARRIER_REPLY: |
406 | - executorBarrier.execute(new OFMessageHandler(dpid, msg)); | 418 | + if (errorMsgs.containsKey(msg.getXid())) { |
419 | + //To make oferror msg handling and corresponding barrier reply serialized, | ||
420 | + // executorErrorMsgs is used for both transaction | ||
421 | + errorMsgs.remove(msg.getXid()); | ||
422 | + executorErrorMsgs.execute(new OFMessageHandler(dpid, msg)); | ||
423 | + } else { | ||
424 | + executorBarrier.execute(new OFMessageHandler(dpid, msg)); | ||
425 | + } | ||
407 | break; | 426 | break; |
408 | case EXPERIMENTER: | 427 | case EXPERIMENTER: |
409 | long experimenter = ((OFExperimenter) msg).getExperimenter(); | 428 | long experimenter = ((OFExperimenter) msg).getExperimenter(); | ... | ... |
... | @@ -52,7 +52,7 @@ public class OpenFlowControllerImplPacketsTest { | ... | @@ -52,7 +52,7 @@ public class OpenFlowControllerImplPacketsTest { |
52 | OpenFlowSwitchListenerAdapter switchListener; | 52 | OpenFlowSwitchListenerAdapter switchListener; |
53 | TestPacketListener packetListener; | 53 | TestPacketListener packetListener; |
54 | TestExecutorService statsExecutorService; | 54 | TestExecutorService statsExecutorService; |
55 | - | 55 | + TestExecutorService errorMsgExecutorService; |
56 | /** | 56 | /** |
57 | * Mock packet listener that accumulates packets. | 57 | * Mock packet listener that accumulates packets. |
58 | */ | 58 | */ |
... | @@ -109,8 +109,10 @@ public class OpenFlowControllerImplPacketsTest { | ... | @@ -109,8 +109,10 @@ public class OpenFlowControllerImplPacketsTest { |
109 | controller.addPacketListener(100, packetListener); | 109 | controller.addPacketListener(100, packetListener); |
110 | 110 | ||
111 | statsExecutorService = new TestExecutorService(); | 111 | statsExecutorService = new TestExecutorService(); |
112 | + errorMsgExecutorService = new TestExecutorService(); | ||
112 | 113 | ||
113 | controller.executorMsgs = statsExecutorService; | 114 | controller.executorMsgs = statsExecutorService; |
115 | + controller.executorErrorMsgs = errorMsgExecutorService; | ||
114 | } | 116 | } |
115 | 117 | ||
116 | /** | 118 | /** |
... | @@ -147,7 +149,7 @@ public class OpenFlowControllerImplPacketsTest { | ... | @@ -147,7 +149,7 @@ public class OpenFlowControllerImplPacketsTest { |
147 | agent.addConnectedSwitch(dpid1, switch1); | 149 | agent.addConnectedSwitch(dpid1, switch1); |
148 | OfMessageAdapter errorPacket = new OfMessageAdapter(OFType.ERROR); | 150 | OfMessageAdapter errorPacket = new OfMessageAdapter(OFType.ERROR); |
149 | controller.processPacket(dpid1, errorPacket); | 151 | controller.processPacket(dpid1, errorPacket); |
150 | - assertThat(statsExecutorService.submittedMessages(), hasSize(1)); | 152 | + assertThat(errorMsgExecutorService.submittedMessages(), hasSize(1)); |
151 | - assertThat(statsExecutorService.submittedMessages().get(0), is(errorPacket)); | 153 | + assertThat(errorMsgExecutorService.submittedMessages().get(0), is(errorPacket)); |
152 | } | 154 | } |
153 | } | 155 | } | ... | ... |
-
Please register or login to post a comment