tom

Merge remote-tracking branch 'origin/master'

...@@ -8,12 +8,16 @@ import java.io.IOException; ...@@ -8,12 +8,16 @@ import java.io.IOException;
8 */ 8 */
9 public final class InternalMessage implements Message { 9 public final class InternalMessage implements Message {
10 10
11 + public static final String REPLY_MESSAGE_TYPE = "NETTY_MESSAGIG_REQUEST_REPLY";
12 +
11 private long id; 13 private long id;
12 private Endpoint sender; 14 private Endpoint sender;
13 private String type; 15 private String type;
14 private Object payload; 16 private Object payload;
17 +
15 private transient NettyMessagingService messagingService; 18 private transient NettyMessagingService messagingService;
16 - public static final String REPLY_MESSAGE_TYPE = "NETTY_MESSAGIG_REQUEST_REPLY"; 19 + // TODO: add transient payload serializer or change payload type to
20 + // byte[], ByteBuffer, etc.
17 21
18 // Must be created using the Builder. 22 // Must be created using the Builder.
19 private InternalMessage() {} 23 private InternalMessage() {}
......
...@@ -241,8 +241,9 @@ public class NettyMessagingService implements MessagingService { ...@@ -241,8 +241,9 @@ public class NettyMessagingService implements MessagingService {
241 NettyMessagingService.this.responseFutures.getIfPresent(message.id()); 241 NettyMessagingService.this.responseFutures.getIfPresent(message.id());
242 if (futureResponse != null) { 242 if (futureResponse != null) {
243 futureResponse.setResponse(message.payload()); 243 futureResponse.setResponse(message.payload());
244 - } 244 + } else {
245 log.warn("Received a reply. But was unable to locate the request handle"); 245 log.warn("Received a reply. But was unable to locate the request handle");
246 + }
246 } finally { 247 } finally {
247 NettyMessagingService.this.responseFutures.invalidate(message.id()); 248 NettyMessagingService.this.responseFutures.invalidate(message.id());
248 } 249 }
......
1 +package org.onlab.netty;
2 +
3 +import java.util.concurrent.TimeUnit;
4 +
5 +import org.junit.Assert;
6 +import org.junit.Test;
7 +
8 +/**
9 + * Simple ping-pong test that exercises NettyMessagingService.
10 + */
11 +public class PingPongTest {
12 +
13 + @Test
14 + public void testPingPong() throws Exception {
15 + NettyMessagingService pinger = new NettyMessagingService(8085);
16 + NettyMessagingService ponger = new NettyMessagingService(9086);
17 + try {
18 + pinger.activate();
19 + ponger.activate();
20 + pinger.setPayloadSerializer(new KryoSerializer());
21 + ponger.setPayloadSerializer(new KryoSerializer());
22 + ponger.registerHandler("echo", new EchoHandler());
23 + Response<String> response = pinger.sendAndReceive(new Endpoint("localhost", 9086), "echo", "hello");
24 + Assert.assertEquals("hello", response.get(10000, TimeUnit.MILLISECONDS));
25 + } finally {
26 + pinger.deactivate();
27 + ponger.deactivate();
28 + }
29 + }
30 +}