alshabib

Merge branch 'master' of ssh://gerrit.onlab.us:29418/onos-next

...@@ -28,6 +28,10 @@ ...@@ -28,6 +28,10 @@
28 <version>${project.version}</version> 28 <version>${project.version}</version>
29 </dependency> 29 </dependency>
30 <dependency> 30 <dependency>
31 + <groupId>org.livetribe.slp</groupId>
32 + <artifactId>livetribe-slp</artifactId>
33 + </dependency>
34 + <dependency>
31 <groupId>org.apache.karaf.shell</groupId> 35 <groupId>org.apache.karaf.shell</groupId>
32 <artifactId>org.apache.karaf.shell.console</artifactId> 36 <artifactId>org.apache.karaf.shell.console</artifactId>
33 </dependency> 37 </dependency>
......
...@@ -26,7 +26,7 @@ import java.util.concurrent.TimeUnit; ...@@ -26,7 +26,7 @@ import java.util.concurrent.TimeUnit;
26 import java.util.concurrent.TimeoutException; 26 import java.util.concurrent.TimeoutException;
27 27
28 import static java.lang.String.format; 28 import static java.lang.String.format;
29 -import static java.lang.System.currentTimeMillis; 29 +import static java.lang.System.nanoTime;
30 import static java.lang.System.out; 30 import static java.lang.System.out;
31 import static org.onlab.onos.foo.IOLoopTestServer.PORT; 31 import static org.onlab.onos.foo.IOLoopTestServer.PORT;
32 import static org.onlab.util.Tools.delay; 32 import static org.onlab.util.Tools.delay;
...@@ -81,7 +81,7 @@ public class IOLoopTestClient { ...@@ -81,7 +81,7 @@ public class IOLoopTestClient {
81 int wc = args.length > 1 ? Integer.parseInt(args[1]) : 6; 81 int wc = args.length > 1 ? Integer.parseInt(args[1]) : 6;
82 int mc = args.length > 2 ? Integer.parseInt(args[2]) : 50 * 1000000; 82 int mc = args.length > 2 ? Integer.parseInt(args[2]) : 50 * 1000000;
83 int ml = args.length > 3 ? Integer.parseInt(args[3]) : 128; 83 int ml = args.length > 3 ? Integer.parseInt(args[3]) : 128;
84 - int to = args.length > 4 ? Integer.parseInt(args[4]) : 30; 84 + int to = args.length > 4 ? Integer.parseInt(args[4]) : 60;
85 85
86 log.info("Setting up client with {} workers sending {} {}-byte messages to {} server... ", 86 log.info("Setting up client with {} workers sending {} {}-byte messages to {} server... ",
87 wc, mc, ml, ip); 87 wc, mc, ml, ip);
...@@ -185,7 +185,7 @@ public class IOLoopTestClient { ...@@ -185,7 +185,7 @@ public class IOLoopTestClient {
185 */ 185 */
186 public void report() { 186 public void report() {
187 DecimalFormat f = new DecimalFormat("#,##0"); 187 DecimalFormat f = new DecimalFormat("#,##0");
188 - out.println(format("Client: %s messages; %s bytes; %s mps; %s Mbs; %s ms latency", 188 + out.println(format("Client: %s messages; %s bytes; %s mps; %s MBs; %s ns latency",
189 f.format(messages.total()), f.format(bytes.total()), 189 f.format(messages.total()), f.format(bytes.total()),
190 f.format(messages.throughput()), 190 f.format(messages.throughput()),
191 f.format(bytes.throughput() / (1024 * msgLength)), 191 f.format(bytes.throughput() / (1024 * msgLength)),
...@@ -217,13 +217,6 @@ public class IOLoopTestClient { ...@@ -217,13 +217,6 @@ public class IOLoopTestClient {
217 217
218 messages.add(stream.messagesIn().total()); 218 messages.add(stream.messagesIn().total());
219 bytes.add(stream.bytesIn().total()); 219 bytes.add(stream.bytesIn().total());
220 -
221 -// out.println(format("Disconnected client; inbound %s mps, %s Mbps; outbound %s mps, %s Mbps",
222 -// FORMAT.format(stream.messagesIn().throughput()),
223 -// FORMAT.format(stream.bytesIn().throughput() / (1024 * msgLength)),
224 -// FORMAT.format(stream.messagesOut().throughput()),
225 -// FORMAT.format(stream.bytesOut().throughput() / (1024 * msgLength))));
226 -
227 stream.messagesOut().reset(); 220 stream.messagesOut().reset();
228 stream.bytesOut().reset(); 221 stream.bytesOut().reset();
229 } 222 }
...@@ -233,7 +226,7 @@ public class IOLoopTestClient { ...@@ -233,7 +226,7 @@ public class IOLoopTestClient {
233 MessageStream<TestMessage> stream) { 226 MessageStream<TestMessage> stream) {
234 for (TestMessage message : messages) { 227 for (TestMessage message : messages) {
235 // TODO: summarize latency data better 228 // TODO: summarize latency data better
236 - latencyTotal += currentTimeMillis() - message.requestorTime(); 229 + latencyTotal += nanoTime() - message.requestorTime();
237 latencyCount++; 230 latencyCount++;
238 } 231 }
239 worker.release(messages.size()); 232 worker.release(messages.size());
...@@ -254,7 +247,7 @@ public class IOLoopTestClient { ...@@ -254,7 +247,7 @@ public class IOLoopTestClient {
254 */ 247 */
255 private class Worker implements Runnable { 248 private class Worker implements Runnable {
256 249
257 - private static final int BATCH_SIZE = 1000; 250 + private static final int BATCH_SIZE = 10;
258 private static final int PERMITS = 2 * BATCH_SIZE; 251 private static final int PERMITS = 2 * BATCH_SIZE;
259 252
260 private TestMessageStream stream; 253 private TestMessageStream stream;
...@@ -297,8 +290,8 @@ public class IOLoopTestClient { ...@@ -297,8 +290,8 @@ public class IOLoopTestClient {
297 // Build a batch of messages 290 // Build a batch of messages
298 List<TestMessage> batch = Lists.newArrayListWithCapacity(size); 291 List<TestMessage> batch = Lists.newArrayListWithCapacity(size);
299 for (int i = 0; i < size; i++) { 292 for (int i = 0; i < size; i++) {
300 - batch.add(new TestMessage(msgLength, currentTimeMillis(), 0, 293 + batch.add(new TestMessage(msgLength, nanoTime(), 0,
301 - this.stream.padding(msgLength))); 294 + stream.padding()));
302 } 295 }
303 acquire(size); 296 acquire(size);
304 stream.write(batch); 297 stream.write(batch);
......
...@@ -23,7 +23,7 @@ import java.util.concurrent.ExecutorService; ...@@ -23,7 +23,7 @@ import java.util.concurrent.ExecutorService;
23 import java.util.concurrent.Executors; 23 import java.util.concurrent.Executors;
24 24
25 import static java.lang.String.format; 25 import static java.lang.String.format;
26 -import static java.lang.System.currentTimeMillis; 26 +import static java.lang.System.nanoTime;
27 import static java.lang.System.out; 27 import static java.lang.System.out;
28 import static org.onlab.util.Tools.delay; 28 import static org.onlab.util.Tools.delay;
29 import static org.onlab.util.Tools.namedThreads; 29 import static org.onlab.util.Tools.namedThreads;
...@@ -85,11 +85,14 @@ public class IOLoopTestServer { ...@@ -85,11 +85,14 @@ public class IOLoopTestServer {
85 IOLoopTestServer server = new IOLoopTestServer(ip, wc, ml, PORT); 85 IOLoopTestServer server = new IOLoopTestServer(ip, wc, ml, PORT);
86 server.start(); 86 server.start();
87 87
88 - // Start pruning clients. 88 + // Start pruning clients and keep going until their number goes to 0.
89 - while (true) { 89 + int remaining = -1;
90 + while (remaining == -1 || remaining > 0) {
90 delay(PRUNE_FREQUENCY); 91 delay(PRUNE_FREQUENCY);
91 - server.prune(); 92 + int r = server.prune();
93 + remaining = remaining == -1 && r == 0 ? remaining : r;
92 } 94 }
95 + server.stop();
93 } 96 }
94 97
95 /** 98 /**
...@@ -153,7 +156,7 @@ public class IOLoopTestServer { ...@@ -153,7 +156,7 @@ public class IOLoopTestServer {
153 */ 156 */
154 public void report() { 157 public void report() {
155 DecimalFormat f = new DecimalFormat("#,##0"); 158 DecimalFormat f = new DecimalFormat("#,##0");
156 - out.println(format("Server: %s messages; %s bytes; %s mps; %s Mbs", 159 + out.println(format("Server: %s messages; %s bytes; %s mps; %s MBs",
157 f.format(messages.total()), f.format(bytes.total()), 160 f.format(messages.total()), f.format(bytes.total()),
158 f.format(messages.throughput()), 161 f.format(messages.throughput()),
159 f.format(bytes.throughput() / (1024 * msgLength)))); 162 f.format(bytes.throughput() / (1024 * msgLength))));
...@@ -161,11 +164,15 @@ public class IOLoopTestServer { ...@@ -161,11 +164,15 @@ public class IOLoopTestServer {
161 164
162 /** 165 /**
163 * Prunes the IO loops of stale message buffers. 166 * Prunes the IO loops of stale message buffers.
167 + *
168 + * @return number of remaining IO loops among all workers.
164 */ 169 */
165 - public void prune() { 170 + public int prune() {
171 + int count = 0;
166 for (CustomIOLoop l : iloops) { 172 for (CustomIOLoop l : iloops) {
167 - l.pruneStaleStreams(); 173 + count += l.pruneStaleStreams();
168 } 174 }
175 + return count;
169 } 176 }
170 177
171 // Get the next worker to which a client should be assigned 178 // Get the next worker to which a client should be assigned
...@@ -189,15 +196,8 @@ public class IOLoopTestServer { ...@@ -189,15 +196,8 @@ public class IOLoopTestServer {
189 @Override 196 @Override
190 protected void removeStream(MessageStream<TestMessage> stream) { 197 protected void removeStream(MessageStream<TestMessage> stream) {
191 super.removeStream(stream); 198 super.removeStream(stream);
192 -
193 messages.add(stream.messagesIn().total()); 199 messages.add(stream.messagesIn().total());
194 bytes.add(stream.bytesIn().total()); 200 bytes.add(stream.bytesIn().total());
195 -
196 -// out.println(format("Disconnected server; inbound %s mps, %s Mbps; outbound %s mps, %s Mbps",
197 -// FORMAT.format(stream.messagesIn().throughput()),
198 -// FORMAT.format(stream.bytesIn().throughput() / (1024 * msgLength)),
199 -// FORMAT.format(stream.messagesOut().throughput()),
200 -// FORMAT.format(stream.bytesOut().throughput() / (1024 * msgLength))));
201 } 201 }
202 202
203 @Override 203 @Override
...@@ -214,7 +214,7 @@ public class IOLoopTestServer { ...@@ -214,7 +214,7 @@ public class IOLoopTestServer {
214 List<TestMessage> responses = Lists.newArrayListWithCapacity(messages.size()); 214 List<TestMessage> responses = Lists.newArrayListWithCapacity(messages.size());
215 for (TestMessage message : messages) { 215 for (TestMessage message : messages) {
216 responses.add(new TestMessage(message.length(), message.requestorTime(), 216 responses.add(new TestMessage(message.length(), message.requestorTime(),
217 - currentTimeMillis(), message.padding())); 217 + nanoTime(), message.padding()));
218 } 218 }
219 return responses; 219 return responses;
220 } 220 }
......
1 package org.onlab.onos.foo; 1 package org.onlab.onos.foo;
2 2
3 +import org.apache.karaf.shell.commands.Argument;
3 import org.apache.karaf.shell.commands.Command; 4 import org.apache.karaf.shell.commands.Command;
4 import org.onlab.onos.cli.AbstractShellCommand; 5 import org.onlab.onos.cli.AbstractShellCommand;
5 6
...@@ -12,12 +13,32 @@ import static org.onlab.onos.foo.IOLoopTestClient.startStandalone; ...@@ -12,12 +13,32 @@ import static org.onlab.onos.foo.IOLoopTestClient.startStandalone;
12 description = "Starts the test IO loop client") 13 description = "Starts the test IO loop client")
13 public class TestIOClientCommand extends AbstractShellCommand { 14 public class TestIOClientCommand extends AbstractShellCommand {
14 15
16 + @Argument(index = 0, name = "serverIp", description = "Server IP address",
17 + required = false, multiValued = false)
18 + String serverIp = "127.0.0.1";
19 +
20 + @Argument(index = 1, name = "workers", description = "IO workers",
21 + required = false, multiValued = false)
22 + String workers = "6";
23 +
24 + @Argument(index = 2, name = "messageCount", description = "Message count",
25 + required = false, multiValued = false)
26 + String messageCount = "1000000";
27 +
28 + @Argument(index = 3, name = "messageLength", description = "Message length (bytes)",
29 + required = false, multiValued = false)
30 + String messageLength = "128";
31 +
32 + @Argument(index = 4, name = "timeoutSecs", description = "Test timeout (seconds)",
33 + required = false, multiValued = false)
34 + String timeoutSecs = "60";
35 +
15 @Override 36 @Override
16 protected void execute() { 37 protected void execute() {
17 try { 38 try {
18 - startStandalone(new String[]{}); 39 + startStandalone(new String[]{serverIp, workers, messageCount, messageLength, timeoutSecs});
19 } catch (Exception e) { 40 } catch (Exception e) {
20 - error("Unable to start server %s", e); 41 + error("Unable to start client %s", e);
21 } 42 }
22 } 43 }
23 44
......
1 package org.onlab.onos.foo; 1 package org.onlab.onos.foo;
2 2
3 +import org.apache.karaf.shell.commands.Argument;
3 import org.apache.karaf.shell.commands.Command; 4 import org.apache.karaf.shell.commands.Command;
4 import org.onlab.onos.cli.AbstractShellCommand; 5 import org.onlab.onos.cli.AbstractShellCommand;
5 6
6 import static org.onlab.onos.foo.IOLoopTestServer.startStandalone; 7 import static org.onlab.onos.foo.IOLoopTestServer.startStandalone;
7 8
8 -
9 /** 9 /**
10 * Starts the test IO loop server. 10 * Starts the test IO loop server.
11 */ 11 */
...@@ -13,10 +13,22 @@ import static org.onlab.onos.foo.IOLoopTestServer.startStandalone; ...@@ -13,10 +13,22 @@ import static org.onlab.onos.foo.IOLoopTestServer.startStandalone;
13 description = "Starts the test IO loop server") 13 description = "Starts the test IO loop server")
14 public class TestIOServerCommand extends AbstractShellCommand { 14 public class TestIOServerCommand extends AbstractShellCommand {
15 15
16 + @Argument(index = 0, name = "serverIp", description = "Server IP address",
17 + required = false, multiValued = false)
18 + String serverIp = "127.0.0.1";
19 +
20 + @Argument(index = 1, name = "workers", description = "IO workers",
21 + required = false, multiValued = false)
22 + String workers = "6";
23 +
24 + @Argument(index = 2, name = "messageLength", description = "Message length (bytes)",
25 + required = false, multiValued = false)
26 + String messageLength = "128";
27 +
16 @Override 28 @Override
17 protected void execute() { 29 protected void execute() {
18 try { 30 try {
19 - startStandalone(new String[]{}); 31 + startStandalone(new String[]{serverIp, workers, messageLength});
20 } catch (Exception e) { 32 } catch (Exception e) {
21 error("Unable to start server %s", e); 33 error("Unable to start server %s", e);
22 } 34 }
......
...@@ -6,6 +6,7 @@ import org.onlab.nio.MessageStream; ...@@ -6,6 +6,7 @@ import org.onlab.nio.MessageStream;
6 import java.nio.ByteBuffer; 6 import java.nio.ByteBuffer;
7 import java.nio.channels.ByteChannel; 7 import java.nio.channels.ByteChannel;
8 8
9 +import static com.google.common.base.Preconditions.checkArgument;
9 import static com.google.common.base.Preconditions.checkState; 10 import static com.google.common.base.Preconditions.checkState;
10 11
11 /** 12 /**
...@@ -19,12 +20,18 @@ public class TestMessageStream extends MessageStream<TestMessage> { ...@@ -19,12 +20,18 @@ public class TestMessageStream extends MessageStream<TestMessage> {
19 private static final int META_LENGTH = 40; 20 private static final int META_LENGTH = 40;
20 21
21 private final int length; 22 private final int length;
23 + private boolean isStrict = true;
22 24
23 public TestMessageStream(int length, ByteChannel ch, IOLoop<TestMessage, ?> loop) { 25 public TestMessageStream(int length, ByteChannel ch, IOLoop<TestMessage, ?> loop) {
24 super(loop, ch, 64 * 1024, 500); 26 super(loop, ch, 64 * 1024, 500);
27 + checkArgument(length >= META_LENGTH, "Length must be greater than header length of 40");
25 this.length = length; 28 this.length = length;
26 } 29 }
27 30
31 + void setNonStrict() {
32 + isStrict = false;
33 + }
34 +
28 @Override 35 @Override
29 protected TestMessage read(ByteBuffer rb) { 36 protected TestMessage read(ByteBuffer rb) {
30 if (rb.remaining() < length) { 37 if (rb.remaining() < length) {
...@@ -32,16 +39,20 @@ public class TestMessageStream extends MessageStream<TestMessage> { ...@@ -32,16 +39,20 @@ public class TestMessageStream extends MessageStream<TestMessage> {
32 } 39 }
33 40
34 long startTag = rb.getLong(); 41 long startTag = rb.getLong();
42 + if (isStrict) {
35 checkState(startTag == START_TAG, "Incorrect message start"); 43 checkState(startTag == START_TAG, "Incorrect message start");
44 + }
36 45
37 long size = rb.getLong(); 46 long size = rb.getLong();
38 long requestorTime = rb.getLong(); 47 long requestorTime = rb.getLong();
39 long responderTime = rb.getLong(); 48 long responderTime = rb.getLong();
40 - byte[] padding = padding(length); 49 + byte[] padding = padding();
41 rb.get(padding); 50 rb.get(padding);
42 51
43 long endTag = rb.getLong(); 52 long endTag = rb.getLong();
53 + if (isStrict) {
44 checkState(endTag == END_TAG, "Incorrect message end"); 54 checkState(endTag == END_TAG, "Incorrect message end");
55 + }
45 56
46 return new TestMessage((int) size, requestorTime, responderTime, padding); 57 return new TestMessage((int) size, requestorTime, responderTime, padding);
47 } 58 }
...@@ -60,7 +71,7 @@ public class TestMessageStream extends MessageStream<TestMessage> { ...@@ -60,7 +71,7 @@ public class TestMessageStream extends MessageStream<TestMessage> {
60 wb.putLong(END_TAG); 71 wb.putLong(END_TAG);
61 } 72 }
62 73
63 - public byte[] padding(int msgLength) { 74 + public byte[] padding() {
64 - return new byte[msgLength - META_LENGTH]; 75 + return new byte[length - META_LENGTH];
65 } 76 }
66 } 77 }
......
...@@ -5,7 +5,7 @@ import java.util.Objects; ...@@ -5,7 +5,7 @@ import java.util.Objects;
5 public final class MastershipTerm { 5 public final class MastershipTerm {
6 6
7 private final NodeId master; 7 private final NodeId master;
8 - private int termNumber; 8 + private final int termNumber;
9 9
10 private MastershipTerm(NodeId master, int term) { 10 private MastershipTerm(NodeId master, int term) {
11 this.master = master; 11 this.master = master;
......
...@@ -134,6 +134,12 @@ ...@@ -134,6 +134,12 @@
134 </dependency> 134 </dependency>
135 135
136 <dependency> 136 <dependency>
137 + <groupId>org.livetribe.slp</groupId>
138 + <artifactId>livetribe-slp</artifactId>
139 + <version>2.2.1</version>
140 + </dependency>
141 +
142 + <dependency>
137 <groupId>com.hazelcast</groupId> 143 <groupId>com.hazelcast</groupId>
138 <artifactId>hazelcast</artifactId> 144 <artifactId>hazelcast</artifactId>
139 <version>3.3</version> 145 <version>3.3</version>
......
...@@ -259,13 +259,16 @@ public abstract class IOLoop<M extends Message, S extends MessageStream<M>> ...@@ -259,13 +259,16 @@ public abstract class IOLoop<M extends Message, S extends MessageStream<M>>
259 259
260 /** 260 /**
261 * Prunes the registered streams by discarding any stale ones. 261 * Prunes the registered streams by discarding any stale ones.
262 + *
263 + * @return number of remaining streams
262 */ 264 */
263 - public synchronized void pruneStaleStreams() { 265 + public synchronized int pruneStaleStreams() {
264 for (MessageStream<M> stream : streams) { 266 for (MessageStream<M> stream : streams) {
265 if (stream.isStale()) { 267 if (stream.isStale()) {
266 stream.close(); 268 stream.close();
267 } 269 }
268 } 270 }
271 + return streams.size();
269 } 272 }
270 273
271 } 274 }
......
...@@ -24,7 +24,7 @@ import java.util.concurrent.TimeUnit; ...@@ -24,7 +24,7 @@ import java.util.concurrent.TimeUnit;
24 import java.util.concurrent.TimeoutException; 24 import java.util.concurrent.TimeoutException;
25 25
26 import static java.lang.String.format; 26 import static java.lang.String.format;
27 -import static java.lang.System.currentTimeMillis; 27 +import static java.lang.System.nanoTime;
28 import static java.lang.System.out; 28 import static java.lang.System.out;
29 import static org.onlab.nio.IOLoopTestServer.PORT; 29 import static org.onlab.nio.IOLoopTestServer.PORT;
30 import static org.onlab.util.Tools.delay; 30 import static org.onlab.util.Tools.delay;
...@@ -79,7 +79,7 @@ public class IOLoopTestClient { ...@@ -79,7 +79,7 @@ public class IOLoopTestClient {
79 int wc = args.length > 1 ? Integer.parseInt(args[1]) : 6; 79 int wc = args.length > 1 ? Integer.parseInt(args[1]) : 6;
80 int mc = args.length > 2 ? Integer.parseInt(args[2]) : 50 * 1000000; 80 int mc = args.length > 2 ? Integer.parseInt(args[2]) : 50 * 1000000;
81 int ml = args.length > 3 ? Integer.parseInt(args[3]) : 128; 81 int ml = args.length > 3 ? Integer.parseInt(args[3]) : 128;
82 - int to = args.length > 4 ? Integer.parseInt(args[4]) : 30; 82 + int to = args.length > 4 ? Integer.parseInt(args[4]) : 60;
83 83
84 log.info("Setting up client with {} workers sending {} {}-byte messages to {} server... ", 84 log.info("Setting up client with {} workers sending {} {}-byte messages to {} server... ",
85 wc, mc, ml, ip); 85 wc, mc, ml, ip);
...@@ -183,7 +183,7 @@ public class IOLoopTestClient { ...@@ -183,7 +183,7 @@ public class IOLoopTestClient {
183 */ 183 */
184 public void report() { 184 public void report() {
185 DecimalFormat f = new DecimalFormat("#,##0"); 185 DecimalFormat f = new DecimalFormat("#,##0");
186 - out.println(format("Client: %s messages; %s bytes; %s mps; %s Mbs; %s ms latency", 186 + out.println(format("Client: %s messages; %s bytes; %s mps; %s MBs; %s ns latency",
187 f.format(messages.total()), f.format(bytes.total()), 187 f.format(messages.total()), f.format(bytes.total()),
188 f.format(messages.throughput()), 188 f.format(messages.throughput()),
189 f.format(bytes.throughput() / (1024 * msgLength)), 189 f.format(bytes.throughput() / (1024 * msgLength)),
...@@ -212,16 +212,8 @@ public class IOLoopTestClient { ...@@ -212,16 +212,8 @@ public class IOLoopTestClient {
212 @Override 212 @Override
213 protected synchronized void removeStream(MessageStream<TestMessage> stream) { 213 protected synchronized void removeStream(MessageStream<TestMessage> stream) {
214 super.removeStream(stream); 214 super.removeStream(stream);
215 -
216 messages.add(stream.messagesIn().total()); 215 messages.add(stream.messagesIn().total());
217 bytes.add(stream.bytesIn().total()); 216 bytes.add(stream.bytesIn().total());
218 -
219 -// out.println(format("Disconnected client; inbound %s mps, %s Mbps; outbound %s mps, %s Mbps",
220 -// FORMAT.format(stream.messagesIn().throughput()),
221 -// FORMAT.format(stream.bytesIn().throughput() / (1024 * msgLength)),
222 -// FORMAT.format(stream.messagesOut().throughput()),
223 -// FORMAT.format(stream.bytesOut().throughput() / (1024 * msgLength))));
224 -
225 stream.messagesOut().reset(); 217 stream.messagesOut().reset();
226 stream.bytesOut().reset(); 218 stream.bytesOut().reset();
227 } 219 }
...@@ -231,7 +223,7 @@ public class IOLoopTestClient { ...@@ -231,7 +223,7 @@ public class IOLoopTestClient {
231 MessageStream<TestMessage> stream) { 223 MessageStream<TestMessage> stream) {
232 for (TestMessage message : messages) { 224 for (TestMessage message : messages) {
233 // TODO: summarize latency data better 225 // TODO: summarize latency data better
234 - latencyTotal += currentTimeMillis() - message.requestorTime(); 226 + latencyTotal += nanoTime() - message.requestorTime();
235 latencyCount++; 227 latencyCount++;
236 } 228 }
237 worker.release(messages.size()); 229 worker.release(messages.size());
...@@ -252,7 +244,7 @@ public class IOLoopTestClient { ...@@ -252,7 +244,7 @@ public class IOLoopTestClient {
252 */ 244 */
253 private class Worker implements Runnable { 245 private class Worker implements Runnable {
254 246
255 - private static final int BATCH_SIZE = 1000; 247 + private static final int BATCH_SIZE = 50;
256 private static final int PERMITS = 2 * BATCH_SIZE; 248 private static final int PERMITS = 2 * BATCH_SIZE;
257 249
258 private TestMessageStream stream; 250 private TestMessageStream stream;
...@@ -295,8 +287,7 @@ public class IOLoopTestClient { ...@@ -295,8 +287,7 @@ public class IOLoopTestClient {
295 // Build a batch of messages 287 // Build a batch of messages
296 List<TestMessage> batch = Lists.newArrayListWithCapacity(size); 288 List<TestMessage> batch = Lists.newArrayListWithCapacity(size);
297 for (int i = 0; i < size; i++) { 289 for (int i = 0; i < size; i++) {
298 - batch.add(new TestMessage(msgLength, currentTimeMillis(), 0, 290 + batch.add(new TestMessage(msgLength, nanoTime(), 0, stream.padding()));
299 - stream.padding()));
300 } 291 }
301 acquire(size); 292 acquire(size);
302 stream.write(batch); 293 stream.write(batch);
......
...@@ -20,7 +20,6 @@ import java.util.concurrent.ExecutorService; ...@@ -20,7 +20,6 @@ import java.util.concurrent.ExecutorService;
20 import java.util.concurrent.Executors; 20 import java.util.concurrent.Executors;
21 21
22 import static java.lang.String.format; 22 import static java.lang.String.format;
23 -import static java.lang.System.currentTimeMillis;
24 import static java.lang.System.out; 23 import static java.lang.System.out;
25 import static org.onlab.util.Tools.delay; 24 import static org.onlab.util.Tools.delay;
26 import static org.onlab.util.Tools.namedThreads; 25 import static org.onlab.util.Tools.namedThreads;
...@@ -82,11 +81,14 @@ public class IOLoopTestServer { ...@@ -82,11 +81,14 @@ public class IOLoopTestServer {
82 IOLoopTestServer server = new IOLoopTestServer(ip, wc, ml, PORT); 81 IOLoopTestServer server = new IOLoopTestServer(ip, wc, ml, PORT);
83 server.start(); 82 server.start();
84 83
85 - // Start pruning clients. 84 + // Start pruning clients and keep going until their number goes to 0.
86 - while (true) { 85 + int remaining = -1;
86 + while (remaining == -1 || remaining > 0) {
87 delay(PRUNE_FREQUENCY); 87 delay(PRUNE_FREQUENCY);
88 - server.prune(); 88 + int r = server.prune();
89 + remaining = remaining == -1 && r == 0 ? remaining : r;
89 } 90 }
91 + server.stop();
90 } 92 }
91 93
92 /** 94 /**
...@@ -150,7 +152,7 @@ public class IOLoopTestServer { ...@@ -150,7 +152,7 @@ public class IOLoopTestServer {
150 */ 152 */
151 public void report() { 153 public void report() {
152 DecimalFormat f = new DecimalFormat("#,##0"); 154 DecimalFormat f = new DecimalFormat("#,##0");
153 - out.println(format("Server: %s messages; %s bytes; %s mps; %s Mbs", 155 + out.println(format("Server: %s messages; %s bytes; %s mps; %s MBs",
154 f.format(messages.total()), f.format(bytes.total()), 156 f.format(messages.total()), f.format(bytes.total()),
155 f.format(messages.throughput()), 157 f.format(messages.throughput()),
156 f.format(bytes.throughput() / (1024 * msgLength)))); 158 f.format(bytes.throughput() / (1024 * msgLength))));
...@@ -158,11 +160,15 @@ public class IOLoopTestServer { ...@@ -158,11 +160,15 @@ public class IOLoopTestServer {
158 160
159 /** 161 /**
160 * Prunes the IO loops of stale message buffers. 162 * Prunes the IO loops of stale message buffers.
163 + *
164 + * @return number of remaining IO loops among all workers.
161 */ 165 */
162 - public void prune() { 166 + public int prune() {
167 + int count = 0;
163 for (CustomIOLoop l : iloops) { 168 for (CustomIOLoop l : iloops) {
164 - l.pruneStaleStreams(); 169 + count += l.pruneStaleStreams();
165 } 170 }
171 + return count;
166 } 172 }
167 173
168 // Get the next worker to which a client should be assigned 174 // Get the next worker to which a client should be assigned
...@@ -186,15 +192,8 @@ public class IOLoopTestServer { ...@@ -186,15 +192,8 @@ public class IOLoopTestServer {
186 @Override 192 @Override
187 protected void removeStream(MessageStream<TestMessage> stream) { 193 protected void removeStream(MessageStream<TestMessage> stream) {
188 super.removeStream(stream); 194 super.removeStream(stream);
189 -
190 messages.add(stream.messagesIn().total()); 195 messages.add(stream.messagesIn().total());
191 bytes.add(stream.bytesIn().total()); 196 bytes.add(stream.bytesIn().total());
192 -
193 -// out.println(format("Disconnected server; inbound %s mps, %s Mbps; outbound %s mps, %s Mbps",
194 -// FORMAT.format(stream.messagesIn().throughput()),
195 -// FORMAT.format(stream.bytesIn().throughput() / (1024 * msgLength)),
196 -// FORMAT.format(stream.messagesOut().throughput()),
197 -// FORMAT.format(stream.bytesOut().throughput() / (1024 * msgLength))));
198 } 197 }
199 198
200 @Override 199 @Override
...@@ -211,7 +210,7 @@ public class IOLoopTestServer { ...@@ -211,7 +210,7 @@ public class IOLoopTestServer {
211 List<TestMessage> responses = Lists.newArrayListWithCapacity(messages.size()); 210 List<TestMessage> responses = Lists.newArrayListWithCapacity(messages.size());
212 for (TestMessage message : messages) { 211 for (TestMessage message : messages) {
213 responses.add(new TestMessage(message.length(), message.requestorTime(), 212 responses.add(new TestMessage(message.length(), message.requestorTime(),
214 - currentTimeMillis(), message.padding())); 213 + System.nanoTime(), message.padding()));
215 } 214 }
216 return responses; 215 return responses;
217 } 216 }
......