Showing
4 changed files
with
26 additions
and
16 deletions
| ... | @@ -27,6 +27,7 @@ import java.util.concurrent.TimeoutException; | ... | @@ -27,6 +27,7 @@ import java.util.concurrent.TimeoutException; |
| 27 | 27 | ||
| 28 | import static java.lang.String.format; | 28 | import static java.lang.String.format; |
| 29 | import static java.lang.System.currentTimeMillis; | 29 | import static java.lang.System.currentTimeMillis; |
| 30 | +import static java.lang.System.nanoTime; | ||
| 30 | import static java.lang.System.out; | 31 | import static java.lang.System.out; |
| 31 | import static org.onlab.onos.foo.IOLoopTestServer.PORT; | 32 | import static org.onlab.onos.foo.IOLoopTestServer.PORT; |
| 32 | import static org.onlab.util.Tools.delay; | 33 | import static org.onlab.util.Tools.delay; |
| ... | @@ -185,7 +186,7 @@ public class IOLoopTestClient { | ... | @@ -185,7 +186,7 @@ public class IOLoopTestClient { |
| 185 | */ | 186 | */ |
| 186 | public void report() { | 187 | public void report() { |
| 187 | DecimalFormat f = new DecimalFormat("#,##0"); | 188 | DecimalFormat f = new DecimalFormat("#,##0"); |
| 188 | - out.println(format("Client: %s messages; %s bytes; %s mps; %s Mbs; %s ms latency", | 189 | + out.println(format("Client: %s messages; %s bytes; %s mps; %s Mbs; %s ns latency", |
| 189 | f.format(messages.total()), f.format(bytes.total()), | 190 | f.format(messages.total()), f.format(bytes.total()), |
| 190 | f.format(messages.throughput()), | 191 | f.format(messages.throughput()), |
| 191 | f.format(bytes.throughput() / (1024 * msgLength)), | 192 | f.format(bytes.throughput() / (1024 * msgLength)), |
| ... | @@ -233,7 +234,7 @@ public class IOLoopTestClient { | ... | @@ -233,7 +234,7 @@ public class IOLoopTestClient { |
| 233 | MessageStream<TestMessage> stream) { | 234 | MessageStream<TestMessage> stream) { |
| 234 | for (TestMessage message : messages) { | 235 | for (TestMessage message : messages) { |
| 235 | // TODO: summarize latency data better | 236 | // TODO: summarize latency data better |
| 236 | - latencyTotal += currentTimeMillis() - message.requestorTime(); | 237 | + latencyTotal += nanoTime() - message.requestorTime(); |
| 237 | latencyCount++; | 238 | latencyCount++; |
| 238 | } | 239 | } |
| 239 | worker.release(messages.size()); | 240 | worker.release(messages.size()); |
| ... | @@ -254,7 +255,7 @@ public class IOLoopTestClient { | ... | @@ -254,7 +255,7 @@ public class IOLoopTestClient { |
| 254 | */ | 255 | */ |
| 255 | private class Worker implements Runnable { | 256 | private class Worker implements Runnable { |
| 256 | 257 | ||
| 257 | - private static final int BATCH_SIZE = 1000; | 258 | + private static final int BATCH_SIZE = 10; |
| 258 | private static final int PERMITS = 2 * BATCH_SIZE; | 259 | private static final int PERMITS = 2 * BATCH_SIZE; |
| 259 | 260 | ||
| 260 | private TestMessageStream stream; | 261 | private TestMessageStream stream; |
| ... | @@ -297,7 +298,7 @@ public class IOLoopTestClient { | ... | @@ -297,7 +298,7 @@ public class IOLoopTestClient { |
| 297 | // Build a batch of messages | 298 | // Build a batch of messages |
| 298 | List<TestMessage> batch = Lists.newArrayListWithCapacity(size); | 299 | List<TestMessage> batch = Lists.newArrayListWithCapacity(size); |
| 299 | for (int i = 0; i < size; i++) { | 300 | for (int i = 0; i < size; i++) { |
| 300 | - batch.add(new TestMessage(msgLength, currentTimeMillis(), 0, | 301 | + batch.add(new TestMessage(msgLength, nanoTime(), 0, |
| 301 | stream.padding())); | 302 | stream.padding())); |
| 302 | } | 303 | } |
| 303 | acquire(size); | 304 | acquire(size); | ... | ... |
| ... | @@ -24,6 +24,7 @@ import java.util.concurrent.Executors; | ... | @@ -24,6 +24,7 @@ import java.util.concurrent.Executors; |
| 24 | 24 | ||
| 25 | import static java.lang.String.format; | 25 | import static java.lang.String.format; |
| 26 | import static java.lang.System.currentTimeMillis; | 26 | import static java.lang.System.currentTimeMillis; |
| 27 | +import static java.lang.System.nanoTime; | ||
| 27 | import static java.lang.System.out; | 28 | import static java.lang.System.out; |
| 28 | import static org.onlab.util.Tools.delay; | 29 | import static org.onlab.util.Tools.delay; |
| 29 | import static org.onlab.util.Tools.namedThreads; | 30 | import static org.onlab.util.Tools.namedThreads; |
| ... | @@ -92,6 +93,7 @@ public class IOLoopTestServer { | ... | @@ -92,6 +93,7 @@ public class IOLoopTestServer { |
| 92 | int r = server.prune(); | 93 | int r = server.prune(); |
| 93 | remaining = remaining == -1 && r == 0 ? remaining : r; | 94 | remaining = remaining == -1 && r == 0 ? remaining : r; |
| 94 | } | 95 | } |
| 96 | + server.stop(); | ||
| 95 | } | 97 | } |
| 96 | 98 | ||
| 97 | /** | 99 | /** |
| ... | @@ -220,7 +222,7 @@ public class IOLoopTestServer { | ... | @@ -220,7 +222,7 @@ public class IOLoopTestServer { |
| 220 | List<TestMessage> responses = Lists.newArrayListWithCapacity(messages.size()); | 222 | List<TestMessage> responses = Lists.newArrayListWithCapacity(messages.size()); |
| 221 | for (TestMessage message : messages) { | 223 | for (TestMessage message : messages) { |
| 222 | responses.add(new TestMessage(message.length(), message.requestorTime(), | 224 | responses.add(new TestMessage(message.length(), message.requestorTime(), |
| 223 | - currentTimeMillis(), message.padding())); | 225 | + nanoTime(), message.padding())); |
| 224 | } | 226 | } |
| 225 | return responses; | 227 | return responses; |
| 226 | } | 228 | } | ... | ... |
| ... | @@ -25,6 +25,7 @@ import java.util.concurrent.TimeoutException; | ... | @@ -25,6 +25,7 @@ import java.util.concurrent.TimeoutException; |
| 25 | 25 | ||
| 26 | import static java.lang.String.format; | 26 | import static java.lang.String.format; |
| 27 | import static java.lang.System.currentTimeMillis; | 27 | import static java.lang.System.currentTimeMillis; |
| 28 | +import static java.lang.System.nanoTime; | ||
| 28 | import static java.lang.System.out; | 29 | import static java.lang.System.out; |
| 29 | import static org.onlab.nio.IOLoopTestServer.PORT; | 30 | import static org.onlab.nio.IOLoopTestServer.PORT; |
| 30 | import static org.onlab.util.Tools.delay; | 31 | import static org.onlab.util.Tools.delay; |
| ... | @@ -183,7 +184,7 @@ public class IOLoopTestClient { | ... | @@ -183,7 +184,7 @@ public class IOLoopTestClient { |
| 183 | */ | 184 | */ |
| 184 | public void report() { | 185 | public void report() { |
| 185 | DecimalFormat f = new DecimalFormat("#,##0"); | 186 | DecimalFormat f = new DecimalFormat("#,##0"); |
| 186 | - out.println(format("Client: %s messages; %s bytes; %s mps; %s Mbs; %s ms latency", | 187 | + out.println(format("Client: %s messages; %s bytes; %s mps; %s Mbs; %s ns latency", |
| 187 | f.format(messages.total()), f.format(bytes.total()), | 188 | f.format(messages.total()), f.format(bytes.total()), |
| 188 | f.format(messages.throughput()), | 189 | f.format(messages.throughput()), |
| 189 | f.format(bytes.throughput() / (1024 * msgLength)), | 190 | f.format(bytes.throughput() / (1024 * msgLength)), |
| ... | @@ -231,7 +232,7 @@ public class IOLoopTestClient { | ... | @@ -231,7 +232,7 @@ public class IOLoopTestClient { |
| 231 | MessageStream<TestMessage> stream) { | 232 | MessageStream<TestMessage> stream) { |
| 232 | for (TestMessage message : messages) { | 233 | for (TestMessage message : messages) { |
| 233 | // TODO: summarize latency data better | 234 | // TODO: summarize latency data better |
| 234 | - latencyTotal += currentTimeMillis() - message.requestorTime(); | 235 | + latencyTotal += nanoTime() - message.requestorTime(); |
| 235 | latencyCount++; | 236 | latencyCount++; |
| 236 | } | 237 | } |
| 237 | worker.release(messages.size()); | 238 | worker.release(messages.size()); |
| ... | @@ -252,7 +253,7 @@ public class IOLoopTestClient { | ... | @@ -252,7 +253,7 @@ public class IOLoopTestClient { |
| 252 | */ | 253 | */ |
| 253 | private class Worker implements Runnable { | 254 | private class Worker implements Runnable { |
| 254 | 255 | ||
| 255 | - private static final int BATCH_SIZE = 1000; | 256 | + private static final int BATCH_SIZE = 50; |
| 256 | private static final int PERMITS = 2 * BATCH_SIZE; | 257 | private static final int PERMITS = 2 * BATCH_SIZE; |
| 257 | 258 | ||
| 258 | private TestMessageStream stream; | 259 | private TestMessageStream stream; |
| ... | @@ -295,8 +296,7 @@ public class IOLoopTestClient { | ... | @@ -295,8 +296,7 @@ public class IOLoopTestClient { |
| 295 | // Build a batch of messages | 296 | // Build a batch of messages |
| 296 | List<TestMessage> batch = Lists.newArrayListWithCapacity(size); | 297 | List<TestMessage> batch = Lists.newArrayListWithCapacity(size); |
| 297 | for (int i = 0; i < size; i++) { | 298 | for (int i = 0; i < size; i++) { |
| 298 | - batch.add(new TestMessage(msgLength, currentTimeMillis(), 0, | 299 | + batch.add(new TestMessage(msgLength, nanoTime(), 0, stream.padding())); |
| 299 | - stream.padding())); | ||
| 300 | } | 300 | } |
| 301 | acquire(size); | 301 | acquire(size); |
| 302 | stream.write(batch); | 302 | stream.write(batch); | ... | ... |
| ... | @@ -82,11 +82,14 @@ public class IOLoopTestServer { | ... | @@ -82,11 +82,14 @@ public class IOLoopTestServer { |
| 82 | IOLoopTestServer server = new IOLoopTestServer(ip, wc, ml, PORT); | 82 | IOLoopTestServer server = new IOLoopTestServer(ip, wc, ml, PORT); |
| 83 | server.start(); | 83 | server.start(); |
| 84 | 84 | ||
| 85 | - // Start pruning clients. | 85 | + // Start pruning clients and keep going until their number goes to 0. |
| 86 | - while (true) { | 86 | + int remaining = -1; |
| 87 | + while (remaining == -1 || remaining > 0) { | ||
| 87 | delay(PRUNE_FREQUENCY); | 88 | delay(PRUNE_FREQUENCY); |
| 88 | - server.prune(); | 89 | + int r = server.prune(); |
| 90 | + remaining = remaining == -1 && r == 0 ? remaining : r; | ||
| 89 | } | 91 | } |
| 92 | + server.stop(); | ||
| 90 | } | 93 | } |
| 91 | 94 | ||
| 92 | /** | 95 | /** |
| ... | @@ -158,11 +161,15 @@ public class IOLoopTestServer { | ... | @@ -158,11 +161,15 @@ public class IOLoopTestServer { |
| 158 | 161 | ||
| 159 | /** | 162 | /** |
| 160 | * Prunes the IO loops of stale message buffers. | 163 | * Prunes the IO loops of stale message buffers. |
| 164 | + * | ||
| 165 | + * @return number of remaining IO loops among all workers. | ||
| 161 | */ | 166 | */ |
| 162 | - public void prune() { | 167 | + public int prune() { |
| 168 | + int count = 0; | ||
| 163 | for (CustomIOLoop l : iloops) { | 169 | for (CustomIOLoop l : iloops) { |
| 164 | - l.pruneStaleStreams(); | 170 | + count += l.pruneStaleStreams(); |
| 165 | } | 171 | } |
| 172 | + return count; | ||
| 166 | } | 173 | } |
| 167 | 174 | ||
| 168 | // Get the next worker to which a client should be assigned | 175 | // Get the next worker to which a client should be assigned |
| ... | @@ -211,7 +218,7 @@ public class IOLoopTestServer { | ... | @@ -211,7 +218,7 @@ public class IOLoopTestServer { |
| 211 | List<TestMessage> responses = Lists.newArrayListWithCapacity(messages.size()); | 218 | List<TestMessage> responses = Lists.newArrayListWithCapacity(messages.size()); |
| 212 | for (TestMessage message : messages) { | 219 | for (TestMessage message : messages) { |
| 213 | responses.add(new TestMessage(message.length(), message.requestorTime(), | 220 | responses.add(new TestMessage(message.length(), message.requestorTime(), |
| 214 | - currentTimeMillis(), message.padding())); | 221 | + System.nanoTime(), message.padding())); |
| 215 | } | 222 | } |
| 216 | return responses; | 223 | return responses; |
| 217 | } | 224 | } | ... | ... |
-
Please register or login to post a comment