Madan Jampani
Committed by Gerrit Code Review

Drop unused onlab-nio module

Change-Id: I52141335643ad5b62b2a9cebe4d79faf0762e3e0
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright 2014 Open Networking Laboratory
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.onosproject</groupId>
<artifactId>onlab-utils</artifactId>
<version>1.5.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<artifactId>onlab-nio</artifactId>
<packaging>bundle</packaging>
<description>Fast network I/O using Java NIO</description>
<dependencies>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava-testlib</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>commons-pool</groupId>
<artifactId>commons-pool</artifactId>
</dependency>
<dependency>
<groupId>org.onosproject</groupId>
<artifactId>onos-api</artifactId>
</dependency>
<dependency>
<groupId>org.onosproject</groupId>
<artifactId>onlab-misc</artifactId>
</dependency>
<dependency>
<groupId>org.onosproject</groupId>
<artifactId>onlab-junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
/*
* Copyright 2014 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onlab.nio;
/**
* Base {@link Message} implementation.
*/
public abstract class AbstractMessage implements Message {
protected int length;
@Override
public int length() {
return length;
}
}
/*
* Copyright 2014 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onlab.nio;
import java.io.IOException;
import java.net.SocketAddress;
import java.net.StandardSocketOptions;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.util.Iterator;
import static com.google.common.base.Preconditions.checkNotNull;
/**
* Selector loop derivative tailored to acceptConnection inbound connections.
*/
public abstract class AcceptorLoop extends SelectorLoop {
private SocketAddress listenAddress;
private ServerSocketChannel socketChannel;
/**
* Creates an acceptor loop with the specified selection timeout and
* accepting connections on the the given address.
*
* @param selectTimeout selection timeout; specified in millis
* @param listenAddress socket address where to listen for connections
* @throws IOException if the backing selector cannot be opened
*/
public AcceptorLoop(long selectTimeout, SocketAddress listenAddress)
throws IOException {
super(selectTimeout);
this.listenAddress = checkNotNull(listenAddress, "Address cannot be null");
}
/**
* Hook to accept an inbound connection on the specified socket channel.
*
* @param channel socketChannel where an accept operation awaits
* @throws IOException if the accept operation cannot be processed
*/
protected abstract void acceptConnection(ServerSocketChannel channel) throws IOException;
/**
* Opens a new server socket channel configured in non-blocking mode and
* bound to the loop's listen address.
*
* @throws IOException if unable to open or configure the socket channel
*/
protected synchronized void openChannel() throws IOException {
socketChannel = ServerSocketChannel.open();
socketChannel.configureBlocking(false);
socketChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
socketChannel.register(selector, SelectionKey.OP_ACCEPT);
socketChannel.bind(listenAddress);
}
/**
* Closes the server socket channel.
*
* @throws IOException if unable to close the socketChannel
*/
protected synchronized void closechannel() throws IOException {
if (socketChannel != null) {
socketChannel.close();
socketChannel = null;
}
}
@Override
public void shutdown() {
try {
closechannel();
} catch (IOException e) {
log.warn("Unable to close the socketChannel", e);
}
super.shutdown();
}
@Override
protected void loop() throws IOException {
openChannel();
notifyReady();
// Keep looping until told otherwise.
while (isRunning()) {
// Attempt a selection; if no operations selected or if signalled
// to shutdown, spin through.
int count = selector.select(selectTimeout);
if (count == 0 || !isRunning()) {
continue;
}
// Iterate over all keys selected for an operation and process them.
Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
while (keys.hasNext()) {
// Fetch the key and remove it from the pending list.
SelectionKey key = keys.next();
keys.remove();
// If the key has a pending acceptConnection operation, process it.
if (key.isAcceptable()) {
acceptConnection((ServerSocketChannel) key.channel());
}
}
}
}
}
/*
* Copyright 2014 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onlab.nio;
import java.io.IOException;
import java.nio.channels.ByteChannel;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArraySet;
/**
* I/O loop for driving inbound &amp; outbound {@link Message} transfer via
* {@link MessageStream}.
*
* @param <M> message type
* @param <S> message stream type
*/
public abstract class IOLoop<M extends Message, S extends MessageStream<M>>
extends SelectorLoop {
// Queue of requests for new message streams to enter the IO loop processing.
private final Queue<NewStreamRequest> newStreamRequests = new ConcurrentLinkedQueue<>();
// Carries information required for admitting a new message stream.
private class NewStreamRequest {
private final S stream;
private final SelectableChannel channel;
private final int op;
public NewStreamRequest(S stream, SelectableChannel channel, int op) {
this.stream = stream;
this.channel = channel;
this.op = op;
}
}
// Set of message streams currently admitted into the IO loop.
private final Set<MessageStream<M>> streams = new CopyOnWriteArraySet<>();
/**
* Creates an IO loop with the given selection timeout.
*
* @param timeout selection timeout in milliseconds
* @throws IOException if the backing selector cannot be opened
*/
public IOLoop(long timeout) throws IOException {
super(timeout);
}
/**
* Returns the number of message stream in custody of the loop.
*
* @return number of message streams
*/
public int streamCount() {
return streams.size();
}
/**
* Creates a new message stream backed by the specified socket channel.
*
* @param byteChannel backing byte channel
* @return newly created message stream
*/
protected abstract S createStream(ByteChannel byteChannel);
/**
* Removes the specified message stream from the IO loop.
*
* @param stream message stream to remove
*/
protected void removeStream(MessageStream<M> stream) {
streams.remove(stream);
}
/**
* Processes the list of messages extracted from the specified message
* stream.
*
* @param messages non-empty list of received messages
* @param stream message stream from which the messages were extracted
*/
protected abstract void processMessages(List<M> messages, MessageStream<M> stream);
/**
* Completes connection request pending on the given selection key.
*
* @param key selection key holding the pending connect operation.
* @throws IOException when I/O exception of some sort has occurred
*/
protected void connect(SelectionKey key) throws IOException {
SocketChannel ch = (SocketChannel) key.channel();
ch.finishConnect();
if (key.isValid()) {
key.interestOps(SelectionKey.OP_READ);
}
}
/**
* Processes an IO operation pending on the specified key.
*
* @param key selection key holding the pending I/O operation.
*/
protected void processKeyOperation(SelectionKey key) {
@SuppressWarnings("unchecked")
S stream = (S) key.attachment();
try {
// If the key is not valid, bail out.
if (!key.isValid()) {
stream.close();
return;
}
// If there is a pending connect operation, complete it.
if (key.isConnectable()) {
try {
connect(key);
} catch (IOException | IllegalStateException e) {
log.warn("Unable to complete connection", e);
}
}
// If there is a read operation, slurp as much data as possible.
if (key.isReadable()) {
List<M> messages = stream.read();
// No messages or failed flush imply disconnect; bail.
if (messages == null || stream.hadError()) {
stream.close();
return;
}
// If there were any messages read, process them.
if (!messages.isEmpty()) {
try {
processMessages(messages, stream);
} catch (RuntimeException e) {
onError(stream, e);
}
}
}
// If there are pending writes, flush them
if (key.isWritable()) {
stream.flushIfPossible();
}
// If there were any issued flushing, close the stream.
if (stream.hadError()) {
stream.close();
}
} catch (CancelledKeyException e) {
// Key was cancelled, so silently close the stream
stream.close();
} catch (IOException e) {
if (!stream.isClosed() && !isResetByPeer(e)) {
log.warn("Unable to process IO", e);
}
stream.close();
}
}
// Indicates whether or not this exception is caused by 'reset by peer'.
private boolean isResetByPeer(IOException e) {
Throwable cause = e.getCause();
return cause != null && cause instanceof IOException &&
cause.getMessage().contains("reset by peer");
}
/**
* Hook to allow intercept of any errors caused during message processing.
* Default behaviour is to rethrow the error.
*
* @param stream message stream involved in the error
* @param error the runtime exception
*/
protected void onError(S stream, RuntimeException error) {
throw error;
}
/**
* Admits a new message stream backed by the specified socket channel
* with a pending accept operation.
*
* @param channel backing socket channel
* @return newly accepted message stream
*/
public S acceptStream(SocketChannel channel) {
return createAndAdmit(channel, SelectionKey.OP_READ);
}
/**
* Admits a new message stream backed by the specified socket channel
* with a pending connect operation.
*
* @param channel backing socket channel
* @return newly connected message stream
*/
public S connectStream(SocketChannel channel) {
return createAndAdmit(channel, SelectionKey.OP_CONNECT);
}
/**
* Creates a new message stream backed by the specified socket channel
* and admits it into the IO loop.
*
* @param channel socket channel
* @param op pending operations mask to be applied to the selection
* key as a set of initial interestedOps
* @return newly created message stream
*/
private synchronized S createAndAdmit(SocketChannel channel, int op) {
S stream = createStream(channel);
streams.add(stream);
newStreamRequests.add(new NewStreamRequest(stream, channel, op));
selector.wakeup();
return stream;
}
/**
* Safely admits new streams into the IO loop.
*/
private void admitNewStreams() {
Iterator<NewStreamRequest> it = newStreamRequests.iterator();
while (isRunning() && it.hasNext()) {
try {
NewStreamRequest request = it.next();
it.remove();
SelectionKey key = request.channel.register(selector, request.op,
request.stream);
request.stream.setKey(key);
} catch (ClosedChannelException e) {
log.warn("Unable to admit new message stream", e);
}
}
}
@Override
protected void loop() throws IOException {
notifyReady();
// Keep going until told otherwise.
while (isRunning()) {
admitNewStreams();
// Process flushes & write selects on all streams
for (MessageStream<M> stream : streams) {
stream.flushIfWriteNotPending();
}
// Select keys and process them.
int count = selector.select(selectTimeout);
if (count > 0 && isRunning()) {
Iterator<SelectionKey> it = selector.selectedKeys().iterator();
while (it.hasNext()) {
SelectionKey key = it.next();
it.remove();
processKeyOperation(key);
}
}
}
}
/**
* Prunes the registered streams by discarding any stale ones.
*
* @return number of remaining streams
*/
public synchronized int pruneStaleStreams() {
for (MessageStream<M> stream : streams) {
if (stream.isStale()) {
stream.close();
}
}
return streams.size();
}
}
/*
* Copyright 2014 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onlab.nio;
/**
* Representation of a message transferred via {@link MessageStream}.
*/
public interface Message {
/**
* Gets the message length in bytes.
*
* @return number of bytes
*/
int length();
}
/*
* Copyright 2014 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onlab.nio;
import org.onlab.util.Counter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ByteChannel;
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static java.lang.System.currentTimeMillis;
import static java.nio.ByteBuffer.allocateDirect;
/**
* Bi-directional message stream for transferring messages to &amp; from the
* network via two byte buffers.
*
* @param <M> message type
*/
public abstract class MessageStream<M extends Message> {
protected Logger log = LoggerFactory.getLogger(getClass());
private final IOLoop<M, ?> loop;
private final ByteChannel channel;
private final int maxIdleMillis;
private final ByteBuffer inbound;
private ByteBuffer outbound;
private SelectionKey key;
private volatile boolean closed = false;
private volatile boolean writePending;
private volatile boolean writeOccurred;
private Exception ioError;
private long lastActiveTime;
private final Counter bytesIn = new Counter();
private final Counter messagesIn = new Counter();
private final Counter bytesOut = new Counter();
private final Counter messagesOut = new Counter();
/**
* Creates a message stream associated with the specified IO loop and
* backed by the given byte channel.
*
* @param loop IO loop
* @param byteChannel backing byte channel
* @param bufferSize size of the backing byte buffers
* @param maxIdleMillis maximum number of millis the stream can be idle
* before it will be closed
*/
protected MessageStream(IOLoop<M, ?> loop, ByteChannel byteChannel,
int bufferSize, int maxIdleMillis) {
this.loop = checkNotNull(loop, "Loop cannot be null");
this.channel = checkNotNull(byteChannel, "Byte channel cannot be null");
checkArgument(maxIdleMillis > 0, "Idle time must be positive");
this.maxIdleMillis = maxIdleMillis;
inbound = allocateDirect(bufferSize);
outbound = allocateDirect(bufferSize);
}
/**
* Gets a single message from the specified byte buffer; this is
* to be done without manipulating the buffer via flip, reset or clear.
*
* @param buffer byte buffer
* @return read message or null if there are not enough bytes to read
* a complete message
*/
protected abstract M read(ByteBuffer buffer);
/**
* Puts the specified message into the specified byte buffer; this is
* to be done without manipulating the buffer via flip, reset or clear.
*
* @param message message to be write into the buffer
* @param buffer byte buffer
*/
protected abstract void write(M message, ByteBuffer buffer);
/**
* Closes the message buffer.
*/
public void close() {
synchronized (this) {
if (closed) {
return;
}
closed = true;
}
bytesIn.freeze();
bytesOut.freeze();
messagesIn.freeze();
messagesOut.freeze();
loop.removeStream(this);
if (key != null) {
try {
key.cancel();
key.channel().close();
} catch (IOException e) {
log.warn("Unable to close stream", e);
}
}
}
/**
* Indicates whether this buffer has been closed.
*
* @return true if this stream has been closed
*/
public synchronized boolean isClosed() {
return closed;
}
/**
* Returns the stream IO selection key.
*
* @return socket channel registration selection key
*/
public SelectionKey key() {
return key;
}
/**
* Binds the selection key to be used for driving IO operations on the stream.
*
* @param key IO selection key
*/
public void setKey(SelectionKey key) {
this.key = key;
this.lastActiveTime = currentTimeMillis();
}
/**
* Returns the IO loop to which this stream is bound.
*
* @return I/O loop used to drive this stream
*/
public IOLoop<M, ?> loop() {
return loop;
}
/**
* Indicates whether the any prior IO encountered an error.
*
* @return true if a write failed
*/
public boolean hadError() {
return ioError != null;
}
/**
* Gets the prior IO error, if one occurred.
*
* @return IO error; null if none occurred
*/
public Exception getError() {
return ioError;
}
/**
* Reads, without blocking, a list of messages from the stream.
* The list will be empty if there were not messages pending.
*
* @return list of messages or null if backing channel has been closed
* @throws IOException if messages could not be read
*/
public List<M> read() throws IOException {
try {
int read = channel.read(inbound);
if (read != -1) {
// Read the messages one-by-one and add them to the list.
List<M> messages = new ArrayList<>();
M message;
inbound.flip();
while ((message = read(inbound)) != null) {
messages.add(message);
messagesIn.add(1);
bytesIn.add(message.length());
}
inbound.compact();
// Mark the stream with current time to indicate liveness.
lastActiveTime = currentTimeMillis();
return messages;
}
return null;
} catch (Exception e) {
throw new IOException("Unable to read messages", e);
}
}
/**
* Writes the specified list of messages to the stream.
*
* @param messages list of messages to write
* @throws IOException if error occurred while writing the data
*/
public void write(List<M> messages) throws IOException {
synchronized (this) {
// First write all messages.
for (M m : messages) {
append(m);
}
flushUnlessAlreadyPlanningTo();
}
}
/**
* Writes the given message to the stream.
*
* @param message message to write
* @throws IOException if error occurred while writing the data
*/
public void write(M message) throws IOException {
synchronized (this) {
append(message);
flushUnlessAlreadyPlanningTo();
}
}
// Appends the specified message into the internal buffer, growing the
// buffer if required.
private void append(M message) {
// If the buffer does not have sufficient length double it.
while (outbound.remaining() < message.length()) {
doubleSize();
}
write(message, outbound);
messagesOut.add(1);
bytesOut.add(message.length());
}
// Forces a flush, unless one is planned already.
private void flushUnlessAlreadyPlanningTo() throws IOException {
if (!writeOccurred && !writePending) {
flush();
}
}
/**
* Flushes any pending writes.
*
* @throws IOException if flush failed
*/
public void flush() throws IOException {
synchronized (this) {
if (!writeOccurred && !writePending) {
outbound.flip();
try {
channel.write(outbound);
} catch (IOException e) {
if (!closed && !Objects.equals(e.getMessage(), "Broken pipe")) {
log.warn("Unable to write data", e);
ioError = e;
}
}
lastActiveTime = currentTimeMillis();
writeOccurred = true;
writePending = outbound.hasRemaining();
outbound.compact();
}
}
}
/**
* Indicates whether the stream has bytes to be written to the channel.
*
* @return true if there are bytes to be written
*/
boolean isWritePending() {
synchronized (this) {
return writePending;
}
}
/**
* Indicates whether data has been written but not flushed yet.
*
* @return true if flush is required
*/
boolean isFlushRequired() {
synchronized (this) {
return outbound.position() > 0;
}
}
/**
* Attempts to flush data, internal stream state and channel availability
* permitting. Invoked by the driver I/O loop during handling of writable
* selection key.
* <p>
* Resets the internal state flags {@code writeOccurred} and
* {@code writePending}.
* </p>
* @throws IOException if implicit flush failed
*/
void flushIfPossible() throws IOException {
synchronized (this) {
writePending = false;
writeOccurred = false;
if (outbound.position() > 0) {
flush();
}
}
key.interestOps(SelectionKey.OP_READ);
}
/**
* Attempts to flush data, internal stream state and channel availability
* permitting and if other writes are not pending. Invoked by the driver
* I/O loop prior to entering select wait. Resets the internal
* {@code writeOccurred} state flag.
*
* @throws IOException if implicit flush failed
*/
void flushIfWriteNotPending() throws IOException {
synchronized (this) {
writeOccurred = false;
if (!writePending && outbound.position() > 0) {
flush();
}
}
if (isWritePending()) {
key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
}
}
/**
* Doubles the size of the outbound buffer.
*/
private void doubleSize() {
ByteBuffer newBuffer = allocateDirect(outbound.capacity() * 2);
outbound.flip();
newBuffer.put(outbound);
outbound = newBuffer;
}
/**
* Returns the maximum number of milliseconds the stream is allowed
* without any read/write operations.
*
* @return number if millis of permissible idle time
*/
protected int maxIdleMillis() {
return maxIdleMillis;
}
/**
* Returns true if the given stream has gone stale.
*
* @return true if the stream is stale
*/
boolean isStale() {
return currentTimeMillis() - lastActiveTime > maxIdleMillis() && key != null;
}
/**
* Returns the inbound bytes counter.
*
* @return inbound bytes counter
*/
public Counter bytesIn() {
return bytesIn;
}
/**
* Returns the outbound bytes counter.
*
* @return outbound bytes counter
*/
public Counter bytesOut() {
return bytesOut;
}
/**
* Returns the inbound messages counter.
*
* @return inbound messages counter
*/
public Counter messagesIn() {
return messagesIn;
}
/**
* Returns the outbound messages counter.
*
* @return outbound messages counter
*/
public Counter messagesOut() {
return messagesOut;
}
}
/*
* Copyright 2014 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onlab.nio;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.channels.Selector;
import static com.google.common.base.Preconditions.checkArgument;
import static java.lang.System.currentTimeMillis;
/**
* Abstraction of an I/O processing loop based on an NIO selector.
*/
public abstract class SelectorLoop implements Runnable {
protected final Logger log = LoggerFactory.getLogger(getClass());
/**
* Selector used by this loop to pace the I/O operations.
*/
protected final Selector selector;
/**
* Selection operations timeout; specified in millis.
*/
protected long selectTimeout;
/**
* Retains the error that caused the loop to exit prematurely.
*/
private Throwable error;
// State indicator
private enum State { STARTING, STARTED, STOPPING, STOPPED };
private volatile State state = State.STOPPED;
/**
* Creates a new selector loop with the given selection timeout.
*
* @param selectTimeout selection timeout; specified in millis
* @throws IOException if the backing selector cannot be opened
*/
public SelectorLoop(long selectTimeout) throws IOException {
checkArgument(selectTimeout > 0, "Timeout must be positive");
this.selectTimeout = selectTimeout;
this.selector = openSelector();
}
/**
* Opens a new selector for the use by the loop.
*
* @return newly open selector
* @throws IOException if the backing selector cannot be opened
*/
protected Selector openSelector() throws IOException {
return Selector.open();
}
/**
* Indicates that the loop is marked to run.
* @return true if the loop is marked to run
*/
protected boolean isRunning() {
return state == State.STARTED || state == State.STARTING;
}
/**
* Returns the error, if there was one, that caused the loop to terminate
* prematurely.
*
* @return error or null if there was none
*/
public Throwable getError() {
return error;
}
/**
* Contains the body of the I/O selector loop.
*
* @throws IOException if an error is encountered while selecting I/O
*/
protected abstract void loop() throws IOException;
@Override
public void run() {
error = null;
state = State.STARTING;
try {
loop();
} catch (Exception e) {
error = e;
log.error("Loop aborted", e);
}
notifyDone();
}
/**
* Notifies observers waiting for loop to become ready.
*/
protected synchronized void notifyReady() {
state = State.STARTED;
notifyAll();
}
/**
* Triggers loop shutdown.
*/
public void shutdown() {
// Mark the loop as no longer running and wake up the selector.
state = State.STOPPING;
selector.wakeup();
}
/**
* Notifies observers waiting for loop to fully stop.
*/
private synchronized void notifyDone() {
state = State.STOPPED;
notifyAll();
}
/**
* Waits for the loop execution to start.
*
* @param timeout number of milliseconds to wait
* @return true if loop started in time
*/
public final synchronized boolean awaitStart(long timeout) {
long max = currentTimeMillis() + timeout;
while (state != State.STARTED && (currentTimeMillis() < max)) {
try {
wait(timeout);
} catch (InterruptedException e) {
throw new RuntimeException("Interrupted", e);
}
}
return state == State.STARTED;
}
/**
* Waits for the loop execution to stop.
*
* @param timeout number of milliseconds to wait
* @return true if loop finished in time
*/
public final synchronized boolean awaitStop(long timeout) {
long max = currentTimeMillis() + timeout;
while (state != State.STOPPED && (currentTimeMillis() < max)) {
try {
wait(timeout);
} catch (InterruptedException e) {
throw new RuntimeException("Interrupted", e);
}
}
return state == State.STOPPED;
}
}
/*
* Copyright 2014 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Mechanism to transfer messages over network using IO loop and
* message stream, backed by NIO byte buffers.
*/
package org.onlab.nio;
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onlab.nio.service;
import java.io.IOException;
import java.nio.channels.ByteChannel;
import java.nio.channels.SelectionKey;
import java.util.List;
import java.util.function.Consumer;
import org.onlab.nio.IOLoop;
import org.onlab.nio.MessageStream;
/**
* IOLoop for transporting DefaultMessages.
*/
public class DefaultIOLoop extends IOLoop<DefaultMessage, DefaultMessageStream> {
public static final int SELECT_TIMEOUT_MILLIS = 500;
private static final int MAX_IDLE_TIMEOUT_MILLIS = 1000;
private static final int BUFFER_SIZE = 1024 * 1024;
private final Consumer<DefaultMessage> consumer;
public DefaultIOLoop(Consumer<DefaultMessage> consumer) throws IOException {
this(SELECT_TIMEOUT_MILLIS, consumer);
}
public DefaultIOLoop(long timeout, Consumer<DefaultMessage> consumer) throws IOException {
super(timeout);
this.consumer = consumer;
}
@Override
protected DefaultMessageStream createStream(ByteChannel byteChannel) {
return new DefaultMessageStream(this, byteChannel, BUFFER_SIZE, MAX_IDLE_TIMEOUT_MILLIS);
}
@Override
protected void processMessages(List<DefaultMessage> messages, MessageStream<DefaultMessage> stream) {
messages.forEach(consumer);
}
@Override
protected void connect(SelectionKey key) throws IOException {
DefaultMessageStream stream = (DefaultMessageStream) key.attachment();
try {
super.connect(key);
stream.connected();
} catch (Exception e) {
stream.connectFailed(e);
}
}
}
\ No newline at end of file
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onlab.nio.service;
import static com.google.common.base.Preconditions.checkNotNull;
import org.onlab.nio.AbstractMessage;
import org.onlab.packet.IpAddress;
import org.onlab.util.ByteArraySizeHashPrinter;
import org.onosproject.store.cluster.messaging.Endpoint;
import com.google.common.base.Charsets;
import com.google.common.base.MoreObjects;
/**
* Default message.
*/
public class DefaultMessage extends AbstractMessage {
private long id;
private Endpoint sender;
private String type;
private byte[] payload;
/**
* Creates a new message with the specified data.
*
* @param id message id
* @param type message type
* @param sender sender endpoint
* @param payload message payload
*/
DefaultMessage(long id, Endpoint sender, String type, byte[] payload) {
this.id = id;
this.type = checkNotNull(type, "Type cannot be null");
this.sender = checkNotNull(sender, "Sender cannot be null");
this.payload = checkNotNull(payload, "Payload cannot be null");
byte[] messageTypeBytes = type.getBytes(Charsets.UTF_8);
IpAddress senderIp = sender.host();
byte[] ipOctets = senderIp.toOctets();
length = 25 + ipOctets.length + messageTypeBytes.length + payload.length;
}
/**
* Returns message id.
*
* @return message id
*/
public long id() {
return id;
}
/**
* Returns message sender.
*
* @return message sender
*/
public Endpoint sender() {
return sender;
}
/**
* Returns message type.
*
* @return message type
*/
public String type() {
return type;
}
/**
* Returns message payload.
*
* @return payload
*/
public byte[] payload() {
return payload;
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("id", id)
.add("type", type)
.add("sender", sender)
.add("payload", ByteArraySizeHashPrinter.of(payload))
.toString();
}
}
\ No newline at end of file
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onlab.nio.service;
import java.nio.ByteBuffer;
import java.nio.channels.ByteChannel;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import org.onlab.nio.IOLoop;
import org.onlab.nio.MessageStream;
import org.onlab.packet.IpAddress;
import org.onlab.packet.IpAddress.Version;
import org.onosproject.store.cluster.messaging.Endpoint;
import com.google.common.base.Charsets;
/**
* Default bi-directional message stream for transferring messages to &amp; from the
* network via two byte buffers.
*/
public class DefaultMessageStream extends MessageStream<DefaultMessage> {
private final CompletableFuture<Void> connectFuture = new CompletableFuture<>();
public DefaultMessageStream(
IOLoop<DefaultMessage, ?> loop,
ByteChannel byteChannel,
int bufferSize,
int maxIdleMillis) {
super(loop, byteChannel, bufferSize, maxIdleMillis);
}
public CompletableFuture<DefaultMessageStream> connectedFuture() {
return connectFuture.thenApply(v -> this);
}
private final AtomicInteger messageLength = new AtomicInteger(-1);
@Override
protected DefaultMessage read(ByteBuffer buffer) {
if (messageLength.get() == -1) {
// check if we can read the message length.
if (buffer.remaining() < Integer.BYTES) {
return null;
} else {
messageLength.set(buffer.getInt());
}
}
if (buffer.remaining() < messageLength.get()) {
return null;
}
long id = buffer.getLong();
Version ipVersion = buffer.get() == 0x0 ? Version.INET : Version.INET6;
byte[] octects = new byte[IpAddress.byteLength(ipVersion)];
buffer.get(octects);
IpAddress senderIp = IpAddress.valueOf(ipVersion, octects);
int senderPort = buffer.getInt();
int messageTypeByteLength = buffer.getInt();
byte[] messageTypeBytes = new byte[messageTypeByteLength];
buffer.get(messageTypeBytes);
String messageType = new String(messageTypeBytes, Charsets.UTF_8);
int payloadLength = buffer.getInt();
byte[] payloadBytes = new byte[payloadLength];
buffer.get(payloadBytes);
// reset for next message
messageLength.set(-1);
return new DefaultMessage(id, new Endpoint(senderIp, senderPort), messageType, payloadBytes);
}
@Override
protected void write(DefaultMessage message, ByteBuffer buffer) {
Endpoint sender = message.sender();
byte[] messageTypeBytes = message.type().getBytes(Charsets.UTF_8);
IpAddress senderIp = sender.host();
byte[] ipOctets = senderIp.toOctets();
byte[] payload = message.payload();
int messageLength = 21 + ipOctets.length + messageTypeBytes.length + payload.length;
buffer.putInt(messageLength);
buffer.putLong(message.id());
if (senderIp.version() == Version.INET) {
buffer.put((byte) 0x0);
} else {
buffer.put((byte) 0x1);
}
buffer.put(ipOctets);
// write sender port
buffer.putInt(sender.port());
// write length of message type
buffer.putInt(messageTypeBytes.length);
// write message type bytes
buffer.put(messageTypeBytes);
// write payload length
buffer.putInt(payload.length);
// write payload.
buffer.put(payload);
}
/**
* Callback invoked when the stream is successfully connected.
*/
public void connected() {
connectFuture.complete(null);
}
/**
* Callback invoked when the stream fails to connect.
* @param cause failure cause
*/
public void connectFailed(Throwable cause) {
connectFuture.completeExceptionally(cause);
}
}
\ No newline at end of file
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onlab.nio.service;
import static org.onlab.util.Tools.groupedThreads;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import org.apache.commons.pool.KeyedPoolableObjectFactory;
import org.apache.commons.pool.impl.GenericKeyedObjectPool;
import org.onlab.nio.AcceptorLoop;
import org.onlab.nio.SelectorLoop;
import org.onosproject.store.cluster.messaging.Endpoint;
import org.onosproject.store.cluster.messaging.MessagingService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.MoreExecutors;
/**
* MessagingService implementation based on IOLoop.
*/
public class IOLoopMessaging implements MessagingService {
private final Logger log = LoggerFactory.getLogger(getClass());
private static final String REPLY_MESSAGE_TYPE = "ONOS_REQUEST_REPLY";
static final long TIMEOUT = 1000;
static final boolean SO_NO_DELAY = false;
static final int SO_SEND_BUFFER_SIZE = 128 * 1024;
static final int SO_RCV_BUFFER_SIZE = 128 * 1024;
private static final int NUM_WORKERS = 8;
private AcceptorLoop acceptorLoop;
private final ExecutorService acceptorThreadPool =
Executors.newSingleThreadExecutor(groupedThreads("onos/nio/messaging", "acceptor"));
private final ExecutorService ioThreadPool =
Executors.newFixedThreadPool(NUM_WORKERS, groupedThreads("onos/nio/messaging", "io-loop-worker-%d"));
private final List<DefaultIOLoop> ioLoops = Lists.newArrayList();
private int lastWorker = -1;
private final AtomicBoolean started = new AtomicBoolean(false);
private Endpoint localEp;
private GenericKeyedObjectPool<Endpoint, DefaultMessageStream> streams =
new GenericKeyedObjectPool<>(new DefaultMessageStreamFactory());
private final ConcurrentMap<String, Consumer<DefaultMessage>> handlers = new ConcurrentHashMap<>();
private final AtomicLong messageIdGenerator = new AtomicLong(0);
private final Cache<Long, Callback> responseFutures = CacheBuilder.newBuilder()
.removalListener(new RemovalListener<Long, Callback>() {
@Override
public void onRemoval(RemovalNotification<Long, Callback> entry) {
if (entry.wasEvicted()) {
entry.getValue().completeExceptionally(new TimeoutException("Timedout waiting for reply"));
}
}
})
.build();
/**
* Activates IO Loops.
*
* @param localEp local end-point
* @throws IOException is activation fails
*/
public void start(Endpoint localEp) throws IOException {
if (started.get()) {
log.warn("IOMessaging is already running at {}", localEp);
return;
}
this.localEp = localEp;
streams.setLifo(false);
this.acceptorLoop = new DefaultAcceptorLoop(new InetSocketAddress(localEp.host().toString(), localEp.port()));
for (int i = 0; i < NUM_WORKERS; i++) {
ioLoops.add(new DefaultIOLoop(this::dispatchLocally));
}
ioLoops.forEach(ioThreadPool::execute);
acceptorThreadPool.execute(acceptorLoop);
ioLoops.forEach(loop -> loop.awaitStart(TIMEOUT));
acceptorLoop.awaitStart(TIMEOUT);
started.set(true);
}
/**
* Shuts down IO loops.
*/
public void stop() {
if (started.get()) {
ioLoops.forEach(SelectorLoop::shutdown);
acceptorLoop.shutdown();
ioThreadPool.shutdown();
acceptorThreadPool.shutdown();
started.set(false);
}
}
@Override
public CompletableFuture<Void> sendAsync(Endpoint ep, String type, byte[] payload) {
DefaultMessage message = new DefaultMessage(
messageIdGenerator.incrementAndGet(),
localEp,
type,
payload);
return sendAsync(ep, message);
}
protected CompletableFuture<Void> sendAsync(Endpoint ep, DefaultMessage message) {
CompletableFuture<Void> future = new CompletableFuture<>();
if (ep.equals(localEp)) {
dispatchLocally(message);
future.complete(null);
return future;
}
DefaultMessageStream stream = null;
try {
stream = streams.borrowObject(ep);
stream.write(message);
future.complete(null);
} catch (Exception e) {
future.completeExceptionally(e);
} finally {
try {
streams.returnObject(ep, stream);
} catch (Exception e) {
log.warn("Failed to return stream to pool");
}
}
return future;
}
@Override
public CompletableFuture<byte[]> sendAndReceive(
Endpoint ep,
String type,
byte[] payload,
Executor executor) {
CompletableFuture<byte[]> response = new CompletableFuture<>();
Callback callback = new Callback(response, executor);
Long messageId = messageIdGenerator.incrementAndGet();
responseFutures.put(messageId, callback);
DefaultMessage message = new DefaultMessage(messageId, localEp, type, payload);
return sendAsync(ep, message).whenComplete((r, e) -> {
if (e != null) {
responseFutures.invalidate(messageId);
}
}).thenCompose(v -> response);
}
@Override
public CompletableFuture<byte[]> sendAndReceive(
Endpoint ep,
String type,
byte[] payload) {
return sendAndReceive(ep, type, payload, MoreExecutors.directExecutor());
}
@Override
public void registerHandler(String type, BiConsumer<Endpoint, byte[]> handler, Executor executor) {
handlers.put(type, message -> executor.execute(() -> handler.accept(message.sender(), message.payload())));
}
@Override
public void registerHandler(String type, BiFunction<Endpoint, byte[], byte[]> handler, Executor executor) {
handlers.put(type, message -> executor.execute(() -> {
byte[] responsePayload = handler.apply(message.sender(), message.payload());
if (responsePayload != null) {
DefaultMessage response = new DefaultMessage(message.id(),
localEp,
REPLY_MESSAGE_TYPE,
responsePayload);
sendAsync(message.sender(), response).whenComplete((result, error) -> {
log.debug("Failed to respond", error);
});
}
}));
}
@Override
public void registerHandler(String type, BiFunction<Endpoint, byte[], CompletableFuture<byte[]>> handler) {
handlers.put(type, message -> {
handler.apply(message.sender(), message.payload()).whenComplete((result, error) -> {
if (error == null) {
DefaultMessage response = new DefaultMessage(message.id(),
localEp,
REPLY_MESSAGE_TYPE,
result);
sendAsync(message.sender(), response).whenComplete((r, e) -> {
if (e != null) {
log.debug("Failed to respond", e);
}
});
}
});
});
}
@Override
public void unregisterHandler(String type) {
handlers.remove(type);
}
protected void dispatchLocally(DefaultMessage message) {
String type = message.type();
if (REPLY_MESSAGE_TYPE.equals(type)) {
try {
Callback callback =
responseFutures.getIfPresent(message.id());
if (callback != null) {
callback.complete(message.payload());
} else {
log.warn("Received a reply for message id:[{}]. "
+ " from {}. But was unable to locate the"
+ " request handle", message.id(), message.sender());
}
} finally {
responseFutures.invalidate(message.id());
}
return;
}
Consumer<DefaultMessage> handler = handlers.get(type);
if (handler != null) {
handler.accept(message);
} else {
log.debug("No handler registered for {}", type);
}
}
// Get the next worker to which a client should be assigned
private synchronized DefaultIOLoop nextWorker() {
lastWorker = (lastWorker + 1) % NUM_WORKERS;
return ioLoops.get(lastWorker);
}
/**
* Initiates open connection request and registers the pending socket
* channel with the given IO loop.
*
* @param loop loop with which the channel should be registered
* @throws java.io.IOException if the socket could not be open or connected
*/
private DefaultMessageStream createConnection(Endpoint ep, DefaultIOLoop loop) throws IOException {
SocketAddress sa = new InetSocketAddress(ep.host().toString(), ep.port());
SocketChannel ch = SocketChannel.open();
ch.configureBlocking(false);
DefaultMessageStream stream = loop.connectStream(ch);
ch.connect(sa);
return stream;
}
// Loop for accepting client connections
private class DefaultAcceptorLoop extends AcceptorLoop {
public DefaultAcceptorLoop(SocketAddress address) throws IOException {
super(DefaultIOLoop.SELECT_TIMEOUT_MILLIS, address);
}
@Override
protected void acceptConnection(ServerSocketChannel channel) throws IOException {
SocketChannel sc = channel.accept();
sc.configureBlocking(false);
Socket so = sc.socket();
so.setTcpNoDelay(SO_NO_DELAY);
so.setReceiveBufferSize(SO_RCV_BUFFER_SIZE);
so.setSendBufferSize(SO_SEND_BUFFER_SIZE);
nextWorker().acceptStream(sc);
}
}
private class DefaultMessageStreamFactory implements KeyedPoolableObjectFactory<Endpoint, DefaultMessageStream> {
@Override
public void activateObject(Endpoint endpoint, DefaultMessageStream stream) throws Exception {
}
@Override
public void destroyObject(Endpoint ep, DefaultMessageStream stream) throws Exception {
stream.close();
}
@Override
public DefaultMessageStream makeObject(Endpoint ep) throws Exception {
DefaultMessageStream stream = createConnection(ep, nextWorker()).connectedFuture().get();
log.info("Established a new connection to {}", ep);
return stream;
}
@Override
public void passivateObject(Endpoint ep, DefaultMessageStream stream) throws Exception {
}
@Override
public boolean validateObject(Endpoint ep, DefaultMessageStream stream) {
return stream.isClosed();
}
}
private final class Callback {
private final CompletableFuture<byte[]> future;
private final Executor executor;
public Callback(CompletableFuture<byte[]> future, Executor executor) {
this.future = future;
this.executor = executor;
}
public void complete(byte[] value) {
executor.execute(() -> future.complete(value));
}
public void completeExceptionally(Throwable error) {
executor.execute(() -> future.completeExceptionally(error));
}
}
}
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Assembly for sending and receiving messages using the I/O loop mechanism.
*/
package org.onlab.nio.service;
\ No newline at end of file
/*
* Copyright 2014 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onlab.nio;
import org.junit.Before;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static org.junit.Assert.fail;
import static org.onlab.util.Tools.namedThreads;
/**
* Base class for various NIO loop unit tests.
*/
public abstract class AbstractLoopTest {
protected static final long MAX_MS_WAIT = 1500;
/** Block on specified countdown latch. Return when countdown reaches
* zero, or fail the test if the {@value #MAX_MS_WAIT} ms timeout expires.
*
* @param latch the latch
* @param label an identifying label
*/
protected void waitForLatch(CountDownLatch latch, String label) {
try {
boolean ok = latch.await(MAX_MS_WAIT, TimeUnit.MILLISECONDS);
if (!ok) {
fail("Latch await timeout! [" + label + "]");
}
} catch (InterruptedException e) {
System.out.println("Latch interrupt [" + label + "] : " + e);
fail("Unexpected interrupt");
}
}
protected ExecutorService exec;
@Before
public void setUp() {
exec = newSingleThreadExecutor(namedThreads("test"));
}
}
/*
* Copyright 2014-2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onlab.nio;
import org.junit.Test;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.util.concurrent.CountDownLatch;
import static org.junit.Assert.assertEquals;
import static org.onlab.junit.TestTools.delay;
/**
* Unit tests for AcceptLoop.
*/
public class AcceptorLoopTest extends AbstractLoopTest {
private static final int PICK_EPHEMERAL = 0;
private static final SocketAddress SOCK_ADDR = new InetSocketAddress("127.0.0.1", PICK_EPHEMERAL);
private static class MyAcceptLoop extends AcceptorLoop {
private final CountDownLatch loopStarted = new CountDownLatch(1);
private final CountDownLatch loopFinished = new CountDownLatch(1);
private final CountDownLatch runDone = new CountDownLatch(1);
private final CountDownLatch ceaseLatch = new CountDownLatch(1);
private int acceptCount = 0;
MyAcceptLoop() throws IOException {
super(500, SOCK_ADDR);
}
@Override
protected void acceptConnection(ServerSocketChannel ssc) throws IOException {
acceptCount++;
}
@Override
public void loop() throws IOException {
loopStarted.countDown();
super.loop();
loopFinished.countDown();
}
@Override
public void run() {
super.run();
runDone.countDown();
}
@Override
public void shutdown() {
super.shutdown();
ceaseLatch.countDown();
}
}
@Test
public void basic() throws IOException {
MyAcceptLoop myAccLoop = new MyAcceptLoop();
AcceptorLoop accLoop = myAccLoop;
exec.execute(accLoop);
waitForLatch(myAccLoop.loopStarted, "loopStarted");
delay(200); // take a quick nap
accLoop.shutdown();
waitForLatch(myAccLoop.loopFinished, "loopFinished");
waitForLatch(myAccLoop.runDone, "runDone");
assertEquals(0, myAccLoop.acceptCount);
}
}
/*
* Copyright 2014 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onlab.nio;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import java.net.InetAddress;
import java.util.Random;
import java.util.logging.Level;
import java.util.logging.Logger;
import static org.onlab.junit.TestTools.delay;
/**
* Integration test for the select, accept and IO loops.
*/
public class IOLoopIntegrationTest {
private static final int THREADS = 6;
private static final int TIMEOUT = 60;
private static final int MESSAGE_LENGTH = 128;
private static final int MILLION = 1000000;
private static final int MSG_COUNT = 40 * MILLION;
@Before
public void warmUp() throws Exception {
Logger.getLogger("").setLevel(Level.SEVERE);
try {
runTest(MILLION, MESSAGE_LENGTH, 15);
} catch (Throwable e) {
System.err.println("Failed warmup but moving on.");
e.printStackTrace();
}
}
// TODO: this test can not pass in some environments, need to be improved
@Ignore
@Test
public void basic() throws Exception {
runTest(MILLION, MESSAGE_LENGTH, TIMEOUT);
}
public void longHaul() throws Exception {
runTest(MSG_COUNT, MESSAGE_LENGTH, TIMEOUT);
}
private void runTest(int count, int size, int timeout) throws Exception {
// Use a random port to prevent conflicts.
int port = IOLoopTestServer.PORT + new Random().nextInt(100);
InetAddress ip = InetAddress.getLoopbackAddress();
IOLoopTestServer server = new IOLoopTestServer(ip, THREADS, size, port);
IOLoopTestClient client = new IOLoopTestClient(ip, THREADS, count, size, port);
server.start();
client.start();
delay(100); // Pause to allow loops to get going
client.await(timeout);
client.report();
server.stop();
server.report();
}
}
/*
* Copyright 2014 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onlab.nio;
import com.google.common.collect.Lists;
import org.onlab.util.Counter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.ByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static java.lang.String.format;
import static java.lang.System.nanoTime;
import static java.lang.System.out;
import static org.onlab.nio.IOLoopTestServer.PORT;
import static org.onlab.util.Tools.delay;
import static org.onlab.util.Tools.namedThreads;
/**
* Auxiliary test fixture to measure speed of NIO-based channels.
*/
public class IOLoopTestClient {
private static Logger log = LoggerFactory.getLogger(IOLoopTestClient.class);
private final InetAddress ip;
private final int port;
private final int msgCount;
private final int msgLength;
private final List<CustomIOLoop> iloops = new ArrayList<>();
private final ExecutorService ipool;
private final ExecutorService wpool;
Counter messages;
Counter bytes;
long latencyTotal = 0;
long latencyCount = 0;
/**
* Main entry point to launch the client.
*
* @param args command-line arguments
* @throws java.io.IOException if unable to connect to server
* @throws InterruptedException if latch wait gets interrupted
* @throws java.util.concurrent.ExecutionException if wait gets interrupted
* @throws java.util.concurrent.TimeoutException if timeout occurred while waiting for completion
*/
public static void main(String[] args)
throws IOException, InterruptedException, ExecutionException, TimeoutException {
startStandalone(args);
System.exit(0);
}
/**
* Starts a standalone IO loop test client.
*
* @param args command-line arguments
*/
public static void startStandalone(String[] args)
throws IOException, InterruptedException, ExecutionException, TimeoutException {
InetAddress ip = InetAddress.getByName(args.length > 0 ? args[0] : "127.0.0.1");
int wc = args.length > 1 ? Integer.parseInt(args[1]) : 6;
int mc = args.length > 2 ? Integer.parseInt(args[2]) : 50 * 1000000;
int ml = args.length > 3 ? Integer.parseInt(args[3]) : 128;
int to = args.length > 4 ? Integer.parseInt(args[4]) : 60;
log.info("Setting up client with {} workers sending {} {}-byte messages to {} server... ",
wc, mc, ml, ip);
IOLoopTestClient client = new IOLoopTestClient(ip, wc, mc, ml, PORT);
client.start();
delay(500);
client.await(to);
client.report();
}
/**
* Creates a speed client.
*
* @param ip ip address of server
* @param wc worker count
* @param mc message count to send per client
* @param ml message length in bytes
* @param port socket port
* @throws java.io.IOException if unable to create IO loops
*/
public IOLoopTestClient(InetAddress ip, int wc, int mc, int ml, int port) throws IOException {
this.ip = ip;
this.port = port;
this.msgCount = mc;
this.msgLength = ml;
this.wpool = Executors.newFixedThreadPool(wc, namedThreads("worker"));
this.ipool = Executors.newFixedThreadPool(wc, namedThreads("io-loop"));
for (int i = 0; i < wc; i++) {
iloops.add(new CustomIOLoop());
}
}
/**
* Starts the client workers.
*
* @throws java.io.IOException if unable to open connection
*/
public void start() throws IOException {
messages = new Counter();
bytes = new Counter();
// First start up all the IO loops
for (CustomIOLoop l : iloops) {
ipool.execute(l);
}
// Wait for all of them to get going
for (CustomIOLoop l : iloops) {
l.awaitStart(1000);
}
// ... and Next open all connections; one-per-loop
for (CustomIOLoop l : iloops) {
openConnection(l);
}
}
/**
* Initiates open connection request and registers the pending socket
* channel with the given IO loop.
*
* @param loop loop with which the channel should be registered
* @throws java.io.IOException if the socket could not be open or connected
*/
private void openConnection(CustomIOLoop loop) throws IOException {
SocketAddress sa = new InetSocketAddress(ip, port);
SocketChannel ch = SocketChannel.open();
ch.configureBlocking(false);
loop.connectStream(ch);
ch.connect(sa);
}
/**
* Waits for the client workers to complete.
*
* @param secs timeout in seconds
* @throws java.util.concurrent.ExecutionException if execution failed
* @throws InterruptedException if interrupt occurred while waiting
* @throws java.util.concurrent.TimeoutException if timeout occurred
*/
public void await(int secs) throws InterruptedException,
ExecutionException, TimeoutException {
for (CustomIOLoop l : iloops) {
if (l.worker.task != null) {
l.worker.task.get(secs, TimeUnit.SECONDS);
latencyTotal += l.latencyTotal;
latencyCount += l.latencyCount;
}
}
messages.freeze();
bytes.freeze();
}
/**
* Reports on the accumulated throughput and latency.
*/
public void report() {
DecimalFormat f = new DecimalFormat("#,##0");
out.println(format("Client: %s messages; %s bytes; %s mps; %s MBs; %s ns latency",
f.format(messages.total()), f.format(bytes.total()),
f.format(messages.throughput()),
f.format(bytes.throughput() / (1024 * msgLength)),
f.format(latencyTotal / latencyCount)));
}
// Loop for transfer of fixed-length messages
private class CustomIOLoop extends IOLoop<TestMessage, TestMessageStream> {
Worker worker = new Worker();
long latencyTotal = 0;
long latencyCount = 0;
public CustomIOLoop() throws IOException {
super(500);
}
@Override
protected TestMessageStream createStream(ByteChannel channel) {
return new TestMessageStream(msgLength, channel, this);
}
@Override
protected synchronized void removeStream(MessageStream<TestMessage> stream) {
super.removeStream(stream);
messages.add(stream.messagesIn().total());
bytes.add(stream.bytesIn().total());
stream.messagesOut().reset();
stream.bytesOut().reset();
}
@Override
protected void processMessages(List<TestMessage> messages,
MessageStream<TestMessage> stream) {
for (TestMessage message : messages) {
// TODO: summarize latency data better
latencyTotal += nanoTime() - message.requestorTime();
latencyCount++;
}
worker.release(messages.size());
}
@Override
protected void connect(SelectionKey key) throws IOException {
super.connect(key);
TestMessageStream b = (TestMessageStream) key.attachment();
Worker w = ((CustomIOLoop) b.loop()).worker;
w.pump(b);
}
}
/**
* Auxiliary worker to connect and pump batched messages using blocking I/O.
*/
private class Worker implements Runnable {
private static final int BATCH_SIZE = 50;
private static final int PERMITS = 2 * BATCH_SIZE;
private TestMessageStream stream;
private FutureTask<Worker> task;
// Stuff to throttle pump
private final Semaphore semaphore = new Semaphore(PERMITS);
private int msgWritten;
void pump(TestMessageStream stream) {
this.stream = stream;
task = new FutureTask<>(this, this);
wpool.execute(task);
}
@Override
public void run() {
try {
log.info("Worker started...");
while (msgWritten < msgCount) {
int size = Math.min(BATCH_SIZE, msgCount - msgWritten);
writeBatch(size);
msgWritten += size;
}
// Now try to get all the permits back before sending poison pill
semaphore.acquireUninterruptibly(PERMITS);
stream.close();
log.info("Worker done...");
} catch (IOException e) {
log.error("Worker unable to perform I/O", e);
}
}
private void writeBatch(int size) throws IOException {
// Build a batch of messages
List<TestMessage> batch = Lists.newArrayListWithCapacity(size);
for (int i = 0; i < size; i++) {
batch.add(new TestMessage(msgLength, nanoTime(), 0, stream.padding()));
}
acquire(size);
stream.write(batch);
}
// Release permits based on the specified number of message credits
private void release(int permits) {
semaphore.release(permits);
}
// Acquire permit for a single batch
private void acquire(int permits) {
semaphore.acquireUninterruptibly(permits);
}
}
}
/*
* Copyright 2014 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onlab.nio;
import com.google.common.collect.Lists;
import org.onlab.util.Counter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.nio.channels.ByteChannel;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static java.lang.String.format;
import static java.lang.System.out;
import static org.onlab.util.Tools.delay;
import static org.onlab.util.Tools.namedThreads;
/**
* Auxiliary test fixture to measure speed of NIO-based channels.
*/
public class IOLoopTestServer {
private static Logger log = LoggerFactory.getLogger(IOLoopTestServer.class);
private static final int PRUNE_FREQUENCY = 1000;
static final int PORT = 9876;
static final long TIMEOUT = 1000;
static final boolean SO_NO_DELAY = false;
static final int SO_SEND_BUFFER_SIZE = 128 * 1024;
static final int SO_RCV_BUFFER_SIZE = 128 * 1024;
static final DecimalFormat FORMAT = new DecimalFormat("#,##0");
private final AcceptorLoop aloop;
private final ExecutorService apool = Executors.newSingleThreadExecutor(namedThreads("accept"));
private final List<CustomIOLoop> iloops = new ArrayList<>();
private final ExecutorService ipool;
private final int workerCount;
private final int msgLength;
private int lastWorker = -1;
Counter messages;
Counter bytes;
/**
* Main entry point to launch the server.
*
* @param args command-line arguments
* @throws java.io.IOException if unable to crate IO loops
*/
public static void main(String[] args) throws IOException {
startStandalone(args);
System.exit(0);
}
/**
* Starts a standalone IO loop test server.
*
* @param args command-line arguments
*/
public static void startStandalone(String[] args) throws IOException {
InetAddress ip = InetAddress.getByName(args.length > 0 ? args[0] : "127.0.0.1");
int wc = args.length > 1 ? Integer.parseInt(args[1]) : 6;
int ml = args.length > 2 ? Integer.parseInt(args[2]) : 128;
log.info("Setting up the server with {} workers, {} byte messages on {}... ",
wc, ml, ip);
IOLoopTestServer server = new IOLoopTestServer(ip, wc, ml, PORT);
server.start();
// Start pruning clients and keep going until their number goes to 0.
int remaining = -1;
while (remaining == -1 || remaining > 0) {
delay(PRUNE_FREQUENCY);
int r = server.prune();
remaining = remaining == -1 && r == 0 ? remaining : r;
}
server.stop();
}
/**
* Creates a speed server.
*
* @param ip optional ip of the adapter where to bind
* @param wc worker count
* @param ml message length in bytes
* @param port listen port
* @throws java.io.IOException if unable to create IO loops
*/
public IOLoopTestServer(InetAddress ip, int wc, int ml, int port) throws IOException {
this.workerCount = wc;
this.msgLength = ml;
this.ipool = Executors.newFixedThreadPool(workerCount, namedThreads("io-loop"));
this.aloop = new CustomAcceptLoop(new InetSocketAddress(ip, port));
for (int i = 0; i < workerCount; i++) {
iloops.add(new CustomIOLoop());
}
}
/**
* Start the server IO loops and kicks off throughput tracking.
*/
public void start() {
messages = new Counter();
bytes = new Counter();
for (CustomIOLoop l : iloops) {
ipool.execute(l);
}
apool.execute(aloop);
for (CustomIOLoop l : iloops) {
l.awaitStart(TIMEOUT);
}
aloop.awaitStart(TIMEOUT);
}
/**
* Stop the server IO loops and freezes throughput tracking.
*/
public void stop() {
aloop.shutdown();
for (CustomIOLoop l : iloops) {
l.shutdown();
}
for (CustomIOLoop l : iloops) {
l.awaitStop(TIMEOUT);
}
aloop.awaitStop(TIMEOUT);
messages.freeze();
bytes.freeze();
}
/**
* Reports on the accumulated throughput and latency.
*/
public void report() {
DecimalFormat f = new DecimalFormat("#,##0");
out.println(format("Server: %s messages; %s bytes; %s mps; %s MBs",
f.format(messages.total()), f.format(bytes.total()),
f.format(messages.throughput()),
f.format(bytes.throughput() / (1024 * msgLength))));
}
/**
* Prunes the IO loops of stale message buffers.
*
* @return number of remaining IO loops among all workers.
*/
public int prune() {
int count = 0;
for (CustomIOLoop l : iloops) {
count += l.pruneStaleStreams();
}
return count;
}
// Get the next worker to which a client should be assigned
private synchronized CustomIOLoop nextWorker() {
lastWorker = (lastWorker + 1) % workerCount;
return iloops.get(lastWorker);
}
// Loop for transfer of fixed-length messages
private class CustomIOLoop extends IOLoop<TestMessage, TestMessageStream> {
public CustomIOLoop() throws IOException {
super(500);
}
@Override
protected TestMessageStream createStream(ByteChannel channel) {
return new TestMessageStream(msgLength, channel, this);
}
@Override
protected void removeStream(MessageStream<TestMessage> stream) {
super.removeStream(stream);
messages.add(stream.messagesIn().total());
bytes.add(stream.bytesIn().total());
}
@Override
protected void processMessages(List<TestMessage> messages,
MessageStream<TestMessage> stream) {
try {
stream.write(createResponses(messages));
} catch (IOException e) {
log.error("Unable to echo messages", e);
}
}
private List<TestMessage> createResponses(List<TestMessage> messages) {
List<TestMessage> responses = Lists.newArrayListWithCapacity(messages.size());
for (TestMessage message : messages) {
responses.add(new TestMessage(message.length(), message.requestorTime(),
System.nanoTime(), message.padding()));
}
return responses;
}
}
// Loop for accepting client connections
private class CustomAcceptLoop extends AcceptorLoop {
public CustomAcceptLoop(SocketAddress address) throws IOException {
super(500, address);
}
@Override
protected void acceptConnection(ServerSocketChannel channel) throws IOException {
SocketChannel sc = channel.accept();
sc.configureBlocking(false);
Socket so = sc.socket();
so.setTcpNoDelay(SO_NO_DELAY);
so.setReceiveBufferSize(SO_RCV_BUFFER_SIZE);
so.setSendBufferSize(SO_SEND_BUFFER_SIZE);
nextWorker().acceptStream(sc);
log.info("Connected client");
}
}
}
/*
* Copyright 2014 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onlab.nio;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ByteChannel;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.spi.SelectorProvider;
import java.util.ArrayList;
import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
/**
* Tests of the message message stream implementation.
*/
public class MessageStreamTest {
private static final int SIZE = 64;
private static final int BIG_SIZE = 32 * 1024;
private TestMessage message;
private TestIOLoop loop;
private TestByteChannel channel;
private TestMessageStream stream;
private TestKey key;
@Before
public void setUp() throws IOException {
loop = new TestIOLoop();
channel = new TestByteChannel();
key = new TestKey(channel);
stream = loop.createStream(channel);
stream.setKey(key);
stream.setNonStrict();
message = new TestMessage(SIZE, 0, 0, stream.padding());
}
@After
public void tearDown() {
loop.shutdown();
stream.close();
}
// Validates the state of the message stream
private void validate(boolean wp, boolean fr, int read, int written) {
assertEquals(wp, stream.isWritePending());
assertEquals(fr, stream.isFlushRequired());
assertEquals(read, channel.readBytes);
assertEquals(written, channel.writtenBytes);
}
@Test
public void endOfStream() throws IOException {
channel.close();
List<TestMessage> messages = stream.read();
assertNull(messages);
}
@Test
public void bufferGrowth() throws IOException {
// Create a stream for big messages and test the growth.
stream = new TestMessageStream(BIG_SIZE, channel, loop);
TestMessage bigMessage = new TestMessage(BIG_SIZE, 0, 0, stream.padding());
stream.write(bigMessage);
stream.write(bigMessage);
stream.write(bigMessage);
stream.write(bigMessage);
stream.write(bigMessage);
}
@Test
public void discardBeforeKey() {
// Create a stream that does not yet have the key set and discard it.
stream = loop.createStream(channel);
assertNull(stream.key());
stream.close();
// There is not key, so nothing to check; we just expect no problem.
}
@Test
public void bufferedRead() throws IOException {
channel.bytesToRead = SIZE + 4;
List<TestMessage> messages = stream.read();
assertEquals(1, messages.size());
validate(false, false, SIZE + 4, 0);
channel.bytesToRead = SIZE - 4;
messages = stream.read();
assertEquals(1, messages.size());
validate(false, false, SIZE * 2, 0);
}
@Test
public void bufferedWrite() throws IOException {
validate(false, false, 0, 0);
// First write is immediate...
stream.write(message);
validate(false, false, 0, SIZE);
// Second and third get buffered...
stream.write(message);
validate(false, true, 0, SIZE);
stream.write(message);
validate(false, true, 0, SIZE);
// Reset write, which will flush if needed; the next write is again buffered
stream.flushIfWriteNotPending();
validate(false, false, 0, SIZE * 3);
stream.write(message);
validate(false, true, 0, SIZE * 3);
// Select reset, which will flush if needed; the next write is again buffered
stream.flushIfPossible();
validate(false, false, 0, SIZE * 4);
stream.write(message);
validate(false, true, 0, SIZE * 4);
stream.flush();
validate(false, true, 0, SIZE * 4);
}
@Test
public void bufferedWriteList() throws IOException {
validate(false, false, 0, 0);
// First write is immediate...
List<TestMessage> messages = new ArrayList<>();
messages.add(message);
messages.add(message);
messages.add(message);
messages.add(message);
stream.write(messages);
validate(false, false, 0, SIZE * 4);
stream.write(messages);
validate(false, true, 0, SIZE * 4);
stream.flushIfPossible();
validate(false, false, 0, SIZE * 8);
}
@Test
public void bufferedPartialWrite() throws IOException {
validate(false, false, 0, 0);
// First write is immediate...
stream.write(message);
validate(false, false, 0, SIZE);
// Tell test channel to accept only half.
channel.bytesToWrite = SIZE / 2;
// Second and third get buffered...
stream.write(message);
validate(false, true, 0, SIZE);
stream.flushIfPossible();
validate(true, true, 0, SIZE + SIZE / 2);
}
@Test
public void bufferedPartialWrite2() throws IOException {
validate(false, false, 0, 0);
// First write is immediate...
stream.write(message);
validate(false, false, 0, SIZE);
// Tell test channel to accept only half.
channel.bytesToWrite = SIZE / 2;
// Second and third get buffered...
stream.write(message);
validate(false, true, 0, SIZE);
stream.flushIfWriteNotPending();
validate(true, true, 0, SIZE + SIZE / 2);
}
@Test
public void bufferedReadWrite() throws IOException {
channel.bytesToRead = SIZE + 4;
List<TestMessage> messages = stream.read();
assertEquals(1, messages.size());
validate(false, false, SIZE + 4, 0);
stream.write(message);
validate(false, false, SIZE + 4, SIZE);
channel.bytesToRead = SIZE - 4;
messages = stream.read();
assertEquals(1, messages.size());
validate(false, false, SIZE * 2, SIZE);
}
// Fake IO driver loop
private static class TestIOLoop extends IOLoop<TestMessage, TestMessageStream> {
public TestIOLoop() throws IOException {
super(500);
}
@Override
protected TestMessageStream createStream(ByteChannel channel) {
return new TestMessageStream(SIZE, channel, this);
}
@Override
protected void processMessages(List<TestMessage> messages,
MessageStream<TestMessage> stream) {
}
}
// Byte channel test fixture
private static class TestByteChannel extends SelectableChannel implements ByteChannel {
private static final int BUFFER_LENGTH = 1024;
byte[] bytes = new byte[BUFFER_LENGTH];
int bytesToWrite = BUFFER_LENGTH;
int bytesToRead = BUFFER_LENGTH;
int writtenBytes = 0;
int readBytes = 0;
@Override
public int read(ByteBuffer dst) throws IOException {
int l = Math.min(dst.remaining(), bytesToRead);
if (bytesToRead > 0) {
readBytes += l;
dst.put(bytes, 0, l);
}
return l;
}
@Override
public int write(ByteBuffer src) throws IOException {
int l = Math.min(src.remaining(), bytesToWrite);
writtenBytes += l;
src.get(bytes, 0, l);
return l;
}
@Override
public Object blockingLock() {
return null;
}
@Override
public SelectableChannel configureBlocking(boolean arg0) throws IOException {
return null;
}
@Override
public boolean isBlocking() {
return false;
}
@Override
public boolean isRegistered() {
return false;
}
@Override
public SelectionKey keyFor(Selector arg0) {
return null;
}
@Override
public SelectorProvider provider() {
return null;
}
@Override
public SelectionKey register(Selector arg0, int arg1, Object arg2)
throws ClosedChannelException {
return null;
}
@Override
public int validOps() {
return 0;
}
@Override
protected void implCloseChannel() throws IOException {
bytesToRead = -1;
}
}
// Selection key text fixture
private static class TestKey extends SelectionKey {
private SelectableChannel channel;
public TestKey(TestByteChannel channel) {
this.channel = channel;
}
@Override
public void cancel() {
}
@Override
public SelectableChannel channel() {
return channel;
}
@Override
public int interestOps() {
return 0;
}
@Override
public SelectionKey interestOps(int ops) {
return null;
}
@Override
public boolean isValid() {
return true;
}
@Override
public int readyOps() {
return 0;
}
@Override
public Selector selector() {
return null;
}
}
}
/*
* Copyright 2014 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onlab.nio;
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.spi.AbstractSelectableChannel;
import java.nio.channels.spi.AbstractSelector;
import java.util.Set;
/**
* A selector instrumented for unit tests.
*/
public class MockSelector extends AbstractSelector {
int wakeUpCount = 0;
/**
* Creates a mock selector, specifying null as the SelectorProvider.
*/
public MockSelector() {
super(null);
}
@Override
public String toString() {
return "{MockSelector: wake=" + wakeUpCount + "}";
}
@Override
protected void implCloseSelector() throws IOException {
}
@Override
protected SelectionKey register(AbstractSelectableChannel ch, int ops,
Object att) {
return null;
}
@Override
public Set<SelectionKey> keys() {
return null;
}
@Override
public Set<SelectionKey> selectedKeys() {
return null;
}
@Override
public int selectNow() throws IOException {
return 0;
}
@Override
public int select(long timeout) throws IOException {
return 0;
}
@Override
public int select() throws IOException {
return 0;
}
@Override
public Selector wakeup() {
wakeUpCount++;
return null;
}
}
/*
* Copyright 2014 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onlab.nio;
import static com.google.common.base.Preconditions.checkNotNull;
/**
* Test message for measuring rate and round-trip latency.
*/
public class TestMessage extends AbstractMessage {
private final byte[] padding;
private final long requestorTime;
private final long responderTime;
/**
* Creates a new message with the specified data.
*
* @param requestorTime requester time
* @param responderTime responder time
* @param padding message padding
*/
TestMessage(int length, long requestorTime, long responderTime, byte[] padding) {
this.length = length;
this.requestorTime = requestorTime;
this.responderTime = responderTime;
this.padding = checkNotNull(padding, "Padding cannot be null");
}
public long requestorTime() {
return requestorTime;
}
public long responderTime() {
return responderTime;
}
public byte[] padding() {
return padding;
}
}
/*
* Copyright 2014 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onlab.nio;
import java.nio.ByteBuffer;
import java.nio.channels.ByteChannel;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
/**
* Fixed-length message transfer buffer.
*/
public class TestMessageStream extends MessageStream<TestMessage> {
private static final String E_WRONG_LEN = "Illegal message length: ";
private static final long START_TAG = 0xfeedcafedeaddeedL;
private static final long END_TAG = 0xbeadcafedeaddeedL;
private static final int META_LENGTH = 40;
private final int length;
private boolean isStrict = true;
public TestMessageStream(int length, ByteChannel ch, IOLoop<TestMessage, ?> loop) {
super(loop, ch, 64 * 1024, 500);
checkArgument(length >= META_LENGTH, "Length must be greater than header length of 40");
this.length = length;
}
void setNonStrict() {
isStrict = false;
}
@Override
protected TestMessage read(ByteBuffer rb) {
if (rb.remaining() < length) {
return null;
}
long startTag = rb.getLong();
if (isStrict) {
checkState(startTag == START_TAG, "Incorrect message start");
}
long size = rb.getLong();
long requestorTime = rb.getLong();
long responderTime = rb.getLong();
byte[] padding = padding();
rb.get(padding);
long endTag = rb.getLong();
if (isStrict) {
checkState(endTag == END_TAG, "Incorrect message end");
}
return new TestMessage((int) size, requestorTime, responderTime, padding);
}
@Override
protected void write(TestMessage message, ByteBuffer wb) {
if (message.length() != length) {
throw new IllegalArgumentException(E_WRONG_LEN + message.length());
}
wb.putLong(START_TAG);
wb.putLong(message.length());
wb.putLong(message.requestorTime());
wb.putLong(message.responderTime());
wb.put(message.padding(), 0, length - META_LENGTH);
wb.putLong(END_TAG);
}
public byte[] padding() {
return new byte[length - META_LENGTH];
}
}
......@@ -34,7 +34,6 @@
<modules>
<module>junit</module>
<module>misc</module>
<module>nio</module>
<module>yangutils</module>
<module>osgi</module>
<module>rest</module>
......