tom

Added ability to measure round-trip latency and to assure message integrity.

1 package org.onlab.onos.foo; 1 package org.onlab.onos.foo;
2 2
3 +import com.google.common.collect.Lists;
3 import org.onlab.nio.IOLoop; 4 import org.onlab.nio.IOLoop;
4 import org.onlab.nio.MessageStream; 5 import org.onlab.nio.MessageStream;
5 import org.onlab.util.Counter; 6 import org.onlab.util.Counter;
...@@ -25,6 +26,7 @@ import java.util.concurrent.TimeUnit; ...@@ -25,6 +26,7 @@ import java.util.concurrent.TimeUnit;
25 import java.util.concurrent.TimeoutException; 26 import java.util.concurrent.TimeoutException;
26 27
27 import static java.lang.String.format; 28 import static java.lang.String.format;
29 +import static java.lang.System.currentTimeMillis;
28 import static java.lang.System.out; 30 import static java.lang.System.out;
29 import static org.onlab.onos.foo.IOLoopTestServer.PORT; 31 import static org.onlab.onos.foo.IOLoopTestServer.PORT;
30 import static org.onlab.util.Tools.delay; 32 import static org.onlab.util.Tools.delay;
...@@ -48,15 +50,18 @@ public class IOLoopTestClient { ...@@ -48,15 +50,18 @@ public class IOLoopTestClient {
48 50
49 Counter messages; 51 Counter messages;
50 Counter bytes; 52 Counter bytes;
53 + long latencyTotal = 0;
54 + long latencyCount = 0;
55 +
51 56
52 /** 57 /**
53 * Main entry point to launch the client. 58 * Main entry point to launch the client.
54 * 59 *
55 * @param args command-line arguments 60 * @param args command-line arguments
56 - * @throws java.io.IOException if unable to connect to server 61 + * @throws java.io.IOException if unable to connect to server
57 - * @throws InterruptedException if latch wait gets interrupted 62 + * @throws InterruptedException if latch wait gets interrupted
58 - * @throws java.util.concurrent.ExecutionException if wait gets interrupted 63 + * @throws java.util.concurrent.ExecutionException if wait gets interrupted
59 - * @throws java.util.concurrent.TimeoutException if timeout occurred while waiting for completion 64 + * @throws java.util.concurrent.TimeoutException if timeout occurred while waiting for completion
60 */ 65 */
61 public static void main(String[] args) 66 public static void main(String[] args)
62 throws IOException, InterruptedException, ExecutionException, TimeoutException { 67 throws IOException, InterruptedException, ExecutionException, TimeoutException {
...@@ -158,15 +163,17 @@ public class IOLoopTestClient { ...@@ -158,15 +163,17 @@ public class IOLoopTestClient {
158 * Waits for the client workers to complete. 163 * Waits for the client workers to complete.
159 * 164 *
160 * @param secs timeout in seconds 165 * @param secs timeout in seconds
161 - * @throws java.util.concurrent.ExecutionException if execution failed 166 + * @throws java.util.concurrent.ExecutionException if execution failed
162 - * @throws InterruptedException if interrupt occurred while waiting 167 + * @throws InterruptedException if interrupt occurred while waiting
163 - * @throws java.util.concurrent.TimeoutException if timeout occurred 168 + * @throws java.util.concurrent.TimeoutException if timeout occurred
164 */ 169 */
165 public void await(int secs) throws InterruptedException, 170 public void await(int secs) throws InterruptedException,
166 ExecutionException, TimeoutException { 171 ExecutionException, TimeoutException {
167 for (CustomIOLoop l : iloops) { 172 for (CustomIOLoop l : iloops) {
168 if (l.worker.task != null) { 173 if (l.worker.task != null) {
169 l.worker.task.get(secs, TimeUnit.SECONDS); 174 l.worker.task.get(secs, TimeUnit.SECONDS);
175 + latencyTotal += l.latencyTotal;
176 + latencyCount += l.latencyCount;
170 } 177 }
171 } 178 }
172 messages.freeze(); 179 messages.freeze();
...@@ -178,10 +185,11 @@ public class IOLoopTestClient { ...@@ -178,10 +185,11 @@ public class IOLoopTestClient {
178 */ 185 */
179 public void report() { 186 public void report() {
180 DecimalFormat f = new DecimalFormat("#,##0"); 187 DecimalFormat f = new DecimalFormat("#,##0");
181 - out.println(format("Client: %s messages; %s bytes; %s mps; %s Mbs", 188 + out.println(format("Client: %s messages; %s bytes; %s mps; %s Mbs; %s ms latency",
182 f.format(messages.total()), f.format(bytes.total()), 189 f.format(messages.total()), f.format(bytes.total()),
183 f.format(messages.throughput()), 190 f.format(messages.throughput()),
184 - f.format(bytes.throughput() / (1024 * msgLength)))); 191 + f.format(bytes.throughput() / (1024 * msgLength)),
192 + f.format(latencyTotal / latencyCount)));
185 } 193 }
186 194
187 195
...@@ -189,6 +197,9 @@ public class IOLoopTestClient { ...@@ -189,6 +197,9 @@ public class IOLoopTestClient {
189 private class CustomIOLoop extends IOLoop<TestMessage, TestMessageStream> { 197 private class CustomIOLoop extends IOLoop<TestMessage, TestMessageStream> {
190 198
191 Worker worker = new Worker(); 199 Worker worker = new Worker();
200 + long latencyTotal = 0;
201 + long latencyCount = 0;
202 +
192 203
193 public CustomIOLoop() throws IOException { 204 public CustomIOLoop() throws IOException {
194 super(500); 205 super(500);
...@@ -219,7 +230,12 @@ public class IOLoopTestClient { ...@@ -219,7 +230,12 @@ public class IOLoopTestClient {
219 230
220 @Override 231 @Override
221 protected void processMessages(List<TestMessage> messages, 232 protected void processMessages(List<TestMessage> messages,
222 - MessageStream<TestMessage> b) { 233 + MessageStream<TestMessage> stream) {
234 + for (TestMessage message : messages) {
235 + // TODO: summarize latency data better
236 + latencyTotal += currentTimeMillis() - message.requestorTime();
237 + latencyCount++;
238 + }
223 worker.release(messages.size()); 239 worker.release(messages.size());
224 } 240 }
225 241
...@@ -241,15 +257,15 @@ public class IOLoopTestClient { ...@@ -241,15 +257,15 @@ public class IOLoopTestClient {
241 private static final int BATCH_SIZE = 1000; 257 private static final int BATCH_SIZE = 1000;
242 private static final int PERMITS = 2 * BATCH_SIZE; 258 private static final int PERMITS = 2 * BATCH_SIZE;
243 259
244 - private TestMessageStream b; 260 + private TestMessageStream stream;
245 private FutureTask<Worker> task; 261 private FutureTask<Worker> task;
246 262
247 // Stuff to throttle pump 263 // Stuff to throttle pump
248 private final Semaphore semaphore = new Semaphore(PERMITS); 264 private final Semaphore semaphore = new Semaphore(PERMITS);
249 private int msgWritten; 265 private int msgWritten;
250 266
251 - void pump(TestMessageStream b) { 267 + void pump(TestMessageStream stream) {
252 - this.b = b; 268 + this.stream = stream;
253 task = new FutureTask<>(this, this); 269 task = new FutureTask<>(this, this);
254 wpool.execute(task); 270 wpool.execute(task);
255 } 271 }
...@@ -259,18 +275,15 @@ public class IOLoopTestClient { ...@@ -259,18 +275,15 @@ public class IOLoopTestClient {
259 try { 275 try {
260 log.info("Worker started..."); 276 log.info("Worker started...");
261 277
262 - List<TestMessage> batch = new ArrayList<>();
263 - for (int i = 0; i < BATCH_SIZE; i++) {
264 - batch.add(new TestMessage(msgLength));
265 - }
266 -
267 while (msgWritten < msgCount) { 278 while (msgWritten < msgCount) {
268 - msgWritten += writeBatch(b, batch); 279 + int size = Math.min(BATCH_SIZE, msgCount - msgWritten);
280 + writeBatch(size);
281 + msgWritten += size;
269 } 282 }
270 283
271 // Now try to get all the permits back before sending poison pill 284 // Now try to get all the permits back before sending poison pill
272 semaphore.acquireUninterruptibly(PERMITS); 285 semaphore.acquireUninterruptibly(PERMITS);
273 - b.close(); 286 + stream.close();
274 287
275 log.info("Worker done..."); 288 log.info("Worker done...");
276 289
...@@ -280,18 +293,15 @@ public class IOLoopTestClient { ...@@ -280,18 +293,15 @@ public class IOLoopTestClient {
280 } 293 }
281 294
282 295
283 - private int writeBatch(TestMessageStream b, List<TestMessage> batch) 296 + private void writeBatch(int size) throws IOException {
284 - throws IOException { 297 + // Build a batch of messages
285 - int count = Math.min(BATCH_SIZE, msgCount - msgWritten); 298 + List<TestMessage> batch = Lists.newArrayListWithCapacity(size);
286 - acquire(count); 299 + for (int i = 0; i < size; i++) {
287 - if (count == BATCH_SIZE) { 300 + batch.add(new TestMessage(msgLength, currentTimeMillis(), 0,
288 - b.write(batch); 301 + this.stream.padding(msgLength)));
289 - } else {
290 - for (int i = 0; i < count; i++) {
291 - b.write(batch.get(i));
292 - }
293 } 302 }
294 - return count; 303 + acquire(size);
304 + stream.write(batch);
295 } 305 }
296 306
297 307
......
1 package org.onlab.onos.foo; 1 package org.onlab.onos.foo;
2 2
3 +import com.google.common.collect.Lists;
3 import org.onlab.nio.AcceptorLoop; 4 import org.onlab.nio.AcceptorLoop;
4 import org.onlab.nio.IOLoop; 5 import org.onlab.nio.IOLoop;
5 import org.onlab.nio.MessageStream; 6 import org.onlab.nio.MessageStream;
...@@ -22,6 +23,7 @@ import java.util.concurrent.ExecutorService; ...@@ -22,6 +23,7 @@ import java.util.concurrent.ExecutorService;
22 import java.util.concurrent.Executors; 23 import java.util.concurrent.Executors;
23 24
24 import static java.lang.String.format; 25 import static java.lang.String.format;
26 +import static java.lang.System.currentTimeMillis;
25 import static java.lang.System.out; 27 import static java.lang.System.out;
26 import static org.onlab.util.Tools.delay; 28 import static org.onlab.util.Tools.delay;
27 import static org.onlab.util.Tools.namedThreads; 29 import static org.onlab.util.Tools.namedThreads;
...@@ -202,11 +204,20 @@ public class IOLoopTestServer { ...@@ -202,11 +204,20 @@ public class IOLoopTestServer {
202 protected void processMessages(List<TestMessage> messages, 204 protected void processMessages(List<TestMessage> messages,
203 MessageStream<TestMessage> stream) { 205 MessageStream<TestMessage> stream) {
204 try { 206 try {
205 - stream.write(messages); 207 + stream.write(createResponses(messages));
206 } catch (IOException e) { 208 } catch (IOException e) {
207 log.error("Unable to echo messages", e); 209 log.error("Unable to echo messages", e);
208 } 210 }
209 } 211 }
212 +
213 + private List<TestMessage> createResponses(List<TestMessage> messages) {
214 + List<TestMessage> responses = Lists.newArrayListWithCapacity(messages.size());
215 + for (TestMessage message : messages) {
216 + responses.add(new TestMessage(message.length(), message.requestorTime(),
217 + currentTimeMillis(), message.padding()));
218 + }
219 + return responses;
220 + }
210 } 221 }
211 222
212 // Loop for accepting client connections 223 // Loop for accepting client connections
......
...@@ -2,40 +2,42 @@ package org.onlab.onos.foo; ...@@ -2,40 +2,42 @@ package org.onlab.onos.foo;
2 2
3 import org.onlab.nio.AbstractMessage; 3 import org.onlab.nio.AbstractMessage;
4 4
5 +import static com.google.common.base.Preconditions.checkNotNull;
6 +
5 /** 7 /**
6 - * Fixed-length message. 8 + * Test message for measuring rate and round-trip latency.
7 */ 9 */
8 public class TestMessage extends AbstractMessage { 10 public class TestMessage extends AbstractMessage {
9 11
10 - private final byte[] data; 12 + private final byte[] padding;
13 +
14 + private final long requestorTime;
15 + private final long responderTime;
11 16
12 /** 17 /**
13 - * Creates a new message with the specified length. 18 + * Creates a new message with the specified data.
14 * 19 *
15 - * @param length message length 20 + * @param requestorTime requester time
21 + * @param responderTime responder time
22 + * @param padding message padding
16 */ 23 */
17 - public TestMessage(int length) { 24 + TestMessage(int length, long requestorTime, long responderTime, byte[] padding) {
18 this.length = length; 25 this.length = length;
19 - data = new byte[length]; 26 + this.requestorTime = requestorTime;
27 + this.responderTime = responderTime;
28 + this.padding = checkNotNull(padding, "Padding cannot be null");
20 } 29 }
21 30
22 - /** 31 + public long requestorTime() {
23 - * Creates a new message with the specified data. 32 + return requestorTime;
24 - *
25 - * @param data message data
26 - */
27 - TestMessage(byte[] data) {
28 - this.length = data.length;
29 - this.data = data;
30 } 33 }
31 34
32 - /** 35 + public long responderTime() {
33 - * Gets the backing byte array data. 36 + return responderTime;
34 - * 37 + }
35 - * @return backing byte array 38 +
36 - */ 39 + public byte[] padding() {
37 - public byte[] data() { 40 + return padding;
38 - return data;
39 } 41 }
40 42
41 } 43 }
......
...@@ -6,24 +6,21 @@ import org.onlab.nio.MessageStream; ...@@ -6,24 +6,21 @@ import org.onlab.nio.MessageStream;
6 import java.nio.ByteBuffer; 6 import java.nio.ByteBuffer;
7 import java.nio.channels.ByteChannel; 7 import java.nio.channels.ByteChannel;
8 8
9 +import static com.google.common.base.Preconditions.checkState;
10 +
9 /** 11 /**
10 * Fixed-length message transfer buffer. 12 * Fixed-length message transfer buffer.
11 */ 13 */
12 public class TestMessageStream extends MessageStream<TestMessage> { 14 public class TestMessageStream extends MessageStream<TestMessage> {
13 15
14 private static final String E_WRONG_LEN = "Illegal message length: "; 16 private static final String E_WRONG_LEN = "Illegal message length: ";
17 + private static final long START_TAG = 0xfeedcafedeaddeedL;
18 + private static final long END_TAG = 0xbeadcafedeaddeedL;
19 + private static final int META_LENGTH = 40;
15 20
16 private final int length; 21 private final int length;
17 22
18 - /** 23 + public TestMessageStream(int length, ByteChannel ch, IOLoop<TestMessage, ?> loop) {
19 - * Create a new buffer for transferring messages of the specified length.
20 - *
21 - * @param length message length
22 - * @param ch backing channel
23 - * @param loop driver loop
24 - */
25 - public TestMessageStream(int length, ByteChannel ch,
26 - IOLoop<TestMessage, ?> loop) {
27 super(loop, ch, 64 * 1024, 500); 24 super(loop, ch, 64 * 1024, 500);
28 this.length = length; 25 this.length = length;
29 } 26 }
...@@ -33,26 +30,37 @@ public class TestMessageStream extends MessageStream<TestMessage> { ...@@ -33,26 +30,37 @@ public class TestMessageStream extends MessageStream<TestMessage> {
33 if (rb.remaining() < length) { 30 if (rb.remaining() < length) {
34 return null; 31 return null;
35 } 32 }
36 - TestMessage message = new TestMessage(length); 33 +
37 - rb.get(message.data()); 34 + long startTag = rb.getLong();
38 - return message; 35 + checkState(startTag == START_TAG, "Incorrect message start");
36 +
37 + long size = rb.getLong();
38 + long requestorTime = rb.getLong();
39 + long responderTime = rb.getLong();
40 + byte[] padding = padding(length);
41 + rb.get(padding);
42 +
43 + long endTag = rb.getLong();
44 + checkState(endTag == END_TAG, "Incorrect message end");
45 +
46 + return new TestMessage((int) size, requestorTime, responderTime, padding);
39 } 47 }
40 48
41 - /**
42 - * {@inheritDoc}
43 - * <p/>
44 - * This implementation enforces the message length against the buffer
45 - * supported length.
46 - *
47 - * @throws IllegalArgumentException if message size does not match the
48 - * supported buffer size
49 - */
50 @Override 49 @Override
51 protected void write(TestMessage message, ByteBuffer wb) { 50 protected void write(TestMessage message, ByteBuffer wb) {
52 if (message.length() != length) { 51 if (message.length() != length) {
53 throw new IllegalArgumentException(E_WRONG_LEN + message.length()); 52 throw new IllegalArgumentException(E_WRONG_LEN + message.length());
54 } 53 }
55 - wb.put(message.data()); 54 +
55 + wb.putLong(START_TAG);
56 + wb.putLong(message.length());
57 + wb.putLong(message.requestorTime());
58 + wb.putLong(message.responderTime());
59 + wb.put(message.padding(), 0, length - META_LENGTH);
60 + wb.putLong(END_TAG);
56 } 61 }
57 62
63 + public byte[] padding(int msgLength) {
64 + return new byte[msgLength - META_LENGTH];
65 + }
58 } 66 }
......