Carmelo Cascone
Committed by Gerrit Code Review

ONOS-4278 Implemented BMv2 control plane server and packet-out support

Change-Id: I4d9027b232dea31d1091c980fb040ec93da9473d
......@@ -34,11 +34,11 @@
<properties>
<!-- BMv2 Commit ID and Thrift version -->
<bmv2.commit>a012ee4124c1892a91a359660824d311d5d7fe88</bmv2.commit>
<bmv2.commit>4421bafd6d26740b0bbf802c2e9f9f54c1211b13</bmv2.commit>
<bmv2.thrift.version>0.9.3</bmv2.thrift.version>
<!-- Do not change below -->
<bmv2.baseurl>
https://raw.githubusercontent.com/p4lang/behavioral-model/${bmv2.commit}
https://raw.githubusercontent.com/ccascone/behavioral-model/${bmv2.commit}
</bmv2.baseurl>
<bmv2.thrift.srcdir>${project.basedir}/src/main/thrift</bmv2.thrift.srcdir>
<thrift.path>${project.build.directory}/thrift-compiler/</thrift.path>
......@@ -56,6 +56,10 @@
<artifactId>onos-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.felix</groupId>
<artifactId>org.apache.felix.scr.annotations</artifactId>
</dependency>
</dependencies>
<repositories>
......@@ -84,7 +88,7 @@
<executions>
<execution>
<id>download-bmv2-thrift-standard</id>
<phase>validate</phase>
<phase>initialize</phase>
<goals>
<goal>download-single</goal>
</goals>
......@@ -133,6 +137,20 @@
<toDir>${bmv2.thrift.srcdir}</toDir>
</configuration>
</execution>
<execution>
<id>download-bmv2-thrift-simple_switch-cpservice</id>
<phase>initialize</phase>
<goals>
<goal>download-single</goal>
</goals>
<configuration>
<url>${bmv2.baseurl}</url>
<fromFile>
targets/simple_switch/thrift/control_plane.thrift
</fromFile>
<toDir>${bmv2.thrift.srcdir}</toDir>
</configuration>
</execution>
</executions>
</plugin>
<!-- Extract Thrift compiler -->
......@@ -201,11 +219,12 @@
<version>0.1.11</version>
<configuration>
<thriftExecutable>${thrift.path}/${thrift.filename}</thriftExecutable>
<outputDirectory>${project.build.directory}/generated-sources</outputDirectory>
</configuration>
<executions>
<execution>
<id>thrift-sources</id>
<phase>generate-sources</phase>
<phase>initialize</phase>
<goals>
<goal>compile</goal>
</goals>
......@@ -227,13 +246,22 @@
<configuration>
<sources>
<source>
${project.build.directory}/generated-sources/thrift
${project.build.directory}/generated-sources
</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>
<!-- OSGi -->
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-scr-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.onosproject</groupId>
<artifactId>onos-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
......
......@@ -16,6 +16,8 @@
package org.onosproject.bmv2.api.runtime;
import org.onlab.util.ImmutableByteSequence;
import java.util.Collection;
/**
......@@ -81,6 +83,15 @@ public interface Bmv2Client {
String dumpTable(String tableName) throws Bmv2RuntimeException;
/**
* Requests the device to transmit a given byte sequence over the given port.
*
* @param portNumber a port number
* @param packet a byte sequence
* @throws Bmv2RuntimeException
*/
void transmitPacket(int portNumber, ImmutableByteSequence packet) throws Bmv2RuntimeException;
/**
* Reset the state of the switch (e.g. delete all entries, etc.).
*
* @throws Bmv2RuntimeException if any error occurs
......
/*
* Copyright 2016-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.bmv2.api.runtime;
import org.onlab.util.ImmutableByteSequence;
/**
* A server that listens for requests from a BMv2 device.
*/
public interface Bmv2ControlPlaneServer {
/**
* Default listening port.
*/
int DEFAULT_PORT = 40123;
/**
* Register the given hello listener, to be called each time a hello message is received from a BMv2 device.
*
* @param listener a hello listener
*/
void addHelloListener(HelloListener listener);
/**
* Unregister the given hello listener.
*
* @param listener a hello listener
*/
void removeHelloListener(HelloListener listener);
/**
* Register the given packet listener, to be called each time a packet-in message is received from a BMv2 device.
*
* @param listener a packet listener
*/
void addPacketListener(PacketListener listener);
/**
* Unregister the given packet listener.
*
* @param listener a packet listener
*/
void removePacketListener(PacketListener listener);
interface HelloListener {
/**
* Handles a hello message.
*
* @param device the BMv2 device that originated the message
*/
void handleHello(Bmv2Device device);
}
interface PacketListener {
/**
* Handles a packet-in message.
*
* @param device the BMv2 device that originated the message
* @param inputPort the device port where the packet was received
* @param reason a reason code
* @param tableId the table id that originated this packet-in
* @param contextId the context id where the packet-in was originated
* @param packet the packet body
*/
void handlePacketIn(Bmv2Device device, int inputPort, long reason, int tableId, int contextId,
ImmutableByteSequence packet);
}
}
\ No newline at end of file
/*
* Copyright 2016-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.bmv2.api.runtime;
import com.google.common.base.Objects;
import static com.google.common.base.Preconditions.checkNotNull;
/**
* Representation of a BMv2 device.
*/
public final class Bmv2Device {
private final String thriftServerHost;
private final int thriftServerPort;
private final int internalDeviceId;
/**
* Creates a new Bmv2 device object.
*
* @param thriftServerHost the host of the Thrift runtime server running inside the device
* @param thriftServerPort the port of the Thrift runtime server running inside the device
* @param internalDeviceId the internal device id
*/
public Bmv2Device(String thriftServerHost, int thriftServerPort, int internalDeviceId) {
this.thriftServerHost = checkNotNull(thriftServerHost, "host cannot be null");
this.thriftServerPort = checkNotNull(thriftServerPort, "port cannot be null");
this.internalDeviceId = internalDeviceId;
}
/**
* Returns the hostname (or IP address) of the Thrift runtime server running inside the device.
*
* @return a string value
*/
public String thriftServerHost() {
return thriftServerHost;
}
/**
* Returns the port of the Thrift runtime server running inside the device.
*
* @return an integer value
*/
public int thriftServerPort() {
return thriftServerPort;
}
/**
* Returns the BMv2-internal device ID, which is an integer arbitrary chosen at device boot.
* Such an ID must not be confused with the ONOS-internal {@link org.onosproject.net.DeviceId}.
*
* @return an integer value
*/
public int getInternalDeviceId() {
return internalDeviceId;
}
@Override
public int hashCode() {
return Objects.hashCode(thriftServerHost, thriftServerPort, internalDeviceId);
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
final Bmv2Device other = (Bmv2Device) obj;
return Objects.equal(this.thriftServerHost, other.thriftServerHost)
&& Objects.equal(this.thriftServerPort, other.thriftServerPort)
&& Objects.equal(this.internalDeviceId, other.internalDeviceId);
}
@Override
public String toString() {
return thriftServerHost + ":" + thriftServerPort + "/" + internalDeviceId;
}
}
/*
* Copyright 2016-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.bmv2.ctl;
import com.google.common.collect.Maps;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.apache.thrift.TException;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TServerTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransportException;
import org.onlab.util.ImmutableByteSequence;
import org.onosproject.bmv2.api.runtime.Bmv2ControlPlaneServer;
import org.onosproject.bmv2.api.runtime.Bmv2Device;
import org.onosproject.core.CoreService;
import org.p4.bmv2.thrift.ControlPlaneService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import static org.onlab.util.Tools.groupedThreads;
import static org.p4.bmv2.thrift.ControlPlaneService.Processor;
@Component(immediate = true)
@Service
public class Bmv2ControlPlaneThriftServer implements Bmv2ControlPlaneServer {
private static final String APP_ID = "org.onosproject.bmv2";
private static final Logger LOG = LoggerFactory.getLogger(Bmv2ControlPlaneThriftServer.class);
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected CoreService coreService;
private final InternalTrackingProcessor trackingProcessor = new InternalTrackingProcessor();
private final ExecutorService executorService = Executors
.newFixedThreadPool(16, groupedThreads("onos/bmv2", "control-plane-server", LOG));
private final Set<HelloListener> helloListeners = new CopyOnWriteArraySet<>();
private final Set<PacketListener> packetListeners = new CopyOnWriteArraySet<>();
private TThreadPoolServer thriftServer;
private int serverPort = DEFAULT_PORT;
@Activate
public void activate() {
coreService.registerApplication(APP_ID);
try {
TServerTransport transport = new TServerSocket(serverPort);
LOG.info("Starting server on port {}...", serverPort);
this.thriftServer = new TThreadPoolServer(new TThreadPoolServer.Args(transport)
.processor(trackingProcessor)
.executorService(executorService));
executorService.execute(thriftServer::serve);
} catch (TTransportException e) {
LOG.error("Unable to start server", e);
}
LOG.info("Activated");
}
@Deactivate
public void deactivate() {
// Stop the server if running...
if (thriftServer != null && !thriftServer.isServing()) {
thriftServer.stop();
}
try {
executorService.awaitTermination(1, TimeUnit.SECONDS);
} catch (InterruptedException e) {
LOG.error("Server threads did not terminate");
}
executorService.shutdownNow();
LOG.info("Deactivated");
}
@Override
public void addHelloListener(HelloListener listener) {
if (!helloListeners.contains(listener)) {
helloListeners.add(listener);
}
}
@Override
public void removeHelloListener(HelloListener listener) {
helloListeners.remove(listener);
}
@Override
public void addPacketListener(PacketListener listener) {
if (!packetListeners.contains(listener)) {
packetListeners.add(listener);
}
}
@Override
public void removePacketListener(PacketListener listener) {
packetListeners.remove(listener);
}
/**
* Handles service calls using registered listeners.
*/
private final class InternalServiceHandler implements ControlPlaneService.Iface {
private final TSocket socket;
private Bmv2Device remoteDevice;
private InternalServiceHandler(TSocket socket) {
this.socket = socket;
}
@Override
public boolean ping() {
return true;
}
@Override
public void hello(int thriftServerPort, int deviceId) {
// Locally note the remote device for future uses.
String host = socket.getSocket().getInetAddress().getHostAddress();
remoteDevice = new Bmv2Device(host, thriftServerPort, deviceId);
if (helloListeners.size() == 0) {
LOG.debug("Received hello, but there's no listener registered.");
} else {
helloListeners.forEach(listener -> listener.handleHello(remoteDevice));
}
}
@Override
public void packetIn(int port, long reason, int tableId, int contextId, ByteBuffer packet) {
if (remoteDevice == null) {
LOG.debug("Received packet-in, but the remote device is still unknown. Need a hello first...");
return;
}
if (packetListeners.size() == 0) {
LOG.debug("Received packet-in, but there's no listener registered.");
} else {
packetListeners.forEach(listener -> listener.handlePacketIn(remoteDevice,
port,
reason,
tableId,
contextId,
ImmutableByteSequence.copyFrom(packet)));
}
}
}
/**
* Thrift Processor decorator. This class is needed in order to have access to the socket when handling a call.
* Socket is needed to get the IP address of the client originating the call (see InternalServiceHandler.hello())
*/
private final class InternalTrackingProcessor implements TProcessor {
// Map sockets to processors.
// TODO: implement it as a cache so unused sockets are expired automatically
private final ConcurrentMap<TSocket, Processor<InternalServiceHandler>> processors = Maps.newConcurrentMap();
@Override
public boolean process(final TProtocol in, final TProtocol out) throws TException {
// Get the socket for this request.
TSocket socket = (TSocket) in.getTransport();
// Get or create a processor for this socket
Processor<InternalServiceHandler> processor = processors.computeIfAbsent(socket, s -> {
InternalServiceHandler handler = new InternalServiceHandler(s);
return new Processor<>(handler);
});
// Delegate to the processor we are decorating.
return processor.process(in, out);
}
}
}
......@@ -31,6 +31,7 @@ import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.onlab.util.ImmutableByteSequence;
import org.onosproject.bmv2.api.runtime.Bmv2Action;
import org.onosproject.bmv2.api.runtime.Bmv2Client;
import org.onosproject.bmv2.api.runtime.Bmv2ExactMatchParam;
......@@ -50,6 +51,7 @@ import org.p4.bmv2.thrift.BmMatchParamTernary;
import org.p4.bmv2.thrift.BmMatchParamType;
import org.p4.bmv2.thrift.BmMatchParamValid;
import org.p4.bmv2.thrift.DevMgrPortInfo;
import org.p4.bmv2.thrift.SimpleSwitch;
import org.p4.bmv2.thrift.Standard;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -88,15 +90,18 @@ public final class Bmv2ThriftClient implements Bmv2Client {
.expireAfterAccess(CLIENT_CACHE_TIMEOUT, TimeUnit.SECONDS)
.removalListener(new ClientRemovalListener())
.build(new ClientLoader());
private final Standard.Iface stdClient;
private final Standard.Iface standardClient;
private final SimpleSwitch.Iface simpleSwitchClient;
private final TTransport transport;
private final DeviceId deviceId;
// ban constructor
private Bmv2ThriftClient(DeviceId deviceId, TTransport transport, Standard.Iface stdClient) {
private Bmv2ThriftClient(DeviceId deviceId, TTransport transport, Standard.Iface standardClient,
SimpleSwitch.Iface simpleSwitchClient) {
this.deviceId = deviceId;
this.transport = transport;
this.stdClient = stdClient;
this.standardClient = standardClient;
this.simpleSwitchClient = simpleSwitchClient;
LOG.debug("New client created! > deviceId={}", deviceId);
}
......@@ -131,9 +136,9 @@ public final class Bmv2ThriftClient implements Bmv2Client {
try {
LOG.debug("Pinging device... > deviceId={}", deviceId);
Bmv2ThriftClient client = of(deviceId);
client.stdClient.bm_dev_mgr_show_ports();
LOG.debug("Device reachable! > deviceId={}", deviceId);
return true;
boolean result = client.simpleSwitchClient.ping();
LOG.debug("Device pinged! > deviceId={}, state={}", deviceId, result);
return result;
} catch (TException | Bmv2RuntimeException e) {
LOG.debug("Device NOT reachable! > deviceId={}", deviceId);
return false;
......@@ -242,7 +247,7 @@ public final class Bmv2ThriftClient implements Bmv2Client {
options.setPriority(entry.priority());
}
entryId = stdClient.bm_mt_add_entry(
entryId = standardClient.bm_mt_add_entry(
CONTEXT_ID,
entry.tableName(),
buildMatchParamsList(entry.matchKey()),
......@@ -253,7 +258,7 @@ public final class Bmv2ThriftClient implements Bmv2Client {
if (entry.hasTimeout()) {
/* bmv2 accepts timeouts in milliseconds */
int msTimeout = (int) Math.round(entry.timeout() * 1_000);
stdClient.bm_mt_set_entry_ttl(
standardClient.bm_mt_set_entry_ttl(
CONTEXT_ID, entry.tableName(), entryId, msTimeout);
}
......@@ -285,7 +290,7 @@ public final class Bmv2ThriftClient implements Bmv2Client {
LOG.debug("Modifying table entry... > deviceId={}, entryId={}/{}", deviceId, tableName, entryId);
try {
stdClient.bm_mt_modify_entry(
standardClient.bm_mt_modify_entry(
CONTEXT_ID,
tableName,
entryId,
......@@ -306,7 +311,7 @@ public final class Bmv2ThriftClient implements Bmv2Client {
LOG.debug("Deleting table entry... > deviceId={}, entryId={}/{}", deviceId, tableName, entryId);
try {
stdClient.bm_mt_delete_entry(CONTEXT_ID, tableName, entryId);
standardClient.bm_mt_delete_entry(CONTEXT_ID, tableName, entryId);
LOG.debug("Table entry deleted! > deviceId={}, entryId={}/{}", deviceId, tableName, entryId);
} catch (TException e) {
LOG.debug("Exception while deleting table entry: {} > deviceId={}, entryId={}/{}",
......@@ -322,7 +327,7 @@ public final class Bmv2ThriftClient implements Bmv2Client {
LOG.debug("Setting table default... > deviceId={}, tableName={}, action={}", deviceId, tableName, action);
try {
stdClient.bm_mt_set_default_action(
standardClient.bm_mt_set_default_action(
CONTEXT_ID,
tableName,
action.name(),
......@@ -341,7 +346,7 @@ public final class Bmv2ThriftClient implements Bmv2Client {
LOG.debug("Retrieving port info... > deviceId={}", deviceId);
try {
List<DevMgrPortInfo> portInfos = stdClient.bm_dev_mgr_show_ports();
List<DevMgrPortInfo> portInfos = standardClient.bm_dev_mgr_show_ports();
Collection<Bmv2PortInfo> bmv2PortInfos = Lists.newArrayList();
......@@ -366,7 +371,7 @@ public final class Bmv2ThriftClient implements Bmv2Client {
LOG.debug("Retrieving table dump... > deviceId={}, tableName={}", deviceId, tableName);
try {
String dump = stdClient.bm_dump_table(CONTEXT_ID, tableName);
String dump = standardClient.bm_dump_table(CONTEXT_ID, tableName);
LOG.debug("Table dump retrieved! > deviceId={}, tableName={}", deviceId, tableName);
return dump;
} catch (TException e) {
......@@ -377,12 +382,28 @@ public final class Bmv2ThriftClient implements Bmv2Client {
}
@Override
public void transmitPacket(int portNumber, ImmutableByteSequence packet) throws Bmv2RuntimeException {
LOG.debug("Requesting packet transmission... > portNumber={}, packet={}", portNumber, packet);
try {
simpleSwitchClient.push_packet(portNumber, ByteBuffer.wrap(packet.asArray()));
LOG.debug("Packet transmission requested! > portNumber={}, packet={}", portNumber, packet);
} catch (TException e) {
LOG.debug("Exception while requesting packet transmission: {} > portNumber={}, packet={}",
portNumber, packet);
throw new Bmv2RuntimeException(e.getMessage(), e);
}
}
@Override
public void resetState() throws Bmv2RuntimeException {
LOG.debug("Resetting device state... > deviceId={}", deviceId);
try {
stdClient.bm_reset_state();
standardClient.bm_reset_state();
LOG.debug("Device state reset! > deviceId={}", deviceId);
} catch (TException e) {
LOG.debug("Exception while resetting device state: {} > deviceId={}", e, deviceId);
......@@ -396,7 +417,6 @@ public final class Bmv2ThriftClient implements Bmv2Client {
private static class ClientLoader
extends CacheLoader<DeviceId, Bmv2ThriftClient> {
// Connection retries options: max 10 retries each 200 ms
private static final Options RECONN_OPTIONS = new Options(NUM_CONNECTION_RETRIES, TIME_BETWEEN_RETRIES);
@Override
......@@ -408,14 +428,20 @@ public final class Bmv2ThriftClient implements Bmv2Client {
TTransport transport = new TSocket(
info.getLeft(), info.getRight());
TProtocol protocol = new TBinaryProtocol(transport);
Standard.Client stdClient = new Standard.Client(
// Our BMv2 device implements multiple Thrift services, create a client for each one.
Standard.Client standardClient = new Standard.Client(
new TMultiplexedProtocol(protocol, "standard"));
// Wrap the client so to automatically have synchronization and resiliency to connectivity problems
Standard.Iface reconnStdIface = SafeThriftClient.wrap(stdClient,
Standard.Iface.class,
RECONN_OPTIONS);
return new Bmv2ThriftClient(deviceId, transport, reconnStdIface);
SimpleSwitch.Client simpleSwitch = new SimpleSwitch.Client(
new TMultiplexedProtocol(protocol, "simple_switch"));
// Wrap clients so to automatically have synchronization and resiliency to connectivity errors
Standard.Iface safeStandardClient = SafeThriftClient.wrap(standardClient,
Standard.Iface.class,
RECONN_OPTIONS);
SimpleSwitch.Iface safeSimpleSwitchClient = SafeThriftClient.wrap(simpleSwitch,
SimpleSwitch.Iface.class,
RECONN_OPTIONS);
return new Bmv2ThriftClient(deviceId, transport, safeStandardClient, safeSimpleSwitchClient);
}
}
......