tom

Working on IO loop tests commands.

...@@ -298,7 +298,7 @@ public class IOLoopTestClient { ...@@ -298,7 +298,7 @@ public class IOLoopTestClient {
298 List<TestMessage> batch = Lists.newArrayListWithCapacity(size); 298 List<TestMessage> batch = Lists.newArrayListWithCapacity(size);
299 for (int i = 0; i < size; i++) { 299 for (int i = 0; i < size; i++) {
300 batch.add(new TestMessage(msgLength, currentTimeMillis(), 0, 300 batch.add(new TestMessage(msgLength, currentTimeMillis(), 0,
301 - this.stream.padding(msgLength))); 301 + stream.padding()));
302 } 302 }
303 acquire(size); 303 acquire(size);
304 stream.write(batch); 304 stream.write(batch);
......
...@@ -85,10 +85,12 @@ public class IOLoopTestServer { ...@@ -85,10 +85,12 @@ public class IOLoopTestServer {
85 IOLoopTestServer server = new IOLoopTestServer(ip, wc, ml, PORT); 85 IOLoopTestServer server = new IOLoopTestServer(ip, wc, ml, PORT);
86 server.start(); 86 server.start();
87 87
88 - // Start pruning clients. 88 + // Start pruning clients and keep going until their number goes to 0.
89 - while (true) { 89 + int remaining = -1;
90 + while (remaining == -1 || remaining > 0) {
90 delay(PRUNE_FREQUENCY); 91 delay(PRUNE_FREQUENCY);
91 - server.prune(); 92 + int r = server.prune();
93 + remaining = remaining == -1 && r == 0 ? remaining : r;
92 } 94 }
93 } 95 }
94 96
...@@ -161,11 +163,15 @@ public class IOLoopTestServer { ...@@ -161,11 +163,15 @@ public class IOLoopTestServer {
161 163
162 /** 164 /**
163 * Prunes the IO loops of stale message buffers. 165 * Prunes the IO loops of stale message buffers.
166 + *
167 + * @return number of remaining IO loops among all workers.
164 */ 168 */
165 - public void prune() { 169 + public int prune() {
170 + int count = 0;
166 for (CustomIOLoop l : iloops) { 171 for (CustomIOLoop l : iloops) {
167 - l.pruneStaleStreams(); 172 + count += l.pruneStaleStreams();
168 } 173 }
174 + return count;
169 } 175 }
170 176
171 // Get the next worker to which a client should be assigned 177 // Get the next worker to which a client should be assigned
......
1 package org.onlab.onos.foo; 1 package org.onlab.onos.foo;
2 2
3 +import org.apache.karaf.shell.commands.Argument;
3 import org.apache.karaf.shell.commands.Command; 4 import org.apache.karaf.shell.commands.Command;
4 import org.onlab.onos.cli.AbstractShellCommand; 5 import org.onlab.onos.cli.AbstractShellCommand;
5 6
...@@ -12,12 +13,32 @@ import static org.onlab.onos.foo.IOLoopTestClient.startStandalone; ...@@ -12,12 +13,32 @@ import static org.onlab.onos.foo.IOLoopTestClient.startStandalone;
12 description = "Starts the test IO loop client") 13 description = "Starts the test IO loop client")
13 public class TestIOClientCommand extends AbstractShellCommand { 14 public class TestIOClientCommand extends AbstractShellCommand {
14 15
16 + @Argument(index = 0, name = "serverIp", description = "Server IP address",
17 + required = false, multiValued = false)
18 + String serverIp = "127.0.0.1";
19 +
20 + @Argument(index = 1, name = "workers", description = "IO workers",
21 + required = false, multiValued = false)
22 + String workers = "6";
23 +
24 + @Argument(index = 2, name = "messageCount", description = "Message count",
25 + required = false, multiValued = false)
26 + String messageCount = "10000000";
27 +
28 + @Argument(index = 3, name = "messageLength", description = "Message length (bytes)",
29 + required = false, multiValued = false)
30 + String messageLength = "128";
31 +
32 + @Argument(index = 4, name = "timeoutSecs", description = "Test timeout (seconds)",
33 + required = false, multiValued = false)
34 + String timeoutSecs = "30";
35 +
15 @Override 36 @Override
16 protected void execute() { 37 protected void execute() {
17 try { 38 try {
18 - startStandalone(new String[]{}); 39 + startStandalone(new String[]{serverIp, workers, messageCount, messageLength, timeoutSecs});
19 } catch (Exception e) { 40 } catch (Exception e) {
20 - error("Unable to start server %s", e); 41 + error("Unable to start client %s", e);
21 } 42 }
22 } 43 }
23 44
......
1 package org.onlab.onos.foo; 1 package org.onlab.onos.foo;
2 2
3 +import org.apache.karaf.shell.commands.Argument;
3 import org.apache.karaf.shell.commands.Command; 4 import org.apache.karaf.shell.commands.Command;
4 import org.onlab.onos.cli.AbstractShellCommand; 5 import org.onlab.onos.cli.AbstractShellCommand;
5 6
6 import static org.onlab.onos.foo.IOLoopTestServer.startStandalone; 7 import static org.onlab.onos.foo.IOLoopTestServer.startStandalone;
7 8
8 -
9 /** 9 /**
10 * Starts the test IO loop server. 10 * Starts the test IO loop server.
11 */ 11 */
...@@ -13,10 +13,22 @@ import static org.onlab.onos.foo.IOLoopTestServer.startStandalone; ...@@ -13,10 +13,22 @@ import static org.onlab.onos.foo.IOLoopTestServer.startStandalone;
13 description = "Starts the test IO loop server") 13 description = "Starts the test IO loop server")
14 public class TestIOServerCommand extends AbstractShellCommand { 14 public class TestIOServerCommand extends AbstractShellCommand {
15 15
16 + @Argument(index = 0, name = "serverIp", description = "Server IP address",
17 + required = false, multiValued = false)
18 + String serverIp = "127.0.0.1";
19 +
20 + @Argument(index = 1, name = "workers", description = "IO workers",
21 + required = false, multiValued = false)
22 + String workers = "6";
23 +
24 + @Argument(index = 2, name = "messageLength", description = "Message length (bytes)",
25 + required = false, multiValued = false)
26 + String messageLength = "128";
27 +
16 @Override 28 @Override
17 protected void execute() { 29 protected void execute() {
18 try { 30 try {
19 - startStandalone(new String[]{}); 31 + startStandalone(new String[]{serverIp, workers, messageLength});
20 } catch (Exception e) { 32 } catch (Exception e) {
21 error("Unable to start server %s", e); 33 error("Unable to start server %s", e);
22 } 34 }
......
...@@ -6,6 +6,7 @@ import org.onlab.nio.MessageStream; ...@@ -6,6 +6,7 @@ import org.onlab.nio.MessageStream;
6 import java.nio.ByteBuffer; 6 import java.nio.ByteBuffer;
7 import java.nio.channels.ByteChannel; 7 import java.nio.channels.ByteChannel;
8 8
9 +import static com.google.common.base.Preconditions.checkArgument;
9 import static com.google.common.base.Preconditions.checkState; 10 import static com.google.common.base.Preconditions.checkState;
10 11
11 /** 12 /**
...@@ -19,12 +20,18 @@ public class TestMessageStream extends MessageStream<TestMessage> { ...@@ -19,12 +20,18 @@ public class TestMessageStream extends MessageStream<TestMessage> {
19 private static final int META_LENGTH = 40; 20 private static final int META_LENGTH = 40;
20 21
21 private final int length; 22 private final int length;
23 + private boolean isStrict = true;
22 24
23 public TestMessageStream(int length, ByteChannel ch, IOLoop<TestMessage, ?> loop) { 25 public TestMessageStream(int length, ByteChannel ch, IOLoop<TestMessage, ?> loop) {
24 super(loop, ch, 64 * 1024, 500); 26 super(loop, ch, 64 * 1024, 500);
27 + checkArgument(length >= META_LENGTH, "Length must be greater than header length of 40");
25 this.length = length; 28 this.length = length;
26 } 29 }
27 30
31 + void setNonStrict() {
32 + isStrict = false;
33 + }
34 +
28 @Override 35 @Override
29 protected TestMessage read(ByteBuffer rb) { 36 protected TestMessage read(ByteBuffer rb) {
30 if (rb.remaining() < length) { 37 if (rb.remaining() < length) {
...@@ -32,16 +39,20 @@ public class TestMessageStream extends MessageStream<TestMessage> { ...@@ -32,16 +39,20 @@ public class TestMessageStream extends MessageStream<TestMessage> {
32 } 39 }
33 40
34 long startTag = rb.getLong(); 41 long startTag = rb.getLong();
35 - checkState(startTag == START_TAG, "Incorrect message start"); 42 + if (isStrict) {
43 + checkState(startTag == START_TAG, "Incorrect message start");
44 + }
36 45
37 long size = rb.getLong(); 46 long size = rb.getLong();
38 long requestorTime = rb.getLong(); 47 long requestorTime = rb.getLong();
39 long responderTime = rb.getLong(); 48 long responderTime = rb.getLong();
40 - byte[] padding = padding(length); 49 + byte[] padding = padding();
41 rb.get(padding); 50 rb.get(padding);
42 51
43 long endTag = rb.getLong(); 52 long endTag = rb.getLong();
44 - checkState(endTag == END_TAG, "Incorrect message end"); 53 + if (isStrict) {
54 + checkState(endTag == END_TAG, "Incorrect message end");
55 + }
45 56
46 return new TestMessage((int) size, requestorTime, responderTime, padding); 57 return new TestMessage((int) size, requestorTime, responderTime, padding);
47 } 58 }
...@@ -60,7 +71,7 @@ public class TestMessageStream extends MessageStream<TestMessage> { ...@@ -60,7 +71,7 @@ public class TestMessageStream extends MessageStream<TestMessage> {
60 wb.putLong(END_TAG); 71 wb.putLong(END_TAG);
61 } 72 }
62 73
63 - public byte[] padding(int msgLength) { 74 + public byte[] padding() {
64 - return new byte[msgLength - META_LENGTH]; 75 + return new byte[length - META_LENGTH];
65 } 76 }
66 } 77 }
......
...@@ -259,13 +259,16 @@ public abstract class IOLoop<M extends Message, S extends MessageStream<M>> ...@@ -259,13 +259,16 @@ public abstract class IOLoop<M extends Message, S extends MessageStream<M>>
259 259
260 /** 260 /**
261 * Prunes the registered streams by discarding any stale ones. 261 * Prunes the registered streams by discarding any stale ones.
262 + *
263 + * @return number of remaining streams
262 */ 264 */
263 - public synchronized void pruneStaleStreams() { 265 + public synchronized int pruneStaleStreams() {
264 for (MessageStream<M> stream : streams) { 266 for (MessageStream<M> stream : streams) {
265 if (stream.isStale()) { 267 if (stream.isStale()) {
266 stream.close(); 268 stream.close();
267 } 269 }
268 } 270 }
271 + return streams.size();
269 } 272 }
270 273
271 } 274 }
......