Jian Li
Committed by Gerrit Code Review

[ONOS-4385] Handle flow removed message type in all cases

- Do not handle packet in messages inside ControllerImpl class

Change-Id: Idcb26b277b790125bd6b3ba8f10bb4c60e2a3c58
......@@ -48,8 +48,6 @@ import org.projectfloodlight.openflow.protocol.OFExperimenter;
import org.projectfloodlight.openflow.protocol.OFFactories;
import org.projectfloodlight.openflow.protocol.OFFlowStatsEntry;
import org.projectfloodlight.openflow.protocol.OFFlowStatsReply;
import org.projectfloodlight.openflow.protocol.OFTableStatsEntry;
import org.projectfloodlight.openflow.protocol.OFTableStatsReply;
import org.projectfloodlight.openflow.protocol.OFGroupDescStatsEntry;
import org.projectfloodlight.openflow.protocol.OFGroupDescStatsReply;
import org.projectfloodlight.openflow.protocol.OFGroupStatsEntry;
......@@ -62,6 +60,8 @@ import org.projectfloodlight.openflow.protocol.OFPortStatsReply;
import org.projectfloodlight.openflow.protocol.OFPortStatus;
import org.projectfloodlight.openflow.protocol.OFStatsReply;
import org.projectfloodlight.openflow.protocol.OFStatsReplyFlags;
import org.projectfloodlight.openflow.protocol.OFTableStatsEntry;
import org.projectfloodlight.openflow.protocol.OFTableStatsReply;
import org.projectfloodlight.openflow.protocol.action.OFActionOutput;
import org.projectfloodlight.openflow.protocol.instruction.OFInstruction;
import org.slf4j.Logger;
......@@ -79,6 +79,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import static org.onlab.util.Tools.groupedThreads;
@Component(immediate = true)
......@@ -115,12 +116,6 @@ public class OpenFlowControllerImpl implements OpenFlowController {
protected ExecutorService executorMsgs =
Executors.newFixedThreadPool(32, groupedThreads("onos/of", "event-stats-%d", log));
protected ExecutorService executorPacketIn =
Executors.newCachedThreadPool(groupedThreads("onos/of", "event-pkt-in-stats-%d", log));
protected ExecutorService executorFlowRemoved =
Executors.newCachedThreadPool(groupedThreads("onos/of", "event-flow-removed-stats-%d", log));
private final ExecutorService executorBarrier =
Executors.newFixedThreadPool(4, groupedThreads("onos/of", "event-barrier-%d", log));
......@@ -286,19 +281,11 @@ public class OpenFlowControllerImpl implements OpenFlowController {
for (PacketListener p : ofPacketListener.values()) {
p.handlePacket(pktCtx);
}
if (monitorAllEvents) {
executorPacketIn.execute(new OFMessageHandler(dpid, msg));
}
break;
// TODO: Consider using separate threadpool for sensitive messages.
// ie. Back to back error could cause us to starve.
case FLOW_REMOVED:
if (monitorAllEvents) {
executorFlowRemoved.execute(new OFMessageHandler(dpid, msg));
}
break;
case ERROR:
log.debug("Received error message from {}: {}", dpid, msg);
executorMsgs.execute(new OFMessageHandler(dpid, msg));
break;
case STATS_REPLY:
......@@ -648,7 +635,7 @@ public class OpenFlowControllerImpl implements OpenFlowController {
}
/**
* OpenFlow message handler for incoming control messages.
* OpenFlow message handler.
*/
protected final class OFMessageHandler implements Runnable {
......
......@@ -19,7 +19,6 @@ import org.junit.Before;
import org.junit.Test;
import org.onosproject.openflow.ExecutorServiceAdapter;
import org.onosproject.openflow.MockOfFeaturesReply;
import org.onosproject.openflow.MockOfPacketIn;
import org.onosproject.openflow.MockOfPortStatus;
import org.onosproject.openflow.OfMessageAdapter;
import org.onosproject.openflow.OpenFlowSwitchListenerAdapter;
......@@ -38,7 +37,9 @@ import java.util.List;
import static junit.framework.TestCase.fail;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
/**
* Tests for packet processing in the open flow controller impl class.
......@@ -51,8 +52,6 @@ public class OpenFlowControllerImplPacketsTest {
OpenFlowSwitchListenerAdapter switchListener;
TestPacketListener packetListener;
TestExecutorService statsExecutorService;
TestExecutorService pktInExecutorService;
TestExecutorService flowRmvExecutorService;
/**
* Mock packet listener that accumulates packets.
......@@ -70,7 +69,6 @@ public class OpenFlowControllerImplPacketsTest {
}
}
/**
* Mock executor service that tracks submits.
*/
......@@ -112,13 +110,8 @@ public class OpenFlowControllerImplPacketsTest {
controller.addPacketListener(100, packetListener);
statsExecutorService = new TestExecutorService();
pktInExecutorService = new TestExecutorService();
flowRmvExecutorService = new TestExecutorService();
controller.executorMsgs = statsExecutorService;
controller.executorPacketIn = pktInExecutorService;
controller.executorFlowRemoved = flowRmvExecutorService;
}
/**
......@@ -148,19 +141,6 @@ public class OpenFlowControllerImplPacketsTest {
}
/**
* Tests a packet in listen operation.
*/
@Test
public void testPacketInListen() {
agent.addConnectedSwitch(dpid1, switch1);
OFMessage packetInPacket = new MockOfPacketIn();
controller.processPacket(dpid1, packetInPacket);
assertThat(packetListener.contexts(), hasSize(1));
assertThat(pktInExecutorService.submittedMessages(), hasSize(1));
assertThat(pktInExecutorService.submittedMessages().get(0), is(packetInPacket));
}
/**
* Tests an error operation.
*/
@Test
......@@ -171,16 +151,4 @@ public class OpenFlowControllerImplPacketsTest {
assertThat(statsExecutorService.submittedMessages(), hasSize(1));
assertThat(statsExecutorService.submittedMessages().get(0), is(errorPacket));
}
/**
* Tests a packet in operation.
*/
@Test
public void testFlowRemoved() {
agent.addConnectedSwitch(dpid1, switch1);
OFMessage flowRemovedPacket = new MockOfFlowRemoved();
controller.processPacket(dpid1, flowRemovedPacket);
assertThat(flowRmvExecutorService.submittedMessages(), hasSize(1));
assertThat(flowRmvExecutorService.submittedMessages().get(0), is(flowRemovedPacket));
}
}
......