Jian Li
Committed by Ray Milkey

[ONOS-3724] Fix the Cbench regression issue

Control message monitoring brings some overhead to controller.
In an extreme stressing environment (e.g., running Cbench),
it leads potential performance degradation.

This commit tries to mitigate the Cbench regression with two steps:
1. improve the monitoring performance by assigning more # of
threads in each thread group.
2. make the control message listening feature optional.

Change-Id: I4f7361b7c598c6de71d390eab78a20ada381d4dd
......@@ -65,6 +65,13 @@ public interface OpenFlowController {
OpenFlowSwitch getEqualSwitch(Dpid dpid);
/**
* If this set to be true, all incoming events are monitored.
* Other wise, only stats related incoming events are monitored
* @param monitor monitoring flag
*/
void monitorAllEvents(boolean monitor);
/**
* Register a listener for meta events that occur to OF
* devices.
* @param listener the listener to notify
......
......@@ -92,10 +92,10 @@ public abstract class AbstractOpenFlowSwitch extends AbstractHandlerBehaviour
protected OFFeaturesReply features;
protected OFDescStatsReply desc;
protected Set<OpenFlowEventListener> ofEventListener = new CopyOnWriteArraySet<>();
protected Set<OpenFlowEventListener> ofOutgoingMsgListener = new CopyOnWriteArraySet<>();
protected ExecutorService executorMsgs =
Executors.newFixedThreadPool(2, groupedThreads("onos/of", "ctrl-msg-stats-%d"));
Executors.newCachedThreadPool(groupedThreads("onos/of", "event-outgoing-msg-stats-%d"));
// messagesPendingMastership is used as synchronization variable for
// all mastership related changes. In this block, mastership (including
......@@ -167,14 +167,16 @@ public abstract class AbstractOpenFlowSwitch extends AbstractHandlerBehaviour
}
}
// listen to outgoing control messages
msgs.forEach(m -> {
if (m.getType() == OFType.PACKET_OUT ||
m.getType() == OFType.FLOW_MOD ||
m.getType() == OFType.STATS_REQUEST) {
executorMsgs.submit(new OFMessageHandler(dpid, m));
}
});
// listen to outgoing control messages only if listeners are registered
if (ofOutgoingMsgListener.size() != 0) {
msgs.forEach(m -> {
if (m.getType() == OFType.PACKET_OUT ||
m.getType() == OFType.FLOW_MOD ||
m.getType() == OFType.STATS_REQUEST) {
executorMsgs.submit(new OFMessageHandler(dpid, m));
}
});
}
}
private void sendMsgsOnChannel(List<OFMessage> msgs) {
......@@ -332,12 +334,12 @@ public abstract class AbstractOpenFlowSwitch extends AbstractHandlerBehaviour
@Override
public void addEventListener(OpenFlowEventListener listener) {
ofEventListener.add(listener);
ofOutgoingMsgListener.add(listener);
}
@Override
public void removeEventListener(OpenFlowEventListener listener) {
ofEventListener.remove(listener);
ofOutgoingMsgListener.remove(listener);
}
@Override
......@@ -547,7 +549,7 @@ public abstract class AbstractOpenFlowSwitch extends AbstractHandlerBehaviour
@Override
public void run() {
for (OpenFlowEventListener listener : ofEventListener) {
for (OpenFlowEventListener listener : ofOutgoingMsgListener) {
listener.handleMessage(dpid, msg);
}
}
......
......@@ -52,6 +52,10 @@ public class OpenflowControllerAdapter implements OpenFlowController {
}
@Override
public void monitorAllEvents(boolean monitor) {
}
@Override
public void addListener(OpenFlowSwitchListener listener) {
}
......
......@@ -17,6 +17,8 @@ package org.onosproject.openflow.controller.driver;
import org.junit.Before;
import org.junit.Test;
import org.onosproject.openflow.controller.Dpid;
import org.onosproject.openflow.controller.OpenFlowEventListener;
import org.projectfloodlight.openflow.protocol.OFMessage;
import org.jboss.netty.channel.Channel;
import java.util.ArrayList;
......@@ -65,6 +67,7 @@ public class AbstractOpenFlowSwitchTest {
ofSwitch.executorMsgs = executorService;
Channel channel = new ChannelAdapter();
ofSwitch.setChannel(channel);
ofSwitch.addEventListener(new OpenFlowEventListenerAdapter());
}
/**
......@@ -120,4 +123,11 @@ public class AbstractOpenFlowSwitchTest {
public void processDriverHandshakeMessage(OFMessage m) {
}
}
private class OpenFlowEventListenerAdapter implements OpenFlowEventListener {
@Override
public void handleMessage(Dpid dpid, OFMessage msg) {
}
}
}
......
......@@ -115,6 +115,12 @@ public class OpenFlowControllerImpl implements OpenFlowController {
protected ExecutorService executorMsgs =
Executors.newFixedThreadPool(32, groupedThreads("onos/of", "event-stats-%d"));
protected ExecutorService executorPacketIn =
Executors.newCachedThreadPool(groupedThreads("onos/of", "event-pkt-in-stats-%d"));
protected ExecutorService executorFlowRemoved =
Executors.newCachedThreadPool(groupedThreads("onos/of", "event-flow-removed-stats-%d"));
private final ExecutorService executorBarrier =
Executors.newFixedThreadPool(4, groupedThreads("onos/of", "event-barrier-%d"));
......@@ -133,6 +139,8 @@ public class OpenFlowControllerImpl implements OpenFlowController {
protected Set<OpenFlowEventListener> ofEventListener = new CopyOnWriteArraySet<>();
protected boolean monitorAllEvents = false;
protected Multimap<Dpid, OFFlowStatsEntry> fullFlowStats =
ArrayListMultimap.create();
......@@ -210,6 +218,11 @@ public class OpenFlowControllerImpl implements OpenFlowController {
}
@Override
public void monitorAllEvents(boolean monitor) {
this.monitorAllEvents = monitor;
}
@Override
public void addListener(OpenFlowSwitchListener listener) {
if (!ofSwitchListener.contains(listener)) {
this.ofSwitchListener.add(listener);
......@@ -272,13 +285,17 @@ public class OpenFlowControllerImpl implements OpenFlowController {
for (PacketListener p : ofPacketListener.values()) {
p.handlePacket(pktCtx);
}
executorMsgs.submit(new OFMessageHandler(dpid, msg));
if (monitorAllEvents) {
executorPacketIn.submit(new OFMessageHandler(dpid, msg));
}
break;
// TODO: Consider using separate threadpool for sensitive messages.
// ie. Back to back error could cause us to starve.
case FLOW_REMOVED:
executorMsgs.submit(new OFMessageHandler(dpid, msg));
break;
if (monitorAllEvents) {
executorFlowRemoved.submit(new OFMessageHandler(dpid, msg));
break;
}
case ERROR:
log.debug("Received error message from {}: {}", dpid, msg);
executorMsgs.submit(new OFMessageHandler(dpid, msg));
......
......@@ -53,7 +53,9 @@ public class OpenFlowControllerImplPacketsTest {
OpenFlowSwitch switch1;
OpenFlowSwitchListenerAdapter switchListener;
TestPacketListener packetListener;
TestExecutorService executorService;
TestExecutorService statsExecutorService;
TestExecutorService pktInExecutorService;
TestExecutorService flowRmvExecutorService;
/**
* Mock packet listener that accumulates packets.
......@@ -108,12 +110,19 @@ public class OpenFlowControllerImplPacketsTest {
agent = controller.agent;
switchListener = new OpenFlowSwitchListenerAdapter();
controller.addListener(switchListener);
controller.monitorAllEvents(true);
packetListener = new TestPacketListener();
controller.addPacketListener(100, packetListener);
executorService = new TestExecutorService();
controller.executorMsgs = executorService;
statsExecutorService = new TestExecutorService();
pktInExecutorService = new TestExecutorService();
flowRmvExecutorService = new TestExecutorService();
controller.executorMsgs = statsExecutorService;
controller.executorPacketIn = pktInExecutorService;
controller.executorFlowRemoved = flowRmvExecutorService;
}
/**
......@@ -151,8 +160,8 @@ public class OpenFlowControllerImplPacketsTest {
OFMessage packetInPacket = new MockOfPacketIn();
controller.processPacket(dpid1, packetInPacket);
assertThat(packetListener.contexts(), hasSize(1));
assertThat(executorService.submittedMessages(), hasSize(1));
assertThat(executorService.submittedMessages().get(0), is(packetInPacket));
assertThat(pktInExecutorService.submittedMessages(), hasSize(1));
assertThat(pktInExecutorService.submittedMessages().get(0), is(packetInPacket));
}
/**
......@@ -163,8 +172,8 @@ public class OpenFlowControllerImplPacketsTest {
agent.addConnectedSwitch(dpid1, switch1);
OfMessageAdapter errorPacket = new OfMessageAdapter(OFType.ERROR);
controller.processPacket(dpid1, errorPacket);
assertThat(executorService.submittedMessages(), hasSize(1));
assertThat(executorService.submittedMessages().get(0), is(errorPacket));
assertThat(statsExecutorService.submittedMessages(), hasSize(1));
assertThat(statsExecutorService.submittedMessages().get(0), is(errorPacket));
}
/**
......@@ -175,7 +184,7 @@ public class OpenFlowControllerImplPacketsTest {
agent.addConnectedSwitch(dpid1, switch1);
OFMessage flowRemovedPacket = new MockOfFlowRemoved();
controller.processPacket(dpid1, flowRemovedPacket);
assertThat(executorService.submittedMessages(), hasSize(1));
assertThat(executorService.submittedMessages().get(0), is(flowRemovedPacket));
assertThat(flowRmvExecutorService.submittedMessages(), hasSize(1));
assertThat(flowRmvExecutorService.submittedMessages().get(0), is(flowRemovedPacket));
}
}
......
......@@ -277,6 +277,7 @@ public class OpenFlowDeviceProvider extends AbstractProvider implements DevicePr
public void enableCtrlMsgMonitor() {
isCtrlMsgMonitor = true;
controller.addEventListener(inMsgListener);
controller.monitorAllEvents(isCtrlMsgMonitor);
for (OpenFlowSwitch sw : controller.getSwitches()) {
sw.addEventListener(outMsgListener);
}
......@@ -288,6 +289,7 @@ public class OpenFlowDeviceProvider extends AbstractProvider implements DevicePr
*/
public void disableCtrlMsgMonitor() {
isCtrlMsgMonitor = false;
controller.monitorAllEvents(isCtrlMsgMonitor);
controller.removeEventListener(inMsgListener);
for (OpenFlowSwitch sw: controller.getSwitches()) {
sw.removeEventListener(outMsgListener);
......@@ -343,7 +345,7 @@ public class OpenFlowDeviceProvider extends AbstractProvider implements DevicePr
@Override
public void handleMessage(Dpid dpid, OFMessage msg) {
if (isCtrlMsgMonitor) {
// TODO: feed the control message stats via ControlMetricsServiceFactory
// TODO: handle all incoming OF messages
}
}
}
......@@ -356,7 +358,7 @@ public class OpenFlowDeviceProvider extends AbstractProvider implements DevicePr
@Override
public void handleMessage(Dpid dpid, OFMessage msg) {
if (isCtrlMsgMonitor) {
// TODO: feed the control message stats via ControlMetricsServiceFactory
// TODO: handle all outgoing OF messages
}
}
}
......
......@@ -262,6 +262,10 @@ public class OpenFlowDeviceProviderTest {
}
@Override
public void monitorAllEvents(boolean monitor) {
}
@Override
public void addListener(OpenFlowSwitchListener listener) {
this.listener = listener;
}
......
......@@ -289,6 +289,10 @@ public class OpenFlowGroupProviderTest {
return null;
}
@Override
public void monitorAllEvents(boolean monitor) {
}
}
private class TestGroupProviderRegistry implements GroupProviderRegistry {
......
......@@ -287,6 +287,10 @@ public class OpenFlowPacketProviderTest {
}
@Override
public void monitorAllEvents(boolean monitor) {
}
@Override
public void addListener(OpenFlowSwitchListener listener) {
}
......