tom

More IO loop work.

1 package org.onlab.nio; 1 package org.onlab.nio;
2 2
3 +import org.onlab.util.Counter;
3 import org.slf4j.Logger; 4 import org.slf4j.Logger;
4 import org.slf4j.LoggerFactory; 5 import org.slf4j.LoggerFactory;
5 6
...@@ -40,6 +41,10 @@ public abstract class MessageStream<M extends Message> { ...@@ -40,6 +41,10 @@ public abstract class MessageStream<M extends Message> {
40 private Exception ioError; 41 private Exception ioError;
41 private long lastActiveTime; 42 private long lastActiveTime;
42 43
44 + private final Counter bytesIn = new Counter();
45 + private final Counter messagesIn = new Counter();
46 + private final Counter bytesOut = new Counter();
47 + private final Counter messagesOut = new Counter();
43 48
44 /** 49 /**
45 * Creates a message stream associated with the specified IO loop and 50 * Creates a message stream associated with the specified IO loop and
...@@ -93,6 +98,11 @@ public abstract class MessageStream<M extends Message> { ...@@ -93,6 +98,11 @@ public abstract class MessageStream<M extends Message> {
93 closed = true; 98 closed = true;
94 } 99 }
95 100
101 + bytesIn.freeze();
102 + bytesOut.freeze();
103 + messagesIn.freeze();
104 + messagesOut.freeze();
105 +
96 loop.removeStream(this); 106 loop.removeStream(this);
97 if (key != null) { 107 if (key != null) {
98 try { 108 try {
...@@ -176,6 +186,8 @@ public abstract class MessageStream<M extends Message> { ...@@ -176,6 +186,8 @@ public abstract class MessageStream<M extends Message> {
176 inbound.flip(); 186 inbound.flip();
177 while ((message = read(inbound)) != null) { 187 while ((message = read(inbound)) != null) {
178 messages.add(message); 188 messages.add(message);
189 + messagesIn.add(1);
190 + bytesIn.add(message.length());
179 } 191 }
180 inbound.compact(); 192 inbound.compact();
181 193
...@@ -226,8 +238,9 @@ public abstract class MessageStream<M extends Message> { ...@@ -226,8 +238,9 @@ public abstract class MessageStream<M extends Message> {
226 while (outbound.remaining() < message.length()) { 238 while (outbound.remaining() < message.length()) {
227 doubleSize(); 239 doubleSize();
228 } 240 }
229 - // Place the message into the buffer and bump the output trackers.
230 write(message, outbound); 241 write(message, outbound);
242 + messagesOut.add(1);
243 + bytesOut.add(message.length());
231 } 244 }
232 245
233 // Forces a flush, unless one is planned already. 246 // Forces a flush, unless one is planned already.
...@@ -273,6 +286,18 @@ public abstract class MessageStream<M extends Message> { ...@@ -273,6 +286,18 @@ public abstract class MessageStream<M extends Message> {
273 } 286 }
274 } 287 }
275 288
289 +
290 + /**
291 + * Indicates whether data has been written but not flushed yet.
292 + *
293 + * @return true if flush is required
294 + */
295 + boolean isFlushRequired() {
296 + synchronized (this) {
297 + return outbound.position() > 0;
298 + }
299 + }
300 +
276 /** 301 /**
277 * Attempts to flush data, internal stream state and channel availability 302 * Attempts to flush data, internal stream state and channel availability
278 * permitting. Invoked by the driver I/O loop during handling of writable 303 * permitting. Invoked by the driver I/O loop during handling of writable
...@@ -344,4 +369,40 @@ public abstract class MessageStream<M extends Message> { ...@@ -344,4 +369,40 @@ public abstract class MessageStream<M extends Message> {
344 return currentTimeMillis() - lastActiveTime > maxIdleMillis() && key != null; 369 return currentTimeMillis() - lastActiveTime > maxIdleMillis() && key != null;
345 } 370 }
346 371
372 + /**
373 + * Returns the inbound bytes counter.
374 + *
375 + * @return inbound bytes counter
376 + */
377 + public Counter bytesIn() {
378 + return bytesIn;
379 + }
380 +
381 + /**
382 + * Returns the outbound bytes counter.
383 + *
384 + * @return outbound bytes counter
385 + */
386 + public Counter bytesOut() {
387 + return bytesOut;
388 + }
389 +
390 + /**
391 + * Returns the inbound messages counter.
392 + *
393 + * @return inbound messages counter
394 + */
395 + public Counter messagesIn() {
396 + return messagesIn;
397 + }
398 +
399 + /**
400 + * Returns the outbound messages counter.
401 + *
402 + * @return outbound messages counter
403 + */
404 + public Counter messagesOut() {
405 + return messagesOut;
406 + }
407 +
347 } 408 }
......
1 +/**
2 + * Mechanism to transfer messages over network using IO loop and
3 + * message stream, backed by NIO byte buffers.
4 + */
5 +package org.onlab.nio;
...\ No newline at end of file ...\ No newline at end of file
1 package org.onlab.nio; 1 package org.onlab.nio;
2 2
3 +import org.onlab.util.Counter;
3 import org.slf4j.Logger; 4 import org.slf4j.Logger;
4 import org.slf4j.LoggerFactory; 5 import org.slf4j.LoggerFactory;
5 6
...@@ -10,6 +11,7 @@ import java.net.SocketAddress; ...@@ -10,6 +11,7 @@ import java.net.SocketAddress;
10 import java.nio.channels.ByteChannel; 11 import java.nio.channels.ByteChannel;
11 import java.nio.channels.SelectionKey; 12 import java.nio.channels.SelectionKey;
12 import java.nio.channels.SocketChannel; 13 import java.nio.channels.SocketChannel;
14 +import java.text.DecimalFormat;
13 import java.util.ArrayList; 15 import java.util.ArrayList;
14 import java.util.List; 16 import java.util.List;
15 import java.util.concurrent.ExecutionException; 17 import java.util.concurrent.ExecutionException;
...@@ -26,9 +28,9 @@ import static org.onlab.util.Tools.namedThreads; ...@@ -26,9 +28,9 @@ import static org.onlab.util.Tools.namedThreads;
26 /** 28 /**
27 * Auxiliary test fixture to measure speed of NIO-based channels. 29 * Auxiliary test fixture to measure speed of NIO-based channels.
28 */ 30 */
29 -public class StandaloneSpeedClient { 31 +public class IOLoopClient {
30 32
31 - private static Logger log = LoggerFactory.getLogger(StandaloneSpeedClient.class); 33 + private static Logger log = LoggerFactory.getLogger(IOLoopClient.class);
32 34
33 private final InetAddress ip; 35 private final InetAddress ip;
34 private final int port; 36 private final int port;
...@@ -39,8 +41,8 @@ public class StandaloneSpeedClient { ...@@ -39,8 +41,8 @@ public class StandaloneSpeedClient {
39 private final ExecutorService ipool; 41 private final ExecutorService ipool;
40 private final ExecutorService wpool; 42 private final ExecutorService wpool;
41 43
42 -// ThroughputTracker messages; 44 + Counter messages;
43 -// ThroughputTracker bytes; 45 + Counter bytes;
44 46
45 /** 47 /**
46 * Main entry point to launch the client. 48 * Main entry point to launch the client.
...@@ -61,7 +63,7 @@ public class StandaloneSpeedClient { ...@@ -61,7 +63,7 @@ public class StandaloneSpeedClient {
61 63
62 log.info("Setting up client with {} workers sending {} {}-byte messages to {} server... ", 64 log.info("Setting up client with {} workers sending {} {}-byte messages to {} server... ",
63 wc, mc, ml, ip); 65 wc, mc, ml, ip);
64 - StandaloneSpeedClient sc = new StandaloneSpeedClient(ip, wc, mc, ml, StandaloneSpeedServer.PORT); 66 + IOLoopClient sc = new IOLoopClient(ip, wc, mc, ml, IOLoopServer.PORT);
65 67
66 sc.start(); 68 sc.start();
67 delay(2000); 69 delay(2000);
...@@ -82,7 +84,7 @@ public class StandaloneSpeedClient { ...@@ -82,7 +84,7 @@ public class StandaloneSpeedClient {
82 * @param port socket port 84 * @param port socket port
83 * @throws IOException if unable to create IO loops 85 * @throws IOException if unable to create IO loops
84 */ 86 */
85 - public StandaloneSpeedClient(InetAddress ip, int wc, int mc, int ml, int port) throws IOException { 87 + public IOLoopClient(InetAddress ip, int wc, int mc, int ml, int port) throws IOException {
86 this.ip = ip; 88 this.ip = ip;
87 this.port = port; 89 this.port = port;
88 this.msgCount = mc; 90 this.msgCount = mc;
...@@ -101,15 +103,15 @@ public class StandaloneSpeedClient { ...@@ -101,15 +103,15 @@ public class StandaloneSpeedClient {
101 * @throws IOException if unable to open connection 103 * @throws IOException if unable to open connection
102 */ 104 */
103 public void start() throws IOException { 105 public void start() throws IOException {
104 -// messages = new ThroughputTracker(); 106 + messages = new Counter();
105 -// bytes = new ThroughputTracker(); 107 + bytes = new Counter();
106 108
107 // First start up all the IO loops 109 // First start up all the IO loops
108 for (CustomIOLoop l : iloops) { 110 for (CustomIOLoop l : iloops) {
109 ipool.execute(l); 111 ipool.execute(l);
110 } 112 }
111 113
112 -// // Wait for all of them to get going 114 + // Wait for all of them to get going
113 // for (CustomIOLoop l : iloops) 115 // for (CustomIOLoop l : iloops)
114 // l.waitForStart(TIMEOUT); 116 // l.waitForStart(TIMEOUT);
115 117
...@@ -151,20 +153,20 @@ public class StandaloneSpeedClient { ...@@ -151,20 +153,20 @@ public class StandaloneSpeedClient {
151 l.worker.task.get(secs, TimeUnit.SECONDS); 153 l.worker.task.get(secs, TimeUnit.SECONDS);
152 } 154 }
153 } 155 }
154 -// messages.freeze(); 156 + messages.freeze();
155 -// bytes.freeze(); 157 + bytes.freeze();
156 } 158 }
157 159
158 /** 160 /**
159 * Reports on the accumulated throughput trackers. 161 * Reports on the accumulated throughput trackers.
160 */ 162 */
161 public void report() { 163 public void report() {
162 -// DecimalFormat f = new DecimalFormat("#,##0"); 164 + DecimalFormat f = new DecimalFormat("#,##0");
163 -// log.info("{} messages; {} bytes; {} mps; {} Mbs", 165 + log.info("{} messages; {} bytes; {} mps; {} Mbs",
164 -// f.format(messages.total()), 166 + f.format(messages.total()),
165 -// f.format(bytes.total()), 167 + f.format(bytes.total()),
166 -// f.format(messages.throughput()), 168 + f.format(messages.throughput()),
167 -// f.format(bytes.throughput() / (1024 * 128))); 169 + f.format(bytes.throughput() / (1024 * 128)));
168 } 170 }
169 171
170 172
...@@ -187,16 +189,16 @@ public class StandaloneSpeedClient { ...@@ -187,16 +189,16 @@ public class StandaloneSpeedClient {
187 protected synchronized void removeStream(MessageStream<TestMessage> b) { 189 protected synchronized void removeStream(MessageStream<TestMessage> b) {
188 super.removeStream(b); 190 super.removeStream(b);
189 191
190 -// messages.add(b.inMessages().total()); 192 + messages.add(b.messagesIn().total());
191 -// bytes.add(b.inBytes().total()); 193 + bytes.add(b.bytesIn().total());
192 -// b.inMessages().reset(); 194 + b.messagesOut().reset();
193 -// b.inBytes().reset(); 195 + b.bytesOut().reset();
194 - 196 +//
195 -// log.info("Disconnected client; inbound {} mps, {} Mbps; outbound {} mps, {} Mbps", 197 + log.info("Disconnected client; inbound {} mps, {} Mbps; outbound {} mps, {} Mbps",
196 -// StandaloneSpeedServer.format.format(b.inMessages().throughput()), 198 + IOLoopServer.FORMAT.format(b.messagesIn().throughput()),
197 -// StandaloneSpeedServer.format.format(b.inBytes().throughput() / (1024 * 128)), 199 + IOLoopServer.FORMAT.format(b.bytesIn().throughput() / (1024 * 128)),
198 -// StandaloneSpeedServer.format.format(b.outMessages().throughput()), 200 + IOLoopServer.FORMAT.format(b.messagesOut().throughput()),
199 -// StandaloneSpeedServer.format.format(b.outBytes().throughput() / (1024 * 128))); 201 + IOLoopServer.FORMAT.format(b.bytesOut().throughput() / (1024 * 128)));
200 } 202 }
201 203
202 @Override 204 @Override
......
...@@ -48,11 +48,11 @@ public class IOLoopIntegrationTest { ...@@ -48,11 +48,11 @@ public class IOLoopIntegrationTest {
48 48
49 // Setup the test on a random port to avoid intermittent test failures 49 // Setup the test on a random port to avoid intermittent test failures
50 // due to the port being already bound. 50 // due to the port being already bound.
51 - int port = StandaloneSpeedServer.PORT + new Random().nextInt(100); 51 + int port = IOLoopServer.PORT + new Random().nextInt(100);
52 52
53 InetAddress ip = InetAddress.getLoopbackAddress(); 53 InetAddress ip = InetAddress.getLoopbackAddress();
54 - StandaloneSpeedServer sss = new StandaloneSpeedServer(ip, THREADS, size, port); 54 + IOLoopServer sss = new IOLoopServer(ip, THREADS, size, port);
55 - StandaloneSpeedClient ssc = new StandaloneSpeedClient(ip, THREADS, count, size, port); 55 + IOLoopClient ssc = new IOLoopClient(ip, THREADS, count, size, port);
56 56
57 sss.start(); 57 sss.start();
58 ssc.start(); 58 ssc.start();
...@@ -64,32 +64,6 @@ public class IOLoopIntegrationTest { ...@@ -64,32 +64,6 @@ public class IOLoopIntegrationTest {
64 delay(1000); 64 delay(1000);
65 sss.stop(); 65 sss.stop();
66 sss.report(); 66 sss.report();
67 -
68 - // Note that the client and server will have potentially significantly
69 - // differing rates. This is due to the wide variance in how tightly
70 - // the throughput tracking starts & stops relative to to the short
71 - // test duration.
72 -// System.out.println(f.format(ssc.messages.throughput()) + " mps");
73 -
74 -// // Make sure client sent everything.
75 -// assertEquals("incorrect client message count sent",
76 -// (long) count * THREADS, ssc.messages.total());
77 -// assertEquals("incorrect client bytes count sent",
78 -// (long) size * count * THREADS, ssc.bytes.total());
79 -//
80 -// // Make sure server received everything.
81 -// assertEquals("incorrect server message count received",
82 -// (long) count * THREADS, sss.messages.total());
83 -// assertEquals("incorrect server bytes count received",
84 -// (long) size * count * THREADS, sss.bytes.total());
85 -//
86 -// // Make sure speeds were reasonable.
87 -// if (mps > 0.0) {
88 -// assertAboveThreshold("insufficient client speed", mps,
89 -// ssc.messages.throughput());
90 -// assertAboveThreshold("insufficient server speed", mps / 2,
91 -// sss.messages.throughput());
92 -// }
93 } 67 }
94 68
95 } 69 }
......
1 package org.onlab.nio; 1 package org.onlab.nio;
2 2
3 +import org.onlab.util.Counter;
3 import org.slf4j.Logger; 4 import org.slf4j.Logger;
4 import org.slf4j.LoggerFactory; 5 import org.slf4j.LoggerFactory;
5 6
...@@ -23,9 +24,9 @@ import static org.onlab.util.Tools.namedThreads; ...@@ -23,9 +24,9 @@ import static org.onlab.util.Tools.namedThreads;
23 /** 24 /**
24 * Auxiliary test fixture to measure speed of NIO-based channels. 25 * Auxiliary test fixture to measure speed of NIO-based channels.
25 */ 26 */
26 -public class StandaloneSpeedServer { 27 +public class IOLoopServer {
27 28
28 - private static Logger log = LoggerFactory.getLogger(StandaloneSpeedServer.class); 29 + private static Logger log = LoggerFactory.getLogger(IOLoopServer.class);
29 30
30 private static final int PRUNE_FREQUENCY = 1000; 31 private static final int PRUNE_FREQUENCY = 1000;
31 32
...@@ -48,8 +49,8 @@ public class StandaloneSpeedServer { ...@@ -48,8 +49,8 @@ public class StandaloneSpeedServer {
48 private final int msgLength; 49 private final int msgLength;
49 private int lastWorker = -1; 50 private int lastWorker = -1;
50 51
51 -// ThroughputTracker messages; 52 + Counter messages;
52 -// ThroughputTracker bytes; 53 + Counter bytes;
53 54
54 /** 55 /**
55 * Main entry point to launch the server. 56 * Main entry point to launch the server.
...@@ -64,7 +65,7 @@ public class StandaloneSpeedServer { ...@@ -64,7 +65,7 @@ public class StandaloneSpeedServer {
64 65
65 log.info("Setting up the server with {} workers, {} byte messages on {}... ", 66 log.info("Setting up the server with {} workers, {} byte messages on {}... ",
66 wc, ml, ip); 67 wc, ml, ip);
67 - StandaloneSpeedServer ss = new StandaloneSpeedServer(ip, wc, ml, PORT); 68 + IOLoopServer ss = new IOLoopServer(ip, wc, ml, PORT);
68 ss.start(); 69 ss.start();
69 70
70 // Start pruning clients. 71 // Start pruning clients.
...@@ -83,7 +84,7 @@ public class StandaloneSpeedServer { ...@@ -83,7 +84,7 @@ public class StandaloneSpeedServer {
83 * @param port listen port 84 * @param port listen port
84 * @throws IOException if unable to create IO loops 85 * @throws IOException if unable to create IO loops
85 */ 86 */
86 - public StandaloneSpeedServer(InetAddress ip, int wc, int ml, int port) throws IOException { 87 + public IOLoopServer(InetAddress ip, int wc, int ml, int port) throws IOException {
87 this.workerCount = wc; 88 this.workerCount = wc;
88 this.msgLength = ml; 89 this.msgLength = ml;
89 this.ipool = Executors.newFixedThreadPool(workerCount, namedThreads("io-loop")); 90 this.ipool = Executors.newFixedThreadPool(workerCount, namedThreads("io-loop"));
...@@ -98,14 +99,14 @@ public class StandaloneSpeedServer { ...@@ -98,14 +99,14 @@ public class StandaloneSpeedServer {
98 * Start the server IO loops and kicks off throughput tracking. 99 * Start the server IO loops and kicks off throughput tracking.
99 */ 100 */
100 public void start() { 101 public void start() {
101 -// messages = new ThroughputTracker(); 102 + messages = new Counter();
102 -// bytes = new ThroughputTracker(); 103 + bytes = new Counter();
103 104
104 for (CustomIOLoop l : iloops) { 105 for (CustomIOLoop l : iloops) {
105 ipool.execute(l); 106 ipool.execute(l);
106 } 107 }
107 apool.execute(aloop); 108 apool.execute(aloop);
108 -// 109 +
109 // for (CustomIOLoop l : iloops) 110 // for (CustomIOLoop l : iloops)
110 // l.waitForStart(TIMEOUT); 111 // l.waitForStart(TIMEOUT);
111 // aloop.waitForStart(TIMEOUT); 112 // aloop.waitForStart(TIMEOUT);
...@@ -124,20 +125,20 @@ public class StandaloneSpeedServer { ...@@ -124,20 +125,20 @@ public class StandaloneSpeedServer {
124 // l.waitForFinish(TIMEOUT); 125 // l.waitForFinish(TIMEOUT);
125 // aloop.waitForFinish(TIMEOUT); 126 // aloop.waitForFinish(TIMEOUT);
126 // 127 //
127 -// messages.freeze(); 128 + messages.freeze();
128 -// bytes.freeze(); 129 + bytes.freeze();
129 } 130 }
130 131
131 /** 132 /**
132 * Reports on the accumulated throughput trackers. 133 * Reports on the accumulated throughput trackers.
133 */ 134 */
134 public void report() { 135 public void report() {
135 -// DecimalFormat f = new DecimalFormat("#,##0"); 136 + DecimalFormat f = new DecimalFormat("#,##0");
136 -// log.info("{} messages; {} bytes; {} mps; {} Mbs", 137 + log.info("{} messages; {} bytes; {} mps; {} Mbs",
137 -// f.format(messages.total()), 138 + f.format(messages.total()),
138 -// f.format(bytes.total()), 139 + f.format(bytes.total()),
139 -// f.format(messages.throughput()), 140 + f.format(messages.throughput()),
140 -// f.format(bytes.throughput() / (1024 * 128))); 141 + f.format(bytes.throughput() / (1024 * 128)));
141 } 142 }
142 143
143 /** 144 /**
...@@ -170,15 +171,15 @@ public class StandaloneSpeedServer { ...@@ -170,15 +171,15 @@ public class StandaloneSpeedServer {
170 @Override 171 @Override
171 protected void removeStream(MessageStream<TestMessage> stream) { 172 protected void removeStream(MessageStream<TestMessage> stream) {
172 super.removeStream(stream); 173 super.removeStream(stream);
173 -// 174 +
174 -// messages.add(b.inMessages().total()); 175 + messages.add(stream.messagesIn().total());
175 -// bytes.add(b.inBytes().total()); 176 + bytes.add(stream.bytesIn().total());
176 -// 177 +
177 -// log.info("Disconnected client; inbound {} mps, {} Mbps; outbound {} mps, {} Mbps", 178 + log.info("Disconnected client; inbound {} mps, {} Mbps; outbound {} mps, {} Mbps",
178 -// format.format(b.inMessages().throughput()), 179 + FORMAT.format(stream.messagesIn().throughput()),
179 -// format.format(b.inBytes().throughput() / (1024 * 128)), 180 + FORMAT.format(stream.bytesIn().throughput() / (1024 * 128)),
180 -// format.format(b.outMessages().throughput()), 181 + FORMAT.format(stream.messagesOut().throughput()),
181 -// format.format(b.outBytes().throughput() / (1024 * 128))); 182 + FORMAT.format(stream.bytesOut().throughput() / (1024 * 128)));
182 } 183 }
183 184
184 @Override 185 @Override
......
...@@ -23,57 +23,36 @@ import static org.junit.Assert.assertNull; ...@@ -23,57 +23,36 @@ import static org.junit.Assert.assertNull;
23 */ 23 */
24 public class MessageStreamTest { 24 public class MessageStreamTest {
25 25
26 - private static final int LENGTH = 16; 26 + private static final int SIZE = 16;
27 - 27 + private static final TestMessage MESSAGE = new TestMessage(SIZE);
28 - private static final TestMessage TM1 = new TestMessage(LENGTH);
29 - private static final TestMessage TM2 = new TestMessage(LENGTH);
30 - private static final TestMessage TM3 = new TestMessage(LENGTH);
31 - private static final TestMessage TM4 = new TestMessage(LENGTH);
32 28
33 private static final int BIG_SIZE = 32 * 1024; 29 private static final int BIG_SIZE = 32 * 1024;
34 private static final TestMessage BIG_MESSAGE = new TestMessage(BIG_SIZE); 30 private static final TestMessage BIG_MESSAGE = new TestMessage(BIG_SIZE);
35 31
36 - private static enum WritePending { 32 + private TestIOLoop loop;
37 - ON, OFF;
38 -
39 - public boolean on() {
40 - return this == ON;
41 - }
42 - }
43 -
44 - private static enum FlushRequired {
45 - ON, OFF;
46 -
47 - public boolean on() {
48 - return this == ON;
49 - }
50 - }
51 -
52 - private FakeIOLoop loop;
53 private TestByteChannel channel; 33 private TestByteChannel channel;
54 - private TestMessageStream buffer; 34 + private TestMessageStream stream;
55 private TestKey key; 35 private TestKey key;
56 36
57 @Before 37 @Before
58 public void setUp() throws IOException { 38 public void setUp() throws IOException {
59 - loop = new FakeIOLoop(); 39 + loop = new TestIOLoop();
60 channel = new TestByteChannel(); 40 channel = new TestByteChannel();
61 key = new TestKey(channel); 41 key = new TestKey(channel);
62 - buffer = loop.createStream(channel); 42 + stream = loop.createStream(channel);
63 - buffer.setKey(key); 43 + stream.setKey(key);
64 } 44 }
65 45
66 @After 46 @After
67 public void tearDown() { 47 public void tearDown() {
68 loop.shutdown(); 48 loop.shutdown();
69 - buffer.close(); 49 + stream.close();
70 } 50 }
71 51
72 - // Check state of the message buffer 52 + // Validates the state of the message stream
73 - private void assertState(WritePending wp, FlushRequired fr, 53 + private void validate(boolean wp, boolean fr, int read, int written) {
74 - int read, int written) { 54 + assertEquals(wp, stream.isWritePending());
75 - assertEquals(wp.on(), buffer.isWritePending()); 55 + assertEquals(fr, stream.isFlushRequired());
76 -// assertEquals(fr.on(), buffer.requiresFlush());
77 assertEquals(read, channel.readBytes); 56 assertEquals(read, channel.readBytes);
78 assertEquals(written, channel.writtenBytes); 57 assertEquals(written, channel.writtenBytes);
79 } 58 }
...@@ -81,155 +60,155 @@ public class MessageStreamTest { ...@@ -81,155 +60,155 @@ public class MessageStreamTest {
81 @Test 60 @Test
82 public void endOfStream() throws IOException { 61 public void endOfStream() throws IOException {
83 channel.close(); 62 channel.close();
84 - List<TestMessage> messages = buffer.read(); 63 + List<TestMessage> messages = stream.read();
85 assertNull(messages); 64 assertNull(messages);
86 } 65 }
87 66
88 @Test 67 @Test
89 public void bufferGrowth() throws IOException { 68 public void bufferGrowth() throws IOException {
90 - // Create a buffer for big messages and test the growth. 69 + // Create a stream for big messages and test the growth.
91 - buffer = new TestMessageStream(BIG_SIZE, channel, loop); 70 + stream = new TestMessageStream(BIG_SIZE, channel, loop);
92 - buffer.write(BIG_MESSAGE); 71 + stream.write(BIG_MESSAGE);
93 - buffer.write(BIG_MESSAGE); 72 + stream.write(BIG_MESSAGE);
94 - buffer.write(BIG_MESSAGE); 73 + stream.write(BIG_MESSAGE);
95 - buffer.write(BIG_MESSAGE); 74 + stream.write(BIG_MESSAGE);
96 - buffer.write(BIG_MESSAGE); 75 + stream.write(BIG_MESSAGE);
97 } 76 }
98 77
99 @Test 78 @Test
100 public void discardBeforeKey() { 79 public void discardBeforeKey() {
101 - // Create a buffer that does not yet have the key set and discard it. 80 + // Create a stream that does not yet have the key set and discard it.
102 - buffer = loop.createStream(channel); 81 + stream = loop.createStream(channel);
103 - assertNull(buffer.key()); 82 + assertNull(stream.key());
104 - buffer.close(); 83 + stream.close();
105 // There is not key, so nothing to check; we just expect no problem. 84 // There is not key, so nothing to check; we just expect no problem.
106 } 85 }
107 86
108 @Test 87 @Test
109 public void bufferedRead() throws IOException { 88 public void bufferedRead() throws IOException {
110 - channel.bytesToRead = LENGTH + 4; 89 + channel.bytesToRead = SIZE + 4;
111 - List<TestMessage> messages = buffer.read(); 90 + List<TestMessage> messages = stream.read();
112 assertEquals(1, messages.size()); 91 assertEquals(1, messages.size());
113 - assertState(WritePending.OFF, FlushRequired.OFF, LENGTH + 4, 0); 92 + validate(false, false, SIZE + 4, 0);
114 93
115 - channel.bytesToRead = LENGTH - 4; 94 + channel.bytesToRead = SIZE - 4;
116 - messages = buffer.read(); 95 + messages = stream.read();
117 assertEquals(1, messages.size()); 96 assertEquals(1, messages.size());
118 - assertState(WritePending.OFF, FlushRequired.OFF, LENGTH * 2, 0); 97 + validate(false, false, SIZE * 2, 0);
119 } 98 }
120 99
121 @Test 100 @Test
122 public void bufferedWrite() throws IOException { 101 public void bufferedWrite() throws IOException {
123 - assertState(WritePending.OFF, FlushRequired.OFF, 0, 0); 102 + validate(false, false, 0, 0);
124 103
125 // First write is immediate... 104 // First write is immediate...
126 - buffer.write(TM1); 105 + stream.write(MESSAGE);
127 - assertState(WritePending.OFF, FlushRequired.OFF, 0, LENGTH); 106 + validate(false, false, 0, SIZE);
128 107
129 // Second and third get buffered... 108 // Second and third get buffered...
130 - buffer.write(TM2); 109 + stream.write(MESSAGE);
131 - assertState(WritePending.OFF, FlushRequired.ON, 0, LENGTH); 110 + validate(false, true, 0, SIZE);
132 - buffer.write(TM3); 111 + stream.write(MESSAGE);
133 - assertState(WritePending.OFF, FlushRequired.ON, 0, LENGTH); 112 + validate(false, true, 0, SIZE);
134 113
135 // Reset write, which will flush if needed; the next write is again buffered 114 // Reset write, which will flush if needed; the next write is again buffered
136 - buffer.flushIfWriteNotPending(); 115 + stream.flushIfWriteNotPending();
137 - assertState(WritePending.OFF, FlushRequired.OFF, 0, LENGTH * 3); 116 + validate(false, false, 0, SIZE * 3);
138 - buffer.write(TM4); 117 + stream.write(MESSAGE);
139 - assertState(WritePending.OFF, FlushRequired.ON, 0, LENGTH * 3); 118 + validate(false, true, 0, SIZE * 3);
140 119
141 // Select reset, which will flush if needed; the next write is again buffered 120 // Select reset, which will flush if needed; the next write is again buffered
142 - buffer.flushIfPossible(); 121 + stream.flushIfPossible();
143 - assertState(WritePending.OFF, FlushRequired.OFF, 0, LENGTH * 4); 122 + validate(false, false, 0, SIZE * 4);
144 - buffer.write(TM1); 123 + stream.write(MESSAGE);
145 - assertState(WritePending.OFF, FlushRequired.ON, 0, LENGTH * 4); 124 + validate(false, true, 0, SIZE * 4);
146 - buffer.flush(); 125 + stream.flush();
147 - assertState(WritePending.OFF, FlushRequired.ON, 0, LENGTH * 4); 126 + validate(false, true, 0, SIZE * 4);
148 } 127 }
149 128
150 @Test 129 @Test
151 public void bufferedWriteList() throws IOException { 130 public void bufferedWriteList() throws IOException {
152 - assertState(WritePending.OFF, FlushRequired.OFF, 0, 0); 131 + validate(false, false, 0, 0);
153 132
154 // First write is immediate... 133 // First write is immediate...
155 - List<TestMessage> messages = new ArrayList<TestMessage>(); 134 + List<TestMessage> messages = new ArrayList<>();
156 - messages.add(TM1); 135 + messages.add(MESSAGE);
157 - messages.add(TM2); 136 + messages.add(MESSAGE);
158 - messages.add(TM3); 137 + messages.add(MESSAGE);
159 - messages.add(TM4); 138 + messages.add(MESSAGE);
160 139
161 - buffer.write(messages); 140 + stream.write(messages);
162 - assertState(WritePending.OFF, FlushRequired.OFF, 0, LENGTH * 4); 141 + validate(false, false, 0, SIZE * 4);
163 142
164 - buffer.write(messages); 143 + stream.write(messages);
165 - assertState(WritePending.OFF, FlushRequired.ON, 0, LENGTH * 4); 144 + validate(false, true, 0, SIZE * 4);
166 145
167 - buffer.flushIfPossible(); 146 + stream.flushIfPossible();
168 - assertState(WritePending.OFF, FlushRequired.OFF, 0, LENGTH * 8); 147 + validate(false, false, 0, SIZE * 8);
169 } 148 }
170 149
171 @Test 150 @Test
172 public void bufferedPartialWrite() throws IOException { 151 public void bufferedPartialWrite() throws IOException {
173 - assertState(WritePending.OFF, FlushRequired.OFF, 0, 0); 152 + validate(false, false, 0, 0);
174 153
175 // First write is immediate... 154 // First write is immediate...
176 - buffer.write(TM1); 155 + stream.write(MESSAGE);
177 - assertState(WritePending.OFF, FlushRequired.OFF, 0, LENGTH); 156 + validate(false, false, 0, SIZE);
178 157
179 // Tell test channel to accept only half. 158 // Tell test channel to accept only half.
180 - channel.bytesToWrite = LENGTH / 2; 159 + channel.bytesToWrite = SIZE / 2;
181 160
182 // Second and third get buffered... 161 // Second and third get buffered...
183 - buffer.write(TM2); 162 + stream.write(MESSAGE);
184 - assertState(WritePending.OFF, FlushRequired.ON, 0, LENGTH); 163 + validate(false, true, 0, SIZE);
185 - buffer.flushIfPossible(); 164 + stream.flushIfPossible();
186 - assertState(WritePending.ON, FlushRequired.ON, 0, LENGTH + LENGTH / 2); 165 + validate(true, true, 0, SIZE + SIZE / 2);
187 } 166 }
188 167
189 @Test 168 @Test
190 public void bufferedPartialWrite2() throws IOException { 169 public void bufferedPartialWrite2() throws IOException {
191 - assertState(WritePending.OFF, FlushRequired.OFF, 0, 0); 170 + validate(false, false, 0, 0);
192 171
193 // First write is immediate... 172 // First write is immediate...
194 - buffer.write(TM1); 173 + stream.write(MESSAGE);
195 - assertState(WritePending.OFF, FlushRequired.OFF, 0, LENGTH); 174 + validate(false, false, 0, SIZE);
196 175
197 // Tell test channel to accept only half. 176 // Tell test channel to accept only half.
198 - channel.bytesToWrite = LENGTH / 2; 177 + channel.bytesToWrite = SIZE / 2;
199 178
200 // Second and third get buffered... 179 // Second and third get buffered...
201 - buffer.write(TM2); 180 + stream.write(MESSAGE);
202 - assertState(WritePending.OFF, FlushRequired.ON, 0, LENGTH); 181 + validate(false, true, 0, SIZE);
203 - buffer.flushIfWriteNotPending(); 182 + stream.flushIfWriteNotPending();
204 - assertState(WritePending.ON, FlushRequired.ON, 0, LENGTH + LENGTH / 2); 183 + validate(true, true, 0, SIZE + SIZE / 2);
205 } 184 }
206 185
207 @Test 186 @Test
208 public void bufferedReadWrite() throws IOException { 187 public void bufferedReadWrite() throws IOException {
209 - channel.bytesToRead = LENGTH + 4; 188 + channel.bytesToRead = SIZE + 4;
210 - List<TestMessage> messages = buffer.read(); 189 + List<TestMessage> messages = stream.read();
211 assertEquals(1, messages.size()); 190 assertEquals(1, messages.size());
212 - assertState(WritePending.OFF, FlushRequired.OFF, LENGTH + 4, 0); 191 + validate(false, false, SIZE + 4, 0);
213 192
214 - buffer.write(TM1); 193 + stream.write(MESSAGE);
215 - assertState(WritePending.OFF, FlushRequired.OFF, LENGTH + 4, LENGTH); 194 + validate(false, false, SIZE + 4, SIZE);
216 195
217 - channel.bytesToRead = LENGTH - 4; 196 + channel.bytesToRead = SIZE - 4;
218 - messages = buffer.read(); 197 + messages = stream.read();
219 assertEquals(1, messages.size()); 198 assertEquals(1, messages.size());
220 - assertState(WritePending.OFF, FlushRequired.OFF, LENGTH * 2, LENGTH); 199 + validate(false, false, SIZE * 2, SIZE);
221 } 200 }
222 201
223 // Fake IO driver loop 202 // Fake IO driver loop
224 - private static class FakeIOLoop extends IOLoop<TestMessage, TestMessageStream> { 203 + private static class TestIOLoop extends IOLoop<TestMessage, TestMessageStream> {
225 204
226 - public FakeIOLoop() throws IOException { 205 + public TestIOLoop() throws IOException {
227 super(500); 206 super(500);
228 } 207 }
229 208
230 @Override 209 @Override
231 protected TestMessageStream createStream(ByteChannel channel) { 210 protected TestMessageStream createStream(ByteChannel channel) {
232 - return new TestMessageStream(LENGTH, channel, this); 211 + return new TestMessageStream(SIZE, channel, this);
233 } 212 }
234 213
235 @Override 214 @Override
......