Andrea Campanella
Committed by Gerrit Code Review

Moving Openflow executors from submit to execute

Change-Id: I446747c7b28d2562ff14afe7e898cab8a83a14b7
......@@ -95,7 +95,7 @@ public abstract class AbstractOpenFlowSwitch extends AbstractHandlerBehaviour
protected Set<OpenFlowEventListener> ofOutgoingMsgListener = new CopyOnWriteArraySet<>();
protected ExecutorService executorMsgs =
Executors.newCachedThreadPool(groupedThreads("onos/of", "event-outgoing-msg-stats-%d"));
Executors.newCachedThreadPool(groupedThreads("onos/of", "event-outgoing-msg-stats-%d", log));
// messagesPendingMastership is used as synchronization variable for
// all mastership related changes. In this block, mastership (including
......@@ -173,7 +173,7 @@ public abstract class AbstractOpenFlowSwitch extends AbstractHandlerBehaviour
if (m.getType() == OFType.PACKET_OUT ||
m.getType() == OFType.FLOW_MOD ||
m.getType() == OFType.STATS_REQUEST) {
executorMsgs.submit(new OFMessageHandler(dpid, m));
executorMsgs.execute(new OFMessageHandler(dpid, m));
}
});
}
......
......@@ -15,15 +15,15 @@
*/
package org.onosproject.openflow.controller.driver;
import org.jboss.netty.channel.Channel;
import org.junit.Before;
import org.junit.Test;
import org.onosproject.openflow.controller.Dpid;
import org.onosproject.openflow.controller.OpenFlowEventListener;
import org.projectfloodlight.openflow.protocol.OFMessage;
import org.jboss.netty.channel.Channel;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;
......@@ -48,11 +48,10 @@ public class AbstractOpenFlowSwitchTest {
}
@Override
public Future<?> submit(Runnable task) {
public void execute(Runnable task) {
AbstractOpenFlowSwitch.OFMessageHandler handler =
(AbstractOpenFlowSwitch.OFMessageHandler) task;
submittedMessages.add(handler.msg);
return null;
}
}
......
......@@ -113,16 +113,16 @@ public class OpenFlowControllerImpl implements OpenFlowController {
private int workerThreads = DEFAULT_WORKER_THREADS;
protected ExecutorService executorMsgs =
Executors.newFixedThreadPool(32, groupedThreads("onos/of", "event-stats-%d"));
Executors.newFixedThreadPool(32, groupedThreads("onos/of", "event-stats-%d", log));
protected ExecutorService executorPacketIn =
Executors.newCachedThreadPool(groupedThreads("onos/of", "event-pkt-in-stats-%d"));
Executors.newCachedThreadPool(groupedThreads("onos/of", "event-pkt-in-stats-%d", log));
protected ExecutorService executorFlowRemoved =
Executors.newCachedThreadPool(groupedThreads("onos/of", "event-flow-removed-stats-%d"));
Executors.newCachedThreadPool(groupedThreads("onos/of", "event-flow-removed-stats-%d", log));
private final ExecutorService executorBarrier =
Executors.newFixedThreadPool(4, groupedThreads("onos/of", "event-barrier-%d"));
Executors.newFixedThreadPool(4, groupedThreads("onos/of", "event-barrier-%d", log));
protected ConcurrentMap<Dpid, OpenFlowSwitch> connectedSwitches =
new ConcurrentHashMap<>();
......@@ -286,19 +286,19 @@ public class OpenFlowControllerImpl implements OpenFlowController {
p.handlePacket(pktCtx);
}
if (monitorAllEvents) {
executorPacketIn.submit(new OFMessageHandler(dpid, msg));
executorPacketIn.execute(new OFMessageHandler(dpid, msg));
}
break;
// TODO: Consider using separate threadpool for sensitive messages.
// ie. Back to back error could cause us to starve.
case FLOW_REMOVED:
if (monitorAllEvents) {
executorFlowRemoved.submit(new OFMessageHandler(dpid, msg));
executorFlowRemoved.execute(new OFMessageHandler(dpid, msg));
break;
}
case ERROR:
log.debug("Received error message from {}: {}", dpid, msg);
executorMsgs.submit(new OFMessageHandler(dpid, msg));
executorMsgs.execute(new OFMessageHandler(dpid, msg));
break;
case STATS_REPLY:
OFStatsReply reply = (OFStatsReply) msg;
......@@ -315,7 +315,7 @@ public class OpenFlowControllerImpl implements OpenFlowController {
OFFactories.getFactory(msg.getVersion()).buildFlowStatsReply();
rep.setEntries(Lists.newLinkedList(flowStats));
rep.setXid(reply.getXid());
executorMsgs.submit(new OFMessageHandler(dpid, rep.build()));
executorMsgs.execute(new OFMessageHandler(dpid, rep.build()));
}
break;
case TABLE:
......@@ -324,7 +324,7 @@ public class OpenFlowControllerImpl implements OpenFlowController {
OFTableStatsReply.Builder rep =
OFFactories.getFactory(msg.getVersion()).buildTableStatsReply();
rep.setEntries(Lists.newLinkedList(tableStats));
executorMsgs.submit(new OFMessageHandler(dpid, rep.build()));
executorMsgs.execute(new OFMessageHandler(dpid, rep.build()));
}
break;
case GROUP:
......@@ -334,7 +334,7 @@ public class OpenFlowControllerImpl implements OpenFlowController {
OFFactories.getFactory(msg.getVersion()).buildGroupStatsReply();
rep.setEntries(Lists.newLinkedList(groupStats));
rep.setXid(reply.getXid());
executorMsgs.submit(new OFMessageHandler(dpid, rep.build()));
executorMsgs.execute(new OFMessageHandler(dpid, rep.build()));
}
break;
case GROUP_DESC:
......@@ -345,14 +345,14 @@ public class OpenFlowControllerImpl implements OpenFlowController {
OFFactories.getFactory(msg.getVersion()).buildGroupDescStatsReply();
rep.setEntries(Lists.newLinkedList(groupDescStats));
rep.setXid(reply.getXid());
executorMsgs.submit(new OFMessageHandler(dpid, rep.build()));
executorMsgs.execute(new OFMessageHandler(dpid, rep.build()));
}
break;
case PORT:
executorMsgs.submit(new OFMessageHandler(dpid, reply));
executorMsgs.execute(new OFMessageHandler(dpid, reply));
break;
case METER:
executorMsgs.submit(new OFMessageHandler(dpid, reply));
executorMsgs.execute(new OFMessageHandler(dpid, reply));
break;
case EXPERIMENTER:
if (reply instanceof OFCalientFlowStatsReply) {
......@@ -394,10 +394,10 @@ public class OpenFlowControllerImpl implements OpenFlowController {
OFFlowStatsReply.Builder rep =
OFFactories.getFactory(msg.getVersion()).buildFlowStatsReply();
rep.setEntries(Lists.newLinkedList(flowStats));
executorMsgs.submit(new OFMessageHandler(dpid, rep.build()));
executorMsgs.execute(new OFMessageHandler(dpid, rep.build()));
}
} else {
executorMsgs.submit(new OFMessageHandler(dpid, reply));
executorMsgs.execute(new OFMessageHandler(dpid, reply));
}
break;
default:
......@@ -406,7 +406,7 @@ public class OpenFlowControllerImpl implements OpenFlowController {
}
break;
case BARRIER_REPLY:
executorBarrier.submit(new OFMessageHandler(dpid, msg));
executorBarrier.execute(new OFMessageHandler(dpid, msg));
break;
case EXPERIMENTER:
long experimenter = ((OFExperimenter) msg).getExperimenter();
......
......@@ -15,21 +15,15 @@
*/
package org.onosproject.openflow.controller.impl;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;
import org.junit.Before;
import org.junit.Test;
import org.onosproject.openflow.ExecutorServiceAdapter;
import org.onosproject.openflow.MockOfPortStatus;
import org.onosproject.openflow.OpenFlowSwitchListenerAdapter;
import org.onosproject.openflow.OpenflowSwitchDriverAdapter;
import org.onosproject.openflow.MockOfFeaturesReply;
import org.onosproject.openflow.MockOfPacketIn;
import org.onosproject.openflow.MockOfPortStatus;
import org.onosproject.openflow.OfMessageAdapter;
import org.onosproject.openflow.OpenFlowSwitchListenerAdapter;
import org.onosproject.openflow.OpenflowSwitchDriverAdapter;
import org.onosproject.openflow.controller.Dpid;
import org.onosproject.openflow.controller.OpenFlowPacketContext;
import org.onosproject.openflow.controller.OpenFlowSwitch;
......@@ -37,11 +31,14 @@ import org.onosproject.openflow.controller.PacketListener;
import org.projectfloodlight.openflow.protocol.OFMessage;
import org.projectfloodlight.openflow.protocol.OFType;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import static junit.framework.TestCase.fail;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.*;
/**
* Tests for packet processing in the open flow controller impl class.
......@@ -85,11 +82,10 @@ public class OpenFlowControllerImplPacketsTest {
}
@Override
public Future<?> submit(Runnable task) {
public void execute(Runnable task) {
OpenFlowControllerImpl.OFMessageHandler handler =
(OpenFlowControllerImpl.OFMessageHandler) task;
submittedMessages.add(handler.msg);
return null;
}
}
......