tom

Corrected some IO loop tests.

......@@ -7,6 +7,7 @@ import java.io.IOException;
import java.nio.channels.Selector;
import static com.google.common.base.Preconditions.checkArgument;
import static java.lang.System.currentTimeMillis;
/**
* Abstraction of an I/O processing loop based on an NIO selector.
......@@ -118,4 +119,41 @@ public abstract class SelectorLoop implements Runnable {
notifyAll();
}
/**
* Waits for the loop execution to start.
*
* @param timeout number of milliseconds to wait
* @return true if loop started in time
*/
public final synchronized boolean awaitStart(long timeout) {
long max = currentTimeMillis() + timeout;
while (state != State.STARTED && (currentTimeMillis() < max)) {
try {
wait(timeout);
} catch (InterruptedException e) {
throw new RuntimeException("Interrupted", e);
}
}
return state == State.STARTED;
}
/**
* Waits for the loop execution to stop.
*
* @param timeout number of milliseconds to wait
* @return true if loop finished in time
*/
public final synchronized boolean awaitStop(long timeout) {
long max = currentTimeMillis() + timeout;
while (state != State.STOPPED && (currentTimeMillis() < max)) {
try {
wait(timeout);
} catch (InterruptedException e) {
throw new RuntimeException("Interrupted", e);
}
}
return state == State.STOPPED;
}
}
......
package org.onlab.nio;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import java.net.InetAddress;
import java.text.DecimalFormat;
import java.util.Random;
import java.util.logging.Level;
import java.util.logging.Logger;
import static org.onlab.junit.TestTools.delay;
......@@ -15,55 +15,51 @@ import static org.onlab.junit.TestTools.delay;
*/
public class IOLoopIntegrationTest {
private static final int MILLION = 1000000;
private static final int TIMEOUT = 60;
private static final int THREADS = 6;
private static final int MSG_COUNT = 20 * MILLION;
private static final int MSG_SIZE = 128;
private static final int TIMEOUT = 60;
private static final int MESSAGE_LENGTH = 128;
private static final long MIN_MPS = 10 * MILLION;
private static final int MILLION = 1000000;
private static final int MSG_COUNT = 40 * MILLION;
@Before
public void warmUp() throws Exception {
Logger.getLogger("").setLevel(Level.SEVERE);
try {
run(MILLION, MSG_SIZE, 15, 0);
runTest(MILLION, MESSAGE_LENGTH, 15);
} catch (Throwable e) {
System.err.println("Failed warmup but moving on.");
e.printStackTrace();
}
}
@Ignore
@Test
public void basic() throws Exception {
run(MSG_COUNT, MSG_SIZE, TIMEOUT, MIN_MPS);
runTest(MILLION, MESSAGE_LENGTH, TIMEOUT);
}
public void longHaul() throws Exception {
runTest(MSG_COUNT, MESSAGE_LENGTH, TIMEOUT);
}
private void run(int count, int size, int timeout, double mps) throws Exception {
DecimalFormat f = new DecimalFormat("#,##0");
System.out.print(f.format(count * THREADS) +
(mps > 0.0 ? " messages: " : " message warm-up: "));
// Setup the test on a random port to avoid intermittent test failures
// due to the port being already bound.
int port = IOLoopServer.PORT + new Random().nextInt(100);
private void runTest(int count, int size, int timeout) throws Exception {
// Use a random port to prevent conflicts.
int port = IOLoopTestServer.PORT + new Random().nextInt(100);
InetAddress ip = InetAddress.getLoopbackAddress();
IOLoopServer sss = new IOLoopServer(ip, THREADS, size, port);
IOLoopClient ssc = new IOLoopClient(ip, THREADS, count, size, port);
IOLoopTestServer server = new IOLoopTestServer(ip, THREADS, size, port);
IOLoopTestClient client = new IOLoopTestClient(ip, THREADS, count, size, port);
sss.start();
ssc.start();
delay(250); // give the server and client a chance to go
server.start();
client.start();
delay(100); // Pause to allow loops to get going
ssc.await(timeout);
ssc.report();
client.await(timeout);
client.report();
delay(1000);
sss.stop();
sss.report();
server.stop();
server.report();
}
}
......
......@@ -22,15 +22,18 @@ import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static java.lang.String.format;
import static java.lang.System.out;
import static org.onlab.junit.TestTools.delay;
import static org.onlab.nio.IOLoopTestServer.PORT;
import static org.onlab.util.Tools.namedThreads;
/**
* Auxiliary test fixture to measure speed of NIO-based channels.
*/
public class IOLoopClient {
public class IOLoopTestClient {
private static Logger log = LoggerFactory.getLogger(IOLoopClient.class);
private static Logger log = LoggerFactory.getLogger(IOLoopTestClient.class);
private final InetAddress ip;
private final int port;
......@@ -55,6 +58,18 @@ public class IOLoopClient {
*/
public static void main(String[] args)
throws IOException, InterruptedException, ExecutionException, TimeoutException {
startStandalone(args);
System.exit(0);
}
/**
* Starts a standalone IO loop test client.
*
* @param args command-line arguments
*/
public static void startStandalone(String[] args)
throws IOException, InterruptedException, ExecutionException, TimeoutException {
InetAddress ip = InetAddress.getByName(args.length > 0 ? args[0] : "127.0.0.1");
int wc = args.length > 1 ? Integer.parseInt(args[1]) : 6;
int mc = args.length > 2 ? Integer.parseInt(args[2]) : 50 * 1000000;
......@@ -63,15 +78,13 @@ public class IOLoopClient {
log.info("Setting up client with {} workers sending {} {}-byte messages to {} server... ",
wc, mc, ml, ip);
IOLoopClient sc = new IOLoopClient(ip, wc, mc, ml, IOLoopServer.PORT);
IOLoopTestClient client = new IOLoopTestClient(ip, wc, mc, ml, PORT);
sc.start();
client.start();
delay(2000);
sc.await(to);
sc.report();
System.exit(0);
client.await(to);
client.report();
}
/**
......@@ -84,7 +97,7 @@ public class IOLoopClient {
* @param port socket port
* @throws IOException if unable to create IO loops
*/
public IOLoopClient(InetAddress ip, int wc, int mc, int ml, int port) throws IOException {
public IOLoopTestClient(InetAddress ip, int wc, int mc, int ml, int port) throws IOException {
this.ip = ip;
this.port = port;
this.msgCount = mc;
......@@ -112,8 +125,9 @@ public class IOLoopClient {
}
// Wait for all of them to get going
// for (CustomIOLoop l : iloops)
// l.waitForStart(TIMEOUT);
for (CustomIOLoop l : iloops) {
l.awaitStart(1000);
}
// ... and Next open all connections; one-per-loop
for (CustomIOLoop l : iloops) {
......@@ -162,11 +176,10 @@ public class IOLoopClient {
*/
public void report() {
DecimalFormat f = new DecimalFormat("#,##0");
log.info("{} messages; {} bytes; {} mps; {} Mbs",
f.format(messages.total()),
f.format(bytes.total()),
f.format(messages.throughput()),
f.format(bytes.throughput() / (1024 * 128)));
out.println(format("Client: %s messages; %s bytes; %s mps; %s Mbs",
f.format(messages.total()), f.format(bytes.total()),
f.format(messages.throughput()),
f.format(bytes.throughput() / (1024 * msgLength))));
}
......@@ -186,19 +199,20 @@ public class IOLoopClient {
}
@Override
protected synchronized void removeStream(MessageStream<TestMessage> b) {
super.removeStream(b);
messages.add(b.messagesIn().total());
bytes.add(b.bytesIn().total());
b.messagesOut().reset();
b.bytesOut().reset();
//
log.info("Disconnected client; inbound {} mps, {} Mbps; outbound {} mps, {} Mbps",
IOLoopServer.FORMAT.format(b.messagesIn().throughput()),
IOLoopServer.FORMAT.format(b.bytesIn().throughput() / (1024 * 128)),
IOLoopServer.FORMAT.format(b.messagesOut().throughput()),
IOLoopServer.FORMAT.format(b.bytesOut().throughput() / (1024 * 128)));
protected synchronized void removeStream(MessageStream<TestMessage> stream) {
super.removeStream(stream);
messages.add(stream.messagesIn().total());
bytes.add(stream.bytesIn().total());
// out.println(format("Disconnected client; inbound %s mps, %s Mbps; outbound %s mps, %s Mbps",
// FORMAT.format(stream.messagesIn().throughput()),
// FORMAT.format(stream.bytesIn().throughput() / (1024 * msgLength)),
// FORMAT.format(stream.messagesOut().throughput()),
// FORMAT.format(stream.bytesOut().throughput() / (1024 * msgLength))));
stream.messagesOut().reset();
stream.bytesOut().reset();
}
@Override
......
......@@ -18,15 +18,17 @@ import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static java.lang.String.format;
import static java.lang.System.out;
import static org.onlab.junit.TestTools.delay;
import static org.onlab.util.Tools.namedThreads;
/**
* Auxiliary test fixture to measure speed of NIO-based channels.
*/
public class IOLoopServer {
public class IOLoopTestServer {
private static Logger log = LoggerFactory.getLogger(IOLoopServer.class);
private static Logger log = LoggerFactory.getLogger(IOLoopTestServer.class);
private static final int PRUNE_FREQUENCY = 1000;
......@@ -34,8 +36,8 @@ public class IOLoopServer {
static final long TIMEOUT = 1000;
static final boolean SO_NO_DELAY = false;
static final int SO_SEND_BUFFER_SIZE = 1024 * 1024;
static final int SO_RCV_BUFFER_SIZE = 1024 * 1024;
static final int SO_SEND_BUFFER_SIZE = 128 * 1024;
static final int SO_RCV_BUFFER_SIZE = 128 * 1024;
static final DecimalFormat FORMAT = new DecimalFormat("#,##0");
......@@ -59,19 +61,29 @@ public class IOLoopServer {
* @throws IOException if unable to crate IO loops
*/
public static void main(String[] args) throws IOException {
startStandalone(args);
System.exit(0);
}
/**
* Starts a standalone IO loop test server.
*
* @param args command-line arguments
*/
private static void startStandalone(String[] args) throws IOException {
InetAddress ip = InetAddress.getByName(args.length > 0 ? args[0] : "127.0.0.1");
int wc = args.length > 1 ? Integer.parseInt(args[1]) : 6;
int ml = args.length > 2 ? Integer.parseInt(args[2]) : 128;
log.info("Setting up the server with {} workers, {} byte messages on {}... ",
wc, ml, ip);
IOLoopServer ss = new IOLoopServer(ip, wc, ml, PORT);
ss.start();
IOLoopTestServer server = new IOLoopTestServer(ip, wc, ml, PORT);
server.start();
// Start pruning clients.
while (true) {
delay(PRUNE_FREQUENCY);
ss.prune();
server.prune();
}
}
......@@ -84,7 +96,7 @@ public class IOLoopServer {
* @param port listen port
* @throws IOException if unable to create IO loops
*/
public IOLoopServer(InetAddress ip, int wc, int ml, int port) throws IOException {
public IOLoopTestServer(InetAddress ip, int wc, int ml, int port) throws IOException {
this.workerCount = wc;
this.msgLength = ml;
this.ipool = Executors.newFixedThreadPool(workerCount, namedThreads("io-loop"));
......@@ -107,9 +119,10 @@ public class IOLoopServer {
}
apool.execute(aloop);
// for (CustomIOLoop l : iloops)
// l.waitForStart(TIMEOUT);
// aloop.waitForStart(TIMEOUT);
for (CustomIOLoop l : iloops) {
l.awaitStart(TIMEOUT);
}
aloop.awaitStart(TIMEOUT);
}
/**
......@@ -121,10 +134,11 @@ public class IOLoopServer {
l.shutdown();
}
// for (CustomIOLoop l : iloops)
// l.waitForFinish(TIMEOUT);
// aloop.waitForFinish(TIMEOUT);
//
for (CustomIOLoop l : iloops) {
l.awaitStop(TIMEOUT);
}
aloop.awaitStop(TIMEOUT);
messages.freeze();
bytes.freeze();
}
......@@ -134,11 +148,10 @@ public class IOLoopServer {
*/
public void report() {
DecimalFormat f = new DecimalFormat("#,##0");
log.info("{} messages; {} bytes; {} mps; {} Mbs",
f.format(messages.total()),
f.format(bytes.total()),
f.format(messages.throughput()),
f.format(bytes.throughput() / (1024 * 128)));
out.println(format("Server: %s messages; %s bytes; %s mps; %s Mbs",
f.format(messages.total()), f.format(bytes.total()),
f.format(messages.throughput()),
f.format(bytes.throughput() / (1024 * msgLength))));
}
/**
......@@ -175,11 +188,11 @@ public class IOLoopServer {
messages.add(stream.messagesIn().total());
bytes.add(stream.bytesIn().total());
log.info("Disconnected client; inbound {} mps, {} Mbps; outbound {} mps, {} Mbps",
FORMAT.format(stream.messagesIn().throughput()),
FORMAT.format(stream.bytesIn().throughput() / (1024 * 128)),
FORMAT.format(stream.messagesOut().throughput()),
FORMAT.format(stream.bytesOut().throughput() / (1024 * 128)));
// out.println(format("Disconnected server; inbound %s mps, %s Mbps; outbound %s mps, %s Mbps",
// FORMAT.format(stream.messagesIn().throughput()),
// FORMAT.format(stream.bytesIn().throughput() / (1024 * msgLength)),
// FORMAT.format(stream.messagesOut().throughput()),
// FORMAT.format(stream.bytesOut().throughput() / (1024 * msgLength))));
}
@Override
......