Madan Jampani

Fixed issue with recieving side not checking readability of channel before reading

...@@ -9,7 +9,7 @@ public class EchoHandler implements MessageHandler { ...@@ -9,7 +9,7 @@ public class EchoHandler implements MessageHandler {
9 9
10 @Override 10 @Override
11 public void handle(Message message) throws IOException { 11 public void handle(Message message) throws IOException {
12 - System.out.println("Received: " + message.payload() + ". Echoing it back to the sender."); 12 + System.out.println("Received message. Echoing it back to the sender.");
13 message.respond(message.payload()); 13 message.respond(message.payload());
14 } 14 }
15 } 15 }
......
...@@ -8,6 +8,14 @@ public class Endpoint { ...@@ -8,6 +8,14 @@ public class Endpoint {
8 private final int port; 8 private final int port;
9 private final String host; 9 private final String host;
10 10
11 + /**
12 + * Used for serialization.
13 + */
14 + private Endpoint() {
15 + port = 0;
16 + host = null;
17 + }
18 +
11 public Endpoint(String host, int port) { 19 public Endpoint(String host, int port) {
12 this.host = host; 20 this.host = host;
13 this.port = port; 21 this.port = port;
......
...@@ -35,6 +35,10 @@ public final class InternalMessage implements Message { ...@@ -35,6 +35,10 @@ public final class InternalMessage implements Message {
35 return payload; 35 return payload;
36 } 36 }
37 37
38 + protected void setMessagingService(NettyMessagingService messagingService) {
39 + this.messagingService = messagingService;
40 + }
41 +
38 @Override 42 @Override
39 public void respond(Object data) throws IOException { 43 public void respond(Object data) throws IOException {
40 Builder builder = new Builder(messagingService); 44 Builder builder = new Builder(messagingService);
......
1 package org.onlab.netty; 1 package org.onlab.netty;
2 2
3 import org.onlab.util.KryoPool; 3 import org.onlab.util.KryoPool;
4 -import org.slf4j.Logger;
5 -import org.slf4j.LoggerFactory;
6 4
5 +import java.nio.ByteBuffer;
7 import java.util.ArrayList; 6 import java.util.ArrayList;
8 import java.util.HashMap; 7 import java.util.HashMap;
9 8
...@@ -12,8 +11,6 @@ import java.util.HashMap; ...@@ -12,8 +11,6 @@ import java.util.HashMap;
12 */ 11 */
13 public class KryoSerializer implements Serializer { 12 public class KryoSerializer implements Serializer {
14 13
15 - private final Logger log = LoggerFactory.getLogger(getClass());
16 -
17 private KryoPool serializerPool; 14 private KryoPool serializerPool;
18 15
19 public KryoSerializer() { 16 public KryoSerializer() {
...@@ -28,7 +25,9 @@ public class KryoSerializer implements Serializer { ...@@ -28,7 +25,9 @@ public class KryoSerializer implements Serializer {
28 serializerPool = KryoPool.newBuilder() 25 serializerPool = KryoPool.newBuilder()
29 .register(ArrayList.class, 26 .register(ArrayList.class,
30 HashMap.class, 27 HashMap.class,
31 - ArrayList.class 28 + ArrayList.class,
29 + InternalMessage.class,
30 + Endpoint.class
32 ) 31 )
33 .build() 32 .build()
34 .populate(1); 33 .populate(1);
...@@ -36,7 +35,7 @@ public class KryoSerializer implements Serializer { ...@@ -36,7 +35,7 @@ public class KryoSerializer implements Serializer {
36 35
37 36
38 @Override 37 @Override
39 - public Object decode(byte[] data) { 38 + public <T> T decode(byte[] data) {
40 return serializerPool.deserialize(data); 39 return serializerPool.deserialize(data);
41 } 40 }
42 41
...@@ -44,4 +43,14 @@ public class KryoSerializer implements Serializer { ...@@ -44,4 +43,14 @@ public class KryoSerializer implements Serializer {
44 public byte[] encode(Object payload) { 43 public byte[] encode(Object payload) {
45 return serializerPool.serialize(payload); 44 return serializerPool.serialize(payload);
46 } 45 }
46 +
47 + @Override
48 + public <T> T deserialize(ByteBuffer buffer) {
49 + return serializerPool.deserialize(buffer);
50 + }
51 +
52 + @Override
53 + public void serialize(Object obj, ByteBuffer buffer) {
54 + serializerPool.serialize(obj, buffer);
55 + }
47 } 56 }
......
1 package org.onlab.netty; 1 package org.onlab.netty;
2 2
3 -import java.util.Arrays;
4 -import java.util.List;
5 -
6 import static com.google.common.base.Preconditions.checkState; 3 import static com.google.common.base.Preconditions.checkState;
7 -
8 import io.netty.buffer.ByteBuf; 4 import io.netty.buffer.ByteBuf;
9 import io.netty.channel.ChannelHandlerContext; 5 import io.netty.channel.ChannelHandlerContext;
10 -import io.netty.handler.codec.ByteToMessageDecoder; 6 +import io.netty.handler.codec.ReplayingDecoder;
7 +
8 +import java.util.Arrays;
9 +import java.util.List;
11 10
12 -/** 11 +// TODO: Implement performance enchancements such as those described in the javadoc for ReplayingDecoder.
13 - * Decode bytes into a InternalMessage. 12 +public class MessageDecoder extends ReplayingDecoder<InternalMessage> {
14 - */
15 -public class MessageDecoder extends ByteToMessageDecoder {
16 13
17 private final NettyMessagingService messagingService; 14 private final NettyMessagingService messagingService;
18 private final Serializer serializer; 15 private final Serializer serializer;
...@@ -23,36 +20,21 @@ public class MessageDecoder extends ByteToMessageDecoder { ...@@ -23,36 +20,21 @@ public class MessageDecoder extends ByteToMessageDecoder {
23 } 20 }
24 21
25 @Override 22 @Override
26 - protected void decode(ChannelHandlerContext context, ByteBuf in, 23 + protected void decode(
27 - List<Object> messages) throws Exception { 24 + ChannelHandlerContext context,
25 + ByteBuf buffer,
26 + List<Object> out) throws Exception {
28 27
29 - byte[] preamble = in.readBytes(MessageEncoder.PREAMBLE.length).array(); 28 + byte[] preamble = new byte[MessageEncoder.PREAMBLE.length];
29 + buffer.readBytes(preamble);
30 checkState(Arrays.equals(MessageEncoder.PREAMBLE, preamble), "Message has wrong preamble"); 30 checkState(Arrays.equals(MessageEncoder.PREAMBLE, preamble), "Message has wrong preamble");
31 31
32 - // read message Id. 32 + int bodySize = buffer.readInt();
33 - long id = in.readLong(); 33 + byte[] body = new byte[bodySize];
34 - 34 + buffer.readBytes(body);
35 - // read message type; first read size and then bytes.
36 - String type = new String(in.readBytes(in.readInt()).array());
37 -
38 - // read sender host name; first read size and then bytes.
39 - String host = new String(in.readBytes(in.readInt()).array());
40 -
41 - // read sender port.
42 - int port = in.readInt();
43 -
44 - Endpoint sender = new Endpoint(host, port);
45 -
46 - // read message payload; first read size and then bytes.
47 - Object payload = serializer.decode(in.readBytes(in.readInt()).array());
48 -
49 - InternalMessage message = new InternalMessage.Builder(messagingService)
50 - .withId(id)
51 - .withSender(sender)
52 - .withType(type)
53 - .withPayload(payload)
54 - .build();
55 35
56 - messages.add(message); 36 + InternalMessage message = serializer.decode(body);
37 + message.setMessagingService(messagingService);
38 + out.add(message);
57 } 39 }
58 } 40 }
......
...@@ -19,42 +19,20 @@ public class MessageEncoder extends MessageToByteEncoder<InternalMessage> { ...@@ -19,42 +19,20 @@ public class MessageEncoder extends MessageToByteEncoder<InternalMessage> {
19 } 19 }
20 20
21 @Override 21 @Override
22 - protected void encode(ChannelHandlerContext context, InternalMessage message, 22 + protected void encode(
23 + ChannelHandlerContext context,
24 + InternalMessage message,
23 ByteBuf out) throws Exception { 25 ByteBuf out) throws Exception {
24 26
25 // write preamble 27 // write preamble
26 out.writeBytes(PREAMBLE); 28 out.writeBytes(PREAMBLE);
27 29
28 - // write id 30 + byte[] payload = serializer.encode(message);
29 - out.writeLong(message.id());
30 31
31 - // write type length 32 + // write payload length
32 - out.writeInt(message.type().length());
33 -
34 - // write type
35 - out.writeBytes(message.type().getBytes());
36 -
37 - // write sender host name size
38 - out.writeInt(message.sender().host().length());
39 -
40 - // write sender host name.
41 - out.writeBytes(message.sender().host().getBytes());
42 -
43 - // write port
44 - out.writeInt(message.sender().port());
45 -
46 - try {
47 - serializer.encode(message.payload());
48 - } catch (Exception e) {
49 - e.printStackTrace();
50 - }
51 -
52 - byte[] payload = serializer.encode(message.payload());
53 -
54 - // write payload length.
55 out.writeInt(payload.length); 33 out.writeInt(payload.length);
56 34
57 - // write payload bytes 35 + // write payload.
58 out.writeBytes(payload); 36 out.writeBytes(payload);
59 } 37 }
60 } 38 }
......
...@@ -22,7 +22,6 @@ import io.netty.channel.socket.nio.NioServerSocketChannel; ...@@ -22,7 +22,6 @@ import io.netty.channel.socket.nio.NioServerSocketChannel;
22 import io.netty.channel.socket.nio.NioSocketChannel; 22 import io.netty.channel.socket.nio.NioSocketChannel;
23 23
24 import org.apache.commons.lang.math.RandomUtils; 24 import org.apache.commons.lang.math.RandomUtils;
25 -import org.apache.commons.pool.KeyedObjectPool;
26 import org.apache.commons.pool.KeyedPoolableObjectFactory; 25 import org.apache.commons.pool.KeyedPoolableObjectFactory;
27 import org.apache.commons.pool.impl.GenericKeyedObjectPool; 26 import org.apache.commons.pool.impl.GenericKeyedObjectPool;
28 import org.slf4j.Logger; 27 import org.slf4j.Logger;
...@@ -38,8 +37,8 @@ public class NettyMessagingService implements MessagingService { ...@@ -38,8 +37,8 @@ public class NettyMessagingService implements MessagingService {
38 37
39 private final Logger log = LoggerFactory.getLogger(getClass()); 38 private final Logger log = LoggerFactory.getLogger(getClass());
40 39
41 - private KeyedObjectPool<Endpoint, Channel> channels = 40 + private GenericKeyedObjectPool<Endpoint, Channel> channels;
42 - new GenericKeyedObjectPool<Endpoint, Channel>(new OnosCommunicationChannelFactory()); 41 +
43 private final int port; 42 private final int port;
44 private final EventLoopGroup bossGroup = new NioEventLoopGroup(); 43 private final EventLoopGroup bossGroup = new NioEventLoopGroup();
45 private final EventLoopGroup workerGroup = new NioEventLoopGroup(); 44 private final EventLoopGroup workerGroup = new NioEventLoopGroup();
...@@ -66,6 +65,9 @@ public class NettyMessagingService implements MessagingService { ...@@ -66,6 +65,9 @@ public class NettyMessagingService implements MessagingService {
66 } 65 }
67 66
68 public void activate() throws Exception { 67 public void activate() throws Exception {
68 + channels = new GenericKeyedObjectPool<Endpoint, Channel>(new OnosCommunicationChannelFactory());
69 + channels.setTestOnBorrow(true);
70 + channels.setTestOnReturn(true);
69 responseFutures = CacheBuilder.newBuilder() 71 responseFutures = CacheBuilder.newBuilder()
70 .maximumSize(100000) 72 .maximumSize(100000)
71 .weakValues() 73 .weakValues()
...@@ -95,17 +97,14 @@ public class NettyMessagingService implements MessagingService { ...@@ -95,17 +97,14 @@ public class NettyMessagingService implements MessagingService {
95 protected void sendAsync(Endpoint ep, InternalMessage message) throws IOException { 97 protected void sendAsync(Endpoint ep, InternalMessage message) throws IOException {
96 Channel channel = null; 98 Channel channel = null;
97 try { 99 try {
100 + try {
98 channel = channels.borrowObject(ep); 101 channel = channels.borrowObject(ep);
99 channel.eventLoop().execute(new WriteTask(channel, message)); 102 channel.eventLoop().execute(new WriteTask(channel, message));
100 - } catch (Exception e) {
101 - throw new IOException(e);
102 } finally { 103 } finally {
103 - try {
104 channels.returnObject(ep, channel); 104 channels.returnObject(ep, channel);
105 - } catch (Exception e) {
106 - log.warn("Error returning object back to the pool", e);
107 - // ignored.
108 } 105 }
106 + } catch (Exception e) {
107 + throw new IOException(e);
109 } 108 }
110 } 109 }
111 110
...@@ -141,6 +140,8 @@ public class NettyMessagingService implements MessagingService { ...@@ -141,6 +140,8 @@ public class NettyMessagingService implements MessagingService {
141 140
142 private void startAcceptingConnections() throws InterruptedException { 141 private void startAcceptingConnections() throws InterruptedException {
143 ServerBootstrap b = new ServerBootstrap(); 142 ServerBootstrap b = new ServerBootstrap();
143 + b.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
144 + b.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 32 * 1024);
144 b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); 145 b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
145 b.group(bossGroup, workerGroup) 146 b.group(bossGroup, workerGroup)
146 .channel(NioServerSocketChannel.class) 147 .channel(NioServerSocketChannel.class)
...@@ -169,6 +170,8 @@ public class NettyMessagingService implements MessagingService { ...@@ -169,6 +170,8 @@ public class NettyMessagingService implements MessagingService {
169 public Channel makeObject(Endpoint ep) throws Exception { 170 public Channel makeObject(Endpoint ep) throws Exception {
170 Bootstrap b = new Bootstrap(); 171 Bootstrap b = new Bootstrap();
171 b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); 172 b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
173 + b.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
174 + b.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 32 * 1024);
172 b.group(workerGroup); 175 b.group(workerGroup);
173 // TODO: Make this faster: 176 // TODO: Make this faster:
174 // http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#37.0 177 // http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#37.0
...@@ -197,20 +200,20 @@ public class NettyMessagingService implements MessagingService { ...@@ -197,20 +200,20 @@ public class NettyMessagingService implements MessagingService {
197 @Override 200 @Override
198 protected void initChannel(SocketChannel channel) throws Exception { 201 protected void initChannel(SocketChannel channel) throws Exception {
199 channel.pipeline() 202 channel.pipeline()
200 - .addLast(new MessageEncoder(serializer)) 203 + .addLast("encoder", new MessageEncoder(serializer))
201 - .addLast(new MessageDecoder(NettyMessagingService.this, serializer)) 204 + .addLast("decoder", new MessageDecoder(NettyMessagingService.this, serializer))
202 - .addLast(new NettyMessagingService.InboundMessageDispatcher()); 205 + .addLast("handler", new InboundMessageDispatcher());
203 } 206 }
204 } 207 }
205 208
206 private class WriteTask implements Runnable { 209 private class WriteTask implements Runnable {
207 210
208 - private final Object message; 211 + private final InternalMessage message;
209 private final Channel channel; 212 private final Channel channel;
210 213
211 - public WriteTask(Channel channel, Object message) { 214 + public WriteTask(Channel channel, InternalMessage message) {
212 - this.message = message;
213 this.channel = channel; 215 this.channel = channel;
216 + this.message = message;
214 } 217 }
215 218
216 @Override 219 @Override
...@@ -240,5 +243,11 @@ public class NettyMessagingService implements MessagingService { ...@@ -240,5 +243,11 @@ public class NettyMessagingService implements MessagingService {
240 MessageHandler handler = NettyMessagingService.this.getMessageHandler(type); 243 MessageHandler handler = NettyMessagingService.this.getMessageHandler(type);
241 handler.handle(message); 244 handler.handle(message);
242 } 245 }
246 +
247 +
248 + @Override
249 + public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
250 + context.close();
251 + }
243 } 252 }
244 } 253 }
......
1 package org.onlab.netty; 1 package org.onlab.netty;
2 2
3 +import java.nio.ByteBuffer;
4 +
3 /** 5 /**
4 * Interface for encoding/decoding message payloads. 6 * Interface for encoding/decoding message payloads.
5 */ 7 */
...@@ -11,7 +13,7 @@ public interface Serializer { ...@@ -11,7 +13,7 @@ public interface Serializer {
11 * @param data byte array. 13 * @param data byte array.
12 * @return POJO 14 * @return POJO
13 */ 15 */
14 - Object decode(byte[] data); 16 + public <T> T decode(byte[] data);
15 17
16 /** 18 /**
17 * Encodes the specified POJO into a byte array. 19 * Encodes the specified POJO into a byte array.
...@@ -19,6 +21,23 @@ public interface Serializer { ...@@ -19,6 +21,23 @@ public interface Serializer {
19 * @param data POJO to be encoded 21 * @param data POJO to be encoded
20 * @return byte array. 22 * @return byte array.
21 */ 23 */
22 - byte[] encode(Object message); 24 + public byte[] encode(Object data);
25 +
26 + /**
27 + * Serializes the specified object into bytes using one of the
28 + * pre-registered serializers.
29 + *
30 + * @param obj object to be serialized
31 + * @param buffer to write serialized bytes
32 + */
33 + public void serialize(final Object obj, ByteBuffer buffer);
23 34
35 + /**
36 + * Deserializes the specified bytes into an object using one of the
37 + * pre-registered serializers.
38 + *
39 + * @param buffer bytes to be deserialized
40 + * @return deserialized object
41 + */
42 + public <T> T deserialize(final ByteBuffer buffer);
24 } 43 }
......