tom

Added IO loop test to the foo app.

......@@ -16,4 +16,20 @@
<description>ONOS application for miscellaneous experiments</description>
<dependencies>
<dependency>
<groupId>org.onlab.onos</groupId>
<artifactId>onos-cli</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.onlab.onos</groupId>
<artifactId>onlab-nio</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.karaf.shell</groupId>
<artifactId>org.apache.karaf.shell.console</artifactId>
</dependency>
</dependencies>
</project>
......
package org.onlab.onos.foo;
import org.onlab.nio.IOLoop;
import org.onlab.nio.MessageStream;
import org.onlab.util.Counter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.ByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static java.lang.String.format;
import static java.lang.System.out;
import static org.onlab.onos.foo.IOLoopTestServer.PORT;
import static org.onlab.util.Tools.delay;
import static org.onlab.util.Tools.namedThreads;
/**
* Auxiliary test fixture to measure speed of NIO-based channels.
*/
public class IOLoopTestClient {
private static Logger log = LoggerFactory.getLogger(IOLoopTestClient.class);
private final InetAddress ip;
private final int port;
private final int msgCount;
private final int msgLength;
private final List<CustomIOLoop> iloops = new ArrayList<>();
private final ExecutorService ipool;
private final ExecutorService wpool;
Counter messages;
Counter bytes;
/**
* Main entry point to launch the client.
*
* @param args command-line arguments
* @throws java.io.IOException if unable to connect to server
* @throws InterruptedException if latch wait gets interrupted
* @throws java.util.concurrent.ExecutionException if wait gets interrupted
* @throws java.util.concurrent.TimeoutException if timeout occurred while waiting for completion
*/
public static void main(String[] args)
throws IOException, InterruptedException, ExecutionException, TimeoutException {
startStandalone(args);
System.exit(0);
}
/**
* Starts a standalone IO loop test client.
*
* @param args command-line arguments
*/
public static void startStandalone(String[] args)
throws IOException, InterruptedException, ExecutionException, TimeoutException {
InetAddress ip = InetAddress.getByName(args.length > 0 ? args[0] : "127.0.0.1");
int wc = args.length > 1 ? Integer.parseInt(args[1]) : 6;
int mc = args.length > 2 ? Integer.parseInt(args[2]) : 50 * 1000000;
int ml = args.length > 3 ? Integer.parseInt(args[3]) : 128;
int to = args.length > 4 ? Integer.parseInt(args[4]) : 30;
log.info("Setting up client with {} workers sending {} {}-byte messages to {} server... ",
wc, mc, ml, ip);
IOLoopTestClient client = new IOLoopTestClient(ip, wc, mc, ml, PORT);
client.start();
delay(500);
client.await(to);
client.report();
}
/**
* Creates a speed client.
*
* @param ip ip address of server
* @param wc worker count
* @param mc message count to send per client
* @param ml message length in bytes
* @param port socket port
* @throws java.io.IOException if unable to create IO loops
*/
public IOLoopTestClient(InetAddress ip, int wc, int mc, int ml, int port) throws IOException {
this.ip = ip;
this.port = port;
this.msgCount = mc;
this.msgLength = ml;
this.wpool = Executors.newFixedThreadPool(wc, namedThreads("worker"));
this.ipool = Executors.newFixedThreadPool(wc, namedThreads("io-loop"));
for (int i = 0; i < wc; i++) {
iloops.add(new CustomIOLoop());
}
}
/**
* Starts the client workers.
*
* @throws java.io.IOException if unable to open connection
*/
public void start() throws IOException {
messages = new Counter();
bytes = new Counter();
// First start up all the IO loops
for (CustomIOLoop l : iloops) {
ipool.execute(l);
}
// Wait for all of them to get going
for (CustomIOLoop l : iloops) {
l.awaitStart(1000);
}
// ... and Next open all connections; one-per-loop
for (CustomIOLoop l : iloops) {
openConnection(l);
}
}
/**
* Initiates open connection request and registers the pending socket
* channel with the given IO loop.
*
* @param loop loop with which the channel should be registered
* @throws java.io.IOException if the socket could not be open or connected
*/
private void openConnection(CustomIOLoop loop) throws IOException {
SocketAddress sa = new InetSocketAddress(ip, port);
SocketChannel ch = SocketChannel.open();
ch.configureBlocking(false);
loop.connectStream(ch);
ch.connect(sa);
}
/**
* Waits for the client workers to complete.
*
* @param secs timeout in seconds
* @throws java.util.concurrent.ExecutionException if execution failed
* @throws InterruptedException if interrupt occurred while waiting
* @throws java.util.concurrent.TimeoutException if timeout occurred
*/
public void await(int secs) throws InterruptedException,
ExecutionException, TimeoutException {
for (CustomIOLoop l : iloops) {
if (l.worker.task != null) {
l.worker.task.get(secs, TimeUnit.SECONDS);
}
}
messages.freeze();
bytes.freeze();
}
/**
* Reports on the accumulated throughput trackers.
*/
public void report() {
DecimalFormat f = new DecimalFormat("#,##0");
out.println(format("Client: %s messages; %s bytes; %s mps; %s Mbs",
f.format(messages.total()), f.format(bytes.total()),
f.format(messages.throughput()),
f.format(bytes.throughput() / (1024 * msgLength))));
}
// Loop for transfer of fixed-length messages
private class CustomIOLoop extends IOLoop<TestMessage, TestMessageStream> {
Worker worker = new Worker();
public CustomIOLoop() throws IOException {
super(500);
}
@Override
protected TestMessageStream createStream(ByteChannel channel) {
return new TestMessageStream(msgLength, channel, this);
}
@Override
protected synchronized void removeStream(MessageStream<TestMessage> stream) {
super.removeStream(stream);
messages.add(stream.messagesIn().total());
bytes.add(stream.bytesIn().total());
// out.println(format("Disconnected client; inbound %s mps, %s Mbps; outbound %s mps, %s Mbps",
// FORMAT.format(stream.messagesIn().throughput()),
// FORMAT.format(stream.bytesIn().throughput() / (1024 * msgLength)),
// FORMAT.format(stream.messagesOut().throughput()),
// FORMAT.format(stream.bytesOut().throughput() / (1024 * msgLength))));
stream.messagesOut().reset();
stream.bytesOut().reset();
}
@Override
protected void processMessages(List<TestMessage> messages,
MessageStream<TestMessage> b) {
worker.release(messages.size());
}
@Override
protected void connect(SelectionKey key) {
super.connect(key);
TestMessageStream b = (TestMessageStream) key.attachment();
Worker w = ((CustomIOLoop) b.loop()).worker;
w.pump(b);
}
}
/**
* Auxiliary worker to connect and pump batched messages using blocking I/O.
*/
private class Worker implements Runnable {
private static final int BATCH_SIZE = 1000;
private static final int PERMITS = 2 * BATCH_SIZE;
private TestMessageStream b;
private FutureTask<Worker> task;
// Stuff to throttle pump
private final Semaphore semaphore = new Semaphore(PERMITS);
private int msgWritten;
void pump(TestMessageStream b) {
this.b = b;
task = new FutureTask<>(this, this);
wpool.execute(task);
}
@Override
public void run() {
try {
log.info("Worker started...");
List<TestMessage> batch = new ArrayList<>();
for (int i = 0; i < BATCH_SIZE; i++) {
batch.add(new TestMessage(msgLength));
}
while (msgWritten < msgCount) {
msgWritten += writeBatch(b, batch);
}
// Now try to get all the permits back before sending poison pill
semaphore.acquireUninterruptibly(PERMITS);
b.close();
log.info("Worker done...");
} catch (IOException e) {
log.error("Worker unable to perform I/O", e);
}
}
private int writeBatch(TestMessageStream b, List<TestMessage> batch)
throws IOException {
int count = Math.min(BATCH_SIZE, msgCount - msgWritten);
acquire(count);
if (count == BATCH_SIZE) {
b.write(batch);
} else {
for (int i = 0; i < count; i++) {
b.write(batch.get(i));
}
}
return count;
}
// Release permits based on the specified number of message credits
private void release(int permits) {
semaphore.release(permits);
}
// Acquire permit for a single batch
private void acquire(int permits) {
semaphore.acquireUninterruptibly(permits);
}
}
}
package org.onlab.onos.foo;
import org.onlab.nio.AcceptorLoop;
import org.onlab.nio.IOLoop;
import org.onlab.nio.MessageStream;
import org.onlab.util.Counter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.nio.channels.ByteChannel;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static java.lang.String.format;
import static java.lang.System.out;
import static org.onlab.util.Tools.delay;
import static org.onlab.util.Tools.namedThreads;
/**
* Auxiliary test fixture to measure speed of NIO-based channels.
*/
public class IOLoopTestServer {
private static Logger log = LoggerFactory.getLogger(IOLoopTestServer.class);
private static final int PRUNE_FREQUENCY = 1000;
static final int PORT = 9876;
static final long TIMEOUT = 1000;
static final boolean SO_NO_DELAY = false;
static final int SO_SEND_BUFFER_SIZE = 128 * 1024;
static final int SO_RCV_BUFFER_SIZE = 128 * 1024;
static final DecimalFormat FORMAT = new DecimalFormat("#,##0");
private final AcceptorLoop aloop;
private final ExecutorService apool = Executors.newSingleThreadExecutor(namedThreads("accept"));
private final List<CustomIOLoop> iloops = new ArrayList<>();
private final ExecutorService ipool;
private final int workerCount;
private final int msgLength;
private int lastWorker = -1;
Counter messages;
Counter bytes;
/**
* Main entry point to launch the server.
*
* @param args command-line arguments
* @throws java.io.IOException if unable to crate IO loops
*/
public static void main(String[] args) throws IOException {
startStandalone(args);
System.exit(0);
}
/**
* Starts a standalone IO loop test server.
*
* @param args command-line arguments
*/
public static void startStandalone(String[] args) throws IOException {
InetAddress ip = InetAddress.getByName(args.length > 0 ? args[0] : "127.0.0.1");
int wc = args.length > 1 ? Integer.parseInt(args[1]) : 6;
int ml = args.length > 2 ? Integer.parseInt(args[2]) : 128;
log.info("Setting up the server with {} workers, {} byte messages on {}... ",
wc, ml, ip);
IOLoopTestServer server = new IOLoopTestServer(ip, wc, ml, PORT);
server.start();
// Start pruning clients.
while (true) {
delay(PRUNE_FREQUENCY);
server.prune();
}
}
/**
* Creates a speed server.
*
* @param ip optional ip of the adapter where to bind
* @param wc worker count
* @param ml message length in bytes
* @param port listen port
* @throws java.io.IOException if unable to create IO loops
*/
public IOLoopTestServer(InetAddress ip, int wc, int ml, int port) throws IOException {
this.workerCount = wc;
this.msgLength = ml;
this.ipool = Executors.newFixedThreadPool(workerCount, namedThreads("io-loop"));
this.aloop = new CustomAcceptLoop(new InetSocketAddress(ip, port));
for (int i = 0; i < workerCount; i++) {
iloops.add(new CustomIOLoop());
}
}
/**
* Start the server IO loops and kicks off throughput tracking.
*/
public void start() {
messages = new Counter();
bytes = new Counter();
for (CustomIOLoop l : iloops) {
ipool.execute(l);
}
apool.execute(aloop);
for (CustomIOLoop l : iloops) {
l.awaitStart(TIMEOUT);
}
aloop.awaitStart(TIMEOUT);
}
/**
* Stop the server IO loops and freezes throughput tracking.
*/
public void stop() {
aloop.shutdown();
for (CustomIOLoop l : iloops) {
l.shutdown();
}
for (CustomIOLoop l : iloops) {
l.awaitStop(TIMEOUT);
}
aloop.awaitStop(TIMEOUT);
messages.freeze();
bytes.freeze();
}
/**
* Reports on the accumulated throughput trackers.
*/
public void report() {
DecimalFormat f = new DecimalFormat("#,##0");
out.println(format("Server: %s messages; %s bytes; %s mps; %s Mbs",
f.format(messages.total()), f.format(bytes.total()),
f.format(messages.throughput()),
f.format(bytes.throughput() / (1024 * msgLength))));
}
/**
* Prunes the IO loops of stale message buffers.
*/
public void prune() {
for (CustomIOLoop l : iloops) {
l.pruneStaleStreams();
}
}
// Get the next worker to which a client should be assigned
private synchronized CustomIOLoop nextWorker() {
lastWorker = (lastWorker + 1) % workerCount;
return iloops.get(lastWorker);
}
// Loop for transfer of fixed-length messages
private class CustomIOLoop extends IOLoop<TestMessage, TestMessageStream> {
public CustomIOLoop() throws IOException {
super(500);
}
@Override
protected TestMessageStream createStream(ByteChannel channel) {
return new TestMessageStream(msgLength, channel, this);
}
@Override
protected void removeStream(MessageStream<TestMessage> stream) {
super.removeStream(stream);
messages.add(stream.messagesIn().total());
bytes.add(stream.bytesIn().total());
// out.println(format("Disconnected server; inbound %s mps, %s Mbps; outbound %s mps, %s Mbps",
// FORMAT.format(stream.messagesIn().throughput()),
// FORMAT.format(stream.bytesIn().throughput() / (1024 * msgLength)),
// FORMAT.format(stream.messagesOut().throughput()),
// FORMAT.format(stream.bytesOut().throughput() / (1024 * msgLength))));
}
@Override
protected void processMessages(List<TestMessage> messages,
MessageStream<TestMessage> stream) {
try {
stream.write(messages);
} catch (IOException e) {
log.error("Unable to echo messages", e);
}
}
}
// Loop for accepting client connections
private class CustomAcceptLoop extends AcceptorLoop {
public CustomAcceptLoop(SocketAddress address) throws IOException {
super(500, address);
}
@Override
protected void acceptConnection(ServerSocketChannel channel) throws IOException {
SocketChannel sc = channel.accept();
sc.configureBlocking(false);
Socket so = sc.socket();
so.setTcpNoDelay(SO_NO_DELAY);
so.setReceiveBufferSize(SO_RCV_BUFFER_SIZE);
so.setSendBufferSize(SO_SEND_BUFFER_SIZE);
nextWorker().acceptStream(sc);
log.info("Connected client");
}
}
}
package org.onlab.onos.foo;
import org.apache.karaf.shell.commands.Command;
import org.onlab.onos.cli.AbstractShellCommand;
import static org.onlab.onos.foo.IOLoopTestClient.startStandalone;
/**
* Starts the test IO loop client.
*/
@Command(scope = "onos", name = "test-io-client",
description = "Starts the test IO loop client")
public class TestIOClientCommand extends AbstractShellCommand {
@Override
protected void execute() {
try {
startStandalone(new String[]{});
} catch (Exception e) {
error("Unable to start server %s", e);
}
}
}
package org.onlab.onos.foo;
import org.apache.karaf.shell.commands.Command;
import org.onlab.onos.cli.AbstractShellCommand;
import static org.onlab.onos.foo.IOLoopTestServer.startStandalone;
/**
* Starts the test IO loop server.
*/
@Command(scope = "onos", name = "test-io-server",
description = "Starts the test IO loop server")
public class TestIOServerCommand extends AbstractShellCommand {
@Override
protected void execute() {
try {
startStandalone(new String[]{});
} catch (Exception e) {
error("Unable to start server %s", e);
}
}
}
package org.onlab.onos.foo;
import org.onlab.nio.AbstractMessage;
/**
* Fixed-length message.
*/
public class TestMessage extends AbstractMessage {
private final byte[] data;
/**
* Creates a new message with the specified length.
*
* @param length message length
*/
public TestMessage(int length) {
this.length = length;
data = new byte[length];
}
/**
* Creates a new message with the specified data.
*
* @param data message data
*/
TestMessage(byte[] data) {
this.length = data.length;
this.data = data;
}
/**
* Gets the backing byte array data.
*
* @return backing byte array
*/
public byte[] data() {
return data;
}
}
package org.onlab.onos.foo;
import org.onlab.nio.IOLoop;
import org.onlab.nio.MessageStream;
import java.nio.ByteBuffer;
import java.nio.channels.ByteChannel;
/**
* Fixed-length message transfer buffer.
*/
public class TestMessageStream extends MessageStream<TestMessage> {
private static final String E_WRONG_LEN = "Illegal message length: ";
private final int length;
/**
* Create a new buffer for transferring messages of the specified length.
*
* @param length message length
* @param ch backing channel
* @param loop driver loop
*/
public TestMessageStream(int length, ByteChannel ch,
IOLoop<TestMessage, ?> loop) {
super(loop, ch, 64 * 1024, 500);
this.length = length;
}
@Override
protected TestMessage read(ByteBuffer rb) {
if (rb.remaining() < length) {
return null;
}
TestMessage message = new TestMessage(length);
rb.get(message.data());
return message;
}
/**
* {@inheritDoc}
* <p/>
* This implementation enforces the message length against the buffer
* supported length.
*
* @throws IllegalArgumentException if message size does not match the
* supported buffer size
*/
@Override
protected void write(TestMessage message, ByteBuffer wb) {
if (message.length() != length) {
throw new IllegalArgumentException(E_WRONG_LEN + message.length());
}
wb.put(message.data());
}
}
<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0">
<command-bundle xmlns="http://karaf.apache.org/xmlns/shell/v1.1.0">
<command>
<action class="org.onlab.onos.foo.TestIOClientCommand"/>
</command>
<command>
<action class="org.onlab.onos.foo.TestIOServerCommand"/>
</command>
</command-bundle>
</blueprint>
......@@ -17,6 +17,8 @@
<bundle>mvn:com.esotericsoftware/minlog/1.3.0</bundle>
<bundle>mvn:org.objenesis/objenesis/2.1</bundle>
<bundle>mvn:de.javakaffee/kryo-serializers/0.27</bundle>
<bundle>mvn:org.onlab.onos/onlab-nio/1.0.0-SNAPSHOT</bundle>
</feature>
<feature name="onos-thirdparty-web" version="1.0.0"
......
......@@ -172,6 +172,11 @@
</dependency>
<dependency>
<groupId>org.onlab.onos</groupId>
<artifactId>onlab-nio</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.onlab.onos</groupId>
<artifactId>onlab-osgi</artifactId>
<version>${project.version}</version>
</dependency>
......
......@@ -52,4 +52,18 @@ public abstract class Tools {
public static String toHex(long value, int width) {
return Strings.padStart(UnsignedLongs.toString(value, 16), width, '0');
}
/**
* Suspends the current thread for a specified number of millis.
*
* @param ms number of millis
*/
public static void delay(int ms) {
try {
Thread.sleep(ms);
} catch (InterruptedException e) {
throw new RuntimeException("Interrupted", e);
}
}
}
......
......@@ -66,7 +66,7 @@ public abstract class IOLoop<M extends Message, S extends MessageStream<M>>
*
* @param stream message stream to remove
*/
void removeStream(MessageStream<M> stream) {
protected void removeStream(MessageStream<M> stream) {
streams.remove(stream);
}
......
......@@ -24,8 +24,8 @@ import java.util.concurrent.TimeoutException;
import static java.lang.String.format;
import static java.lang.System.out;
import static org.onlab.junit.TestTools.delay;
import static org.onlab.nio.IOLoopTestServer.PORT;
import static org.onlab.util.Tools.delay;
import static org.onlab.util.Tools.namedThreads;
/**
......@@ -81,7 +81,7 @@ public class IOLoopTestClient {
IOLoopTestClient client = new IOLoopTestClient(ip, wc, mc, ml, PORT);
client.start();
delay(2000);
delay(500);
client.await(to);
client.report();
......
......@@ -70,7 +70,7 @@ public class IOLoopTestServer {
*
* @param args command-line arguments
*/
private static void startStandalone(String[] args) throws IOException {
public static void startStandalone(String[] args) throws IOException {
InetAddress ip = InetAddress.getByName(args.length > 0 ? args[0] : "127.0.0.1");
int wc = args.length > 1 ? Integer.parseInt(args[1]) : 6;
int ml = args.length > 2 ? Integer.parseInt(args[2]) : 128;
......