Madan Jampani

Improved message decoding performance

......@@ -17,8 +17,8 @@ public class AsyncResponse<T> implements Response<T> {
private final long start = System.nanoTime();
@Override
public T get(long timeout, TimeUnit tu) throws TimeoutException {
timeout = tu.toNanos(timeout);
public T get(long timeout, TimeUnit timeUnit) throws TimeoutException {
timeout = timeUnit.toNanos(timeout);
boolean interrupted = false;
try {
synchronized (this) {
......
......@@ -2,14 +2,19 @@ package org.onlab.netty;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Message handler that echos the message back to the sender.
*/
public class EchoHandler implements MessageHandler {
private final Logger log = LoggerFactory.getLogger(getClass());
@Override
public void handle(Message message) throws IOException {
System.out.println("Received message. Echoing it back to the sender.");
log.info("Received message. Echoing it back to the sender.");
message.respond(message.payload());
}
}
......
......@@ -11,6 +11,7 @@ public class Endpoint {
/**
* Used for serialization.
*/
@SuppressWarnings("unused")
private Endpoint() {
port = 0;
host = null;
......
package org.onlab.netty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A MessageHandler that simply logs the information.
*/
public class LoggingHandler implements MessageHandler {
private final Logger log = LoggerFactory.getLogger(getClass());
@Override
public void handle(Message message) {
System.out.println("Received: " + message.payload());
log.info("Received message. Payload: " + message.payload());
}
}
......
......@@ -8,13 +8,18 @@ import io.netty.handler.codec.ReplayingDecoder;
import java.util.Arrays;
import java.util.List;
// TODO: Implement performance enchancements such as those described in the javadoc for ReplayingDecoder.
public class MessageDecoder extends ReplayingDecoder<InternalMessage> {
/**
* Decoder for inbound messages.
*/
public class MessageDecoder extends ReplayingDecoder<DecoderState> {
private final NettyMessagingService messagingService;
private final Serializer serializer;
private int contentLength;
public MessageDecoder(NettyMessagingService messagingService, Serializer serializer) {
super(DecoderState.READ_HEADER_VERSION);
this.messagingService = messagingService;
this.serializer = serializer;
}
......@@ -25,16 +30,31 @@ public class MessageDecoder extends ReplayingDecoder<InternalMessage> {
ByteBuf buffer,
List<Object> out) throws Exception {
switch(state()) {
case READ_HEADER_VERSION:
int headerVersion = buffer.readInt();
checkState(headerVersion == MessageEncoder.HEADER_VERSION, "Unexpected header version");
checkpoint(DecoderState.READ_PREAMBLE);
case READ_PREAMBLE:
byte[] preamble = new byte[MessageEncoder.PREAMBLE.length];
buffer.readBytes(preamble);
checkState(Arrays.equals(MessageEncoder.PREAMBLE, preamble), "Message has wrong preamble");
int bodySize = buffer.readInt();
byte[] body = new byte[bodySize];
buffer.readBytes(body);
InternalMessage message = serializer.decode(body);
checkpoint(DecoderState.READ_CONTENT_LENGTH);
case READ_CONTENT_LENGTH:
contentLength = buffer.readInt();
checkpoint(DecoderState.READ_SERIALIZER_VERSION);
case READ_SERIALIZER_VERSION:
int serializerVersion = buffer.readInt();
checkState(serializerVersion == MessageEncoder.SERIALIZER_VERSION, "Unexpected serializer version");
checkpoint(DecoderState.READ_CONTENT);
case READ_CONTENT:
InternalMessage message = serializer.deserialize(buffer.readBytes(contentLength).nioBuffer());
message.setMessagingService(messagingService);
out.add(message);
checkpoint(DecoderState.READ_HEADER_VERSION);
break;
default:
checkState(false, "Must not be here");
}
}
}
......
......@@ -11,6 +11,9 @@ public class MessageEncoder extends MessageToByteEncoder<InternalMessage> {
// onosiscool in ascii
public static final byte[] PREAMBLE = "onosiscool".getBytes();
public static final int HEADER_VERSION = 1;
public static final int SERIALIZER_VERSION = 1;
private final Serializer serializer;
......@@ -24,6 +27,9 @@ public class MessageEncoder extends MessageToByteEncoder<InternalMessage> {
InternalMessage message,
ByteBuf out) throws Exception {
// write version
out.writeInt(HEADER_VERSION);
// write preamble
out.writeBytes(PREAMBLE);
......@@ -32,6 +38,9 @@ public class MessageEncoder extends MessageToByteEncoder<InternalMessage> {
// write payload length
out.writeInt(payload.length);
// write serializer version
out.writeInt(SERIALIZER_VERSION);
// write payload.
out.writeBytes(payload);
}
......
......@@ -134,6 +134,10 @@ public class NettyMessagingService implements MessagingService {
handlers.remove(type);
}
public void setSerializer(Serializer serializer) {
this.serializer = serializer;
}
private MessageHandler getMessageHandler(String type) {
return handlers.get(type);
}
......
......@@ -4,16 +4,10 @@ public final class SimpleServer {
private SimpleServer() {}
public static void main(String... args) throws Exception {
NettyMessagingService server = new TestNettyMessagingService();
NettyMessagingService server = new NettyMessagingService(8080);
server.activate();
server.setSerializer(new KryoSerializer());
server.registerHandler("simple", new LoggingHandler());
server.registerHandler("echo", new EchoHandler());
}
public static class TestNettyMessagingService extends NettyMessagingService {
protected TestNettyMessagingService() {
Serializer serializer = new KryoSerializer();
this.serializer = serializer;
}
}
}
......