Madan Jampani
Committed by Madan Jampani

Added unit tests for NettyMessaging

Change-Id: I9f57b04488f53af9e773078409970f0fa0d958f9
1 +package org.onlab.netty;
2 +
3 +import java.util.Arrays;
4 +import java.util.concurrent.CompletableFuture;
5 +import java.util.concurrent.CountDownLatch;
6 +import java.util.concurrent.ExecutorService;
7 +import java.util.concurrent.Executors;
8 +import java.util.concurrent.atomic.AtomicBoolean;
9 +import java.util.concurrent.atomic.AtomicReference;
10 +import java.util.function.BiFunction;
11 +
12 +import org.junit.After;
13 +import org.junit.Before;
14 +import org.junit.Test;
15 +import org.onlab.packet.IpAddress;
16 +import org.onosproject.store.cluster.messaging.Endpoint;
17 +
18 +import com.google.common.util.concurrent.MoreExecutors;
19 +import com.google.common.util.concurrent.Uninterruptibles;
20 +
21 +import static org.junit.Assert.*;
22 +
23 +/**
24 + * Unit tests for NettyMessaging.
25 + */
26 +public class NettyMessagingTest {
27 +
28 + NettyMessaging netty1;
29 + NettyMessaging netty2;
30 +
31 + Endpoint ep1 = new Endpoint(IpAddress.valueOf("127.0.0.1"), 5001);
32 + Endpoint ep2 = new Endpoint(IpAddress.valueOf("127.0.0.1"), 5002);
33 + Endpoint invalidEndPoint = new Endpoint(IpAddress.valueOf("127.0.0.1"), 5003);
34 +
35 + @Before
36 + public void setUp() throws Exception {
37 + netty1 = new NettyMessaging();
38 + netty2 = new NettyMessaging();
39 +
40 + netty1.start(12, ep1);
41 + netty2.start(12, ep2);
42 + }
43 +
44 + @After
45 + public void tearDown() throws Exception {
46 + if (netty1 != null) {
47 + netty1.stop();
48 + }
49 +
50 + if (netty2 != null) {
51 + netty2.stop();
52 + }
53 + }
54 +
55 + @Test
56 + public void testSendAsync() {
57 + CountDownLatch latch1 = new CountDownLatch(1);
58 + CompletableFuture<Void> response = netty1.sendAsync(ep2, "test-subject", "hello world".getBytes());
59 + response.whenComplete((r, e) -> {
60 + assertNull(e);
61 + latch1.countDown();
62 + });
63 + Uninterruptibles.awaitUninterruptibly(latch1);
64 +
65 + CountDownLatch latch2 = new CountDownLatch(1);
66 + response = netty1.sendAsync(invalidEndPoint, "test-subject", "hello world".getBytes());
67 + response.whenComplete((r, e) -> {
68 + assertNotNull(e);
69 + latch2.countDown();
70 + });
71 + Uninterruptibles.awaitUninterruptibly(latch2);
72 + }
73 +
74 + @Test
75 + public void testSendAndReceive() {
76 + AtomicBoolean handlerInvoked = new AtomicBoolean(false);
77 + AtomicReference<byte[]> request = new AtomicReference<>();
78 + AtomicReference<Endpoint> sender = new AtomicReference<>();
79 +
80 + BiFunction<Endpoint, byte[], byte[]> handler = (ep, data) -> {
81 + handlerInvoked.set(true);
82 + sender.set(ep);
83 + request.set(data);
84 + return "hello there".getBytes();
85 + };
86 + netty2.registerHandler("test-subject", handler, MoreExecutors.directExecutor());
87 +
88 + CompletableFuture<byte[]> response = netty1.sendAndReceive(ep2, "test-subject", "hello world".getBytes());
89 + assertTrue(Arrays.equals("hello there".getBytes(), response.join()));
90 + assertTrue(handlerInvoked.get());
91 + assertTrue(Arrays.equals(request.get(), "hello world".getBytes()));
92 + assertEquals(ep1, sender.get());
93 + }
94 +
95 + /*
96 + * Supplies executors when registering a handler and calling sendAndReceive and verifies the request handling
97 + * and response completion occurs on the expected thread.
98 + */
99 + @Test
100 + public void testSendAndReceiveWithExecutor() {
101 + ExecutorService completionExecutor = Executors.newSingleThreadExecutor(r -> new Thread(r, "completion-thread"));
102 + ExecutorService handlerExecutor = Executors.newSingleThreadExecutor(r -> new Thread(r, "handler-thread"));
103 + AtomicReference<String> handlerThreadName = new AtomicReference<>();
104 + AtomicReference<String> completionThreadName = new AtomicReference<>();
105 +
106 + BiFunction<Endpoint, byte[], byte[]> handler = (ep, data) -> {
107 + handlerThreadName.set(Thread.currentThread().getName());
108 + return "hello there".getBytes();
109 + };
110 + netty2.registerHandler("test-subject", handler, handlerExecutor);
111 +
112 + CompletableFuture<byte[]> response = netty1.sendAndReceive(ep2,
113 + "test-subject",
114 + "hello world".getBytes(),
115 + completionExecutor);
116 + response.whenComplete((r, e) -> {
117 + completionThreadName.set(Thread.currentThread().getName());
118 + });
119 +
120 + // Verify that the message was request handling and response completion happens on the correct thread.
121 + assertTrue(Arrays.equals("hello there".getBytes(), response.join()));
122 + assertEquals("completion-thread", completionThreadName.get());
123 + assertEquals("handler-thread", handlerThreadName.get());
124 + }
125 +}