Showing
16 changed files
with
834 additions
and
0 deletions
1 | +package org.onlab.netty; | ||
2 | + | ||
3 | +import java.util.concurrent.TimeUnit; | ||
4 | +import java.util.concurrent.TimeoutException; | ||
5 | + | ||
6 | +/** | ||
7 | + * An asynchronous response. | ||
8 | + * This class provides a base implementation of Response, with methods to retrieve the | ||
9 | + * result and query to see if the result is ready. The result can only be retrieved when | ||
10 | + * it is ready and the get methods will block if the result is not ready yet. | ||
11 | + * @param <T> type of response. | ||
12 | + */ | ||
13 | +public class AsyncResponse<T> implements Response<T> { | ||
14 | + | ||
15 | + private T value; | ||
16 | + private boolean done = false; | ||
17 | + private final long start = System.nanoTime(); | ||
18 | + | ||
19 | + @Override | ||
20 | + public T get(long timeout, TimeUnit tu) throws TimeoutException { | ||
21 | + timeout = tu.toNanos(timeout); | ||
22 | + boolean interrupted = false; | ||
23 | + try { | ||
24 | + synchronized (this) { | ||
25 | + while (!done) { | ||
26 | + try { | ||
27 | + long timeRemaining = timeout - (System.nanoTime() - start); | ||
28 | + if (timeRemaining <= 0) { | ||
29 | + throw new TimeoutException("Operation timed out."); | ||
30 | + } | ||
31 | + TimeUnit.NANOSECONDS.timedWait(this, timeRemaining); | ||
32 | + } catch (InterruptedException e) { | ||
33 | + interrupted = true; | ||
34 | + } | ||
35 | + } | ||
36 | + } | ||
37 | + } finally { | ||
38 | + if (interrupted) { | ||
39 | + Thread.currentThread().interrupt(); | ||
40 | + } | ||
41 | + } | ||
42 | + return value; | ||
43 | + } | ||
44 | + | ||
45 | + @Override | ||
46 | + public T get() throws InterruptedException { | ||
47 | + throw new UnsupportedOperationException(); | ||
48 | + } | ||
49 | + | ||
50 | + @Override | ||
51 | + public boolean isReady() { | ||
52 | + return done; | ||
53 | + } | ||
54 | + | ||
55 | + /** | ||
56 | + * Sets response value and unblocks any thread blocking on the response to become | ||
57 | + * available. | ||
58 | + * @param data response data. | ||
59 | + */ | ||
60 | + @SuppressWarnings("unchecked") | ||
61 | + public synchronized void setResponse(Object data) { | ||
62 | + if (!done) { | ||
63 | + done = true; | ||
64 | + value = (T) data; | ||
65 | + this.notifyAll(); | ||
66 | + } | ||
67 | + } | ||
68 | +} |
1 | +package org.onlab.netty; | ||
2 | + | ||
3 | +import java.io.IOException; | ||
4 | + | ||
5 | +/** | ||
6 | + * Message handler that echos the message back to the sender. | ||
7 | + */ | ||
8 | +public class EchoHandler implements MessageHandler { | ||
9 | + | ||
10 | + @Override | ||
11 | + public void handle(Message message) throws IOException { | ||
12 | + System.out.println("Received: " + message.payload() + ". Echoing it back to the sender."); | ||
13 | + message.respond(message.payload()); | ||
14 | + } | ||
15 | +} |
1 | +package org.onlab.netty; | ||
2 | + | ||
3 | +/** | ||
4 | + * Representation of a TCP/UDP communication end point. | ||
5 | + */ | ||
6 | +public class Endpoint { | ||
7 | + | ||
8 | + private final int port; | ||
9 | + private final String host; | ||
10 | + | ||
11 | + public Endpoint(String host, int port) { | ||
12 | + this.host = host; | ||
13 | + this.port = port; | ||
14 | + } | ||
15 | + | ||
16 | + public String host() { | ||
17 | + return host; | ||
18 | + } | ||
19 | + | ||
20 | + public int port() { | ||
21 | + return port; | ||
22 | + } | ||
23 | + | ||
24 | + @Override | ||
25 | + public String toString() { | ||
26 | + return "Endpoint [port=" + port + ", host=" + host + "]"; | ||
27 | + } | ||
28 | + | ||
29 | + @Override | ||
30 | + public int hashCode() { | ||
31 | + final int prime = 31; | ||
32 | + int result = 1; | ||
33 | + result = prime * result + ((host == null) ? 0 : host.hashCode()); | ||
34 | + result = prime * result + port; | ||
35 | + return result; | ||
36 | + } | ||
37 | + | ||
38 | + @Override | ||
39 | + public boolean equals(Object obj) { | ||
40 | + if (this == obj) { | ||
41 | + return true; | ||
42 | + } | ||
43 | + if (obj == null) { | ||
44 | + return false; | ||
45 | + } | ||
46 | + if (getClass() != obj.getClass()) { | ||
47 | + return false; | ||
48 | + } | ||
49 | + Endpoint other = (Endpoint) obj; | ||
50 | + if (host == null) { | ||
51 | + if (other.host != null) { | ||
52 | + return false; | ||
53 | + } | ||
54 | + } else if (!host.equals(other.host)) { | ||
55 | + return false; | ||
56 | + } | ||
57 | + if (port != other.port) { | ||
58 | + return false; | ||
59 | + } | ||
60 | + return true; | ||
61 | + } | ||
62 | +} |
1 | +package org.onlab.netty; | ||
2 | + | ||
3 | +import java.io.IOException; | ||
4 | + | ||
5 | +/** | ||
6 | + * Internal message representation with additional attributes | ||
7 | + * for supporting, synchronous request/reply behavior. | ||
8 | + */ | ||
9 | +public final class InternalMessage implements Message { | ||
10 | + | ||
11 | + private long id; | ||
12 | + private Endpoint sender; | ||
13 | + private String type; | ||
14 | + private Object payload; | ||
15 | + private transient NettyMessagingService messagingService; | ||
16 | + public static final String REPLY_MESSAGE_TYPE = "NETTY_MESSAGIG_REQUEST_REPLY"; | ||
17 | + | ||
18 | + // Must be created using the Builder. | ||
19 | + private InternalMessage() {} | ||
20 | + | ||
21 | + public long id() { | ||
22 | + return id; | ||
23 | + } | ||
24 | + | ||
25 | + public String type() { | ||
26 | + return type; | ||
27 | + } | ||
28 | + | ||
29 | + public Endpoint sender() { | ||
30 | + return sender; | ||
31 | + } | ||
32 | + | ||
33 | + @Override | ||
34 | + public Object payload() { | ||
35 | + return payload; | ||
36 | + } | ||
37 | + | ||
38 | + @Override | ||
39 | + public void respond(Object data) throws IOException { | ||
40 | + Builder builder = new Builder(messagingService); | ||
41 | + InternalMessage message = builder.withId(this.id) | ||
42 | + // FIXME: Sender should be messagingService.localEp. | ||
43 | + .withSender(this.sender) | ||
44 | + .withPayload(data) | ||
45 | + .withType(REPLY_MESSAGE_TYPE) | ||
46 | + .build(); | ||
47 | + messagingService.sendAsync(sender, message); | ||
48 | + } | ||
49 | + | ||
50 | + | ||
51 | + /** | ||
52 | + * Builder for InternalMessages. | ||
53 | + */ | ||
54 | + public static class Builder { | ||
55 | + private InternalMessage message; | ||
56 | + | ||
57 | + public Builder(NettyMessagingService messagingService) { | ||
58 | + message = new InternalMessage(); | ||
59 | + message.messagingService = messagingService; | ||
60 | + } | ||
61 | + | ||
62 | + public Builder withId(long id) { | ||
63 | + message.id = id; | ||
64 | + return this; | ||
65 | + } | ||
66 | + | ||
67 | + public Builder withType(String type) { | ||
68 | + message.type = type; | ||
69 | + return this; | ||
70 | + } | ||
71 | + | ||
72 | + public Builder withSender(Endpoint sender) { | ||
73 | + message.sender = sender; | ||
74 | + return this; | ||
75 | + } | ||
76 | + public Builder withPayload(Object payload) { | ||
77 | + message.payload = payload; | ||
78 | + return this; | ||
79 | + } | ||
80 | + | ||
81 | + public InternalMessage build() { | ||
82 | + return message; | ||
83 | + } | ||
84 | + } | ||
85 | +} |
1 | +package org.onlab.netty; | ||
2 | + | ||
3 | +import org.onlab.util.KryoPool; | ||
4 | +import org.slf4j.Logger; | ||
5 | +import org.slf4j.LoggerFactory; | ||
6 | + | ||
7 | +import java.util.ArrayList; | ||
8 | +import java.util.HashMap; | ||
9 | + | ||
10 | +/** | ||
11 | + * Kryo Serializer. | ||
12 | + */ | ||
13 | +public class KryoSerializer implements Serializer { | ||
14 | + | ||
15 | + private final Logger log = LoggerFactory.getLogger(getClass()); | ||
16 | + | ||
17 | + private KryoPool serializerPool; | ||
18 | + | ||
19 | + public KryoSerializer() { | ||
20 | + setupKryoPool(); | ||
21 | + } | ||
22 | + | ||
23 | + /** | ||
24 | + * Sets up the common serialzers pool. | ||
25 | + */ | ||
26 | + protected void setupKryoPool() { | ||
27 | + // FIXME Slice out types used in common to separate pool/namespace. | ||
28 | + serializerPool = KryoPool.newBuilder() | ||
29 | + .register(ArrayList.class, | ||
30 | + HashMap.class, | ||
31 | + ArrayList.class | ||
32 | + ) | ||
33 | + .build() | ||
34 | + .populate(1); | ||
35 | + } | ||
36 | + | ||
37 | + | ||
38 | + @Override | ||
39 | + public Object decode(byte[] data) { | ||
40 | + return serializerPool.deserialize(data); | ||
41 | + } | ||
42 | + | ||
43 | + @Override | ||
44 | + public byte[] encode(Object payload) { | ||
45 | + return serializerPool.serialize(payload); | ||
46 | + } | ||
47 | +} |
1 | +package org.onlab.netty; | ||
2 | + | ||
3 | +import java.io.IOException; | ||
4 | + | ||
5 | +/** | ||
6 | + * A unit of communication. | ||
7 | + * Has a payload. Also supports a feature to respond back to the sender. | ||
8 | + */ | ||
9 | +public interface Message { | ||
10 | + | ||
11 | + /** | ||
12 | + * Returns the payload of this message. | ||
13 | + * @return message payload. | ||
14 | + */ | ||
15 | + public Object payload(); | ||
16 | + | ||
17 | + /** | ||
18 | + * Sends a reply back to the sender of this messge. | ||
19 | + * @param data payload of the response. | ||
20 | + * @throws IOException if there is a communication error. | ||
21 | + */ | ||
22 | + public void respond(Object data) throws IOException; | ||
23 | +} |
1 | +package org.onlab.netty; | ||
2 | + | ||
3 | +import java.util.Arrays; | ||
4 | +import java.util.List; | ||
5 | + | ||
6 | +import static com.google.common.base.Preconditions.checkState; | ||
7 | + | ||
8 | +import io.netty.buffer.ByteBuf; | ||
9 | +import io.netty.channel.ChannelHandlerContext; | ||
10 | +import io.netty.handler.codec.ByteToMessageDecoder; | ||
11 | + | ||
12 | +/** | ||
13 | + * Decode bytes into a InternalMessage. | ||
14 | + */ | ||
15 | +public class MessageDecoder extends ByteToMessageDecoder { | ||
16 | + | ||
17 | + private final NettyMessagingService messagingService; | ||
18 | + private final Serializer serializer; | ||
19 | + | ||
20 | + public MessageDecoder(NettyMessagingService messagingService, Serializer serializer) { | ||
21 | + this.messagingService = messagingService; | ||
22 | + this.serializer = serializer; | ||
23 | + } | ||
24 | + | ||
25 | + @Override | ||
26 | + protected void decode(ChannelHandlerContext context, ByteBuf in, | ||
27 | + List<Object> messages) throws Exception { | ||
28 | + | ||
29 | + byte[] preamble = in.readBytes(MessageEncoder.PREAMBLE.length).array(); | ||
30 | + checkState(Arrays.equals(MessageEncoder.PREAMBLE, preamble), "Message has wrong preamble"); | ||
31 | + | ||
32 | + // read message Id. | ||
33 | + long id = in.readLong(); | ||
34 | + | ||
35 | + // read message type; first read size and then bytes. | ||
36 | + String type = new String(in.readBytes(in.readInt()).array()); | ||
37 | + | ||
38 | + // read sender host name; first read size and then bytes. | ||
39 | + String host = new String(in.readBytes(in.readInt()).array()); | ||
40 | + | ||
41 | + // read sender port. | ||
42 | + int port = in.readInt(); | ||
43 | + | ||
44 | + Endpoint sender = new Endpoint(host, port); | ||
45 | + | ||
46 | + // read message payload; first read size and then bytes. | ||
47 | + Object payload = serializer.decode(in.readBytes(in.readInt()).array()); | ||
48 | + | ||
49 | + InternalMessage message = new InternalMessage.Builder(messagingService) | ||
50 | + .withId(id) | ||
51 | + .withSender(sender) | ||
52 | + .withType(type) | ||
53 | + .withPayload(payload) | ||
54 | + .build(); | ||
55 | + | ||
56 | + messages.add(message); | ||
57 | + } | ||
58 | +} |
1 | +package org.onlab.netty; | ||
2 | + | ||
3 | +import io.netty.buffer.ByteBuf; | ||
4 | +import io.netty.channel.ChannelHandlerContext; | ||
5 | +import io.netty.handler.codec.MessageToByteEncoder; | ||
6 | + | ||
7 | +/** | ||
8 | + * Encode InternalMessage out into a byte buffer. | ||
9 | + */ | ||
10 | +public class MessageEncoder extends MessageToByteEncoder<InternalMessage> { | ||
11 | + | ||
12 | + // onosiscool in ascii | ||
13 | + public static final byte[] PREAMBLE = "onosiscool".getBytes(); | ||
14 | + | ||
15 | + private final Serializer serializer; | ||
16 | + | ||
17 | + public MessageEncoder(Serializer serializer) { | ||
18 | + this.serializer = serializer; | ||
19 | + } | ||
20 | + | ||
21 | + @Override | ||
22 | + protected void encode(ChannelHandlerContext context, InternalMessage message, | ||
23 | + ByteBuf out) throws Exception { | ||
24 | + | ||
25 | + // write preamble | ||
26 | + out.writeBytes(PREAMBLE); | ||
27 | + | ||
28 | + // write id | ||
29 | + out.writeLong(message.id()); | ||
30 | + | ||
31 | + // write type length | ||
32 | + out.writeInt(message.type().length()); | ||
33 | + | ||
34 | + // write type | ||
35 | + out.writeBytes(message.type().getBytes()); | ||
36 | + | ||
37 | + // write sender host name size | ||
38 | + out.writeInt(message.sender().host().length()); | ||
39 | + | ||
40 | + // write sender host name. | ||
41 | + out.writeBytes(message.sender().host().getBytes()); | ||
42 | + | ||
43 | + // write port | ||
44 | + out.writeInt(message.sender().port()); | ||
45 | + | ||
46 | + try { | ||
47 | + serializer.encode(message.payload()); | ||
48 | + } catch (Exception e) { | ||
49 | + e.printStackTrace(); | ||
50 | + } | ||
51 | + | ||
52 | + byte[] payload = serializer.encode(message.payload()); | ||
53 | + | ||
54 | + // write payload length. | ||
55 | + out.writeInt(payload.length); | ||
56 | + | ||
57 | + // write payload bytes | ||
58 | + out.writeBytes(payload); | ||
59 | + } | ||
60 | +} |
1 | +package org.onlab.netty; | ||
2 | + | ||
3 | +import java.io.IOException; | ||
4 | + | ||
5 | +/** | ||
6 | + * Handler for a message. | ||
7 | + */ | ||
8 | +public interface MessageHandler { | ||
9 | + | ||
10 | + /** | ||
11 | + * Handles the message. | ||
12 | + * @param message message. | ||
13 | + * @throws IOException. | ||
14 | + */ | ||
15 | + public void handle(Message message) throws IOException; | ||
16 | +} |
1 | +package org.onlab.netty; | ||
2 | + | ||
3 | +import java.io.IOException; | ||
4 | + | ||
5 | +/** | ||
6 | + * Interface for low level messaging primitives. | ||
7 | + */ | ||
8 | +public interface MessagingService { | ||
9 | + /** | ||
10 | + * Sends a message asynchronously to the specified communication end point. | ||
11 | + * The message is specified using the type and payload. | ||
12 | + * @param ep end point to send the message to. | ||
13 | + * @param type type of message. | ||
14 | + * @param payload message payload. | ||
15 | + * @throws IOException | ||
16 | + */ | ||
17 | + public void sendAsync(Endpoint ep, String type, Object payload) throws IOException; | ||
18 | + | ||
19 | + /** | ||
20 | + * Sends a message synchronously and waits for a response. | ||
21 | + * @param ep end point to send the message to. | ||
22 | + * @param type type of message. | ||
23 | + * @param payload message payload. | ||
24 | + * @return a response future | ||
25 | + * @throws IOException | ||
26 | + */ | ||
27 | + public <T> Response<T> sendAndReceive(Endpoint ep, String type, Object payload) throws IOException; | ||
28 | + | ||
29 | + /** | ||
30 | + * Registers a new message handler for message type. | ||
31 | + * @param type message type. | ||
32 | + * @param handler message handler | ||
33 | + */ | ||
34 | + public void registerHandler(String type, MessageHandler handler); | ||
35 | + | ||
36 | + /** | ||
37 | + * Unregister current handler, if one exists for message type. | ||
38 | + * @param type message type | ||
39 | + */ | ||
40 | + public void unregisterHandler(String type); | ||
41 | +} |
1 | +package org.onlab.netty; | ||
2 | + | ||
3 | +import java.io.IOException; | ||
4 | +import java.net.UnknownHostException; | ||
5 | +import java.util.concurrent.ConcurrentHashMap; | ||
6 | +import java.util.concurrent.ConcurrentMap; | ||
7 | +import java.util.concurrent.TimeUnit; | ||
8 | + | ||
9 | +import io.netty.bootstrap.Bootstrap; | ||
10 | +import io.netty.bootstrap.ServerBootstrap; | ||
11 | +import io.netty.buffer.PooledByteBufAllocator; | ||
12 | +import io.netty.channel.Channel; | ||
13 | +import io.netty.channel.ChannelFuture; | ||
14 | +import io.netty.channel.ChannelHandlerContext; | ||
15 | +import io.netty.channel.ChannelInitializer; | ||
16 | +import io.netty.channel.ChannelOption; | ||
17 | +import io.netty.channel.EventLoopGroup; | ||
18 | +import io.netty.channel.SimpleChannelInboundHandler; | ||
19 | +import io.netty.channel.nio.NioEventLoopGroup; | ||
20 | +import io.netty.channel.socket.SocketChannel; | ||
21 | +import io.netty.channel.socket.nio.NioServerSocketChannel; | ||
22 | +import io.netty.channel.socket.nio.NioSocketChannel; | ||
23 | + | ||
24 | +import org.apache.commons.lang.math.RandomUtils; | ||
25 | +import org.apache.commons.pool.KeyedObjectPool; | ||
26 | +import org.apache.commons.pool.KeyedPoolableObjectFactory; | ||
27 | +import org.apache.commons.pool.impl.GenericKeyedObjectPool; | ||
28 | +import org.slf4j.Logger; | ||
29 | +import org.slf4j.LoggerFactory; | ||
30 | + | ||
31 | +import com.google.common.cache.Cache; | ||
32 | +import com.google.common.cache.CacheBuilder; | ||
33 | + | ||
34 | +/** | ||
35 | + * A Netty based implementation of MessagingService. | ||
36 | + */ | ||
37 | +public class NettyMessagingService implements MessagingService { | ||
38 | + | ||
39 | + private final Logger log = LoggerFactory.getLogger(getClass()); | ||
40 | + | ||
41 | + private KeyedObjectPool<Endpoint, Channel> channels = | ||
42 | + new GenericKeyedObjectPool<Endpoint, Channel>(new OnosCommunicationChannelFactory()); | ||
43 | + private final int port; | ||
44 | + private final EventLoopGroup bossGroup = new NioEventLoopGroup(); | ||
45 | + private final EventLoopGroup workerGroup = new NioEventLoopGroup(); | ||
46 | + private final ConcurrentMap<String, MessageHandler> handlers = new ConcurrentHashMap<>(); | ||
47 | + private Cache<Long, AsyncResponse<?>> responseFutures; | ||
48 | + private final Endpoint localEp; | ||
49 | + | ||
50 | + protected Serializer serializer; | ||
51 | + | ||
52 | + public NettyMessagingService() { | ||
53 | + // TODO: Default port should be configurable. | ||
54 | + this(8080); | ||
55 | + } | ||
56 | + | ||
57 | + // FIXME: Constructor should not throw exceptions. | ||
58 | + public NettyMessagingService(int port) { | ||
59 | + this.port = port; | ||
60 | + try { | ||
61 | + localEp = new Endpoint(java.net.InetAddress.getLocalHost().getHostName(), port); | ||
62 | + } catch (UnknownHostException e) { | ||
63 | + // bailing out. | ||
64 | + throw new RuntimeException(e); | ||
65 | + } | ||
66 | + } | ||
67 | + | ||
68 | + public void activate() throws Exception { | ||
69 | + responseFutures = CacheBuilder.newBuilder() | ||
70 | + .maximumSize(100000) | ||
71 | + .weakValues() | ||
72 | + // TODO: Once the entry expires, notify blocking threads (if any). | ||
73 | + .expireAfterWrite(10, TimeUnit.MINUTES) | ||
74 | + .build(); | ||
75 | + startAcceptingConnections(); | ||
76 | + } | ||
77 | + | ||
78 | + public void deactivate() throws Exception { | ||
79 | + channels.close(); | ||
80 | + bossGroup.shutdownGracefully(); | ||
81 | + workerGroup.shutdownGracefully(); | ||
82 | + } | ||
83 | + | ||
84 | + @Override | ||
85 | + public void sendAsync(Endpoint ep, String type, Object payload) throws IOException { | ||
86 | + InternalMessage message = new InternalMessage.Builder(this) | ||
87 | + .withId(RandomUtils.nextLong()) | ||
88 | + .withSender(localEp) | ||
89 | + .withType(type) | ||
90 | + .withPayload(payload) | ||
91 | + .build(); | ||
92 | + sendAsync(ep, message); | ||
93 | + } | ||
94 | + | ||
95 | + protected void sendAsync(Endpoint ep, InternalMessage message) throws IOException { | ||
96 | + Channel channel = null; | ||
97 | + try { | ||
98 | + channel = channels.borrowObject(ep); | ||
99 | + channel.eventLoop().execute(new WriteTask(channel, message)); | ||
100 | + } catch (Exception e) { | ||
101 | + throw new IOException(e); | ||
102 | + } finally { | ||
103 | + try { | ||
104 | + channels.returnObject(ep, channel); | ||
105 | + } catch (Exception e) { | ||
106 | + log.warn("Error returning object back to the pool", e); | ||
107 | + // ignored. | ||
108 | + } | ||
109 | + } | ||
110 | + } | ||
111 | + | ||
112 | + @Override | ||
113 | + public <T> Response<T> sendAndReceive(Endpoint ep, String type, Object payload) | ||
114 | + throws IOException { | ||
115 | + AsyncResponse<T> futureResponse = new AsyncResponse<T>(); | ||
116 | + Long messageId = RandomUtils.nextLong(); | ||
117 | + responseFutures.put(messageId, futureResponse); | ||
118 | + InternalMessage message = new InternalMessage.Builder(this) | ||
119 | + .withId(messageId) | ||
120 | + .withSender(localEp) | ||
121 | + .withType(type) | ||
122 | + .withPayload(payload) | ||
123 | + .build(); | ||
124 | + sendAsync(ep, message); | ||
125 | + return futureResponse; | ||
126 | + } | ||
127 | + | ||
128 | + @Override | ||
129 | + public void registerHandler(String type, MessageHandler handler) { | ||
130 | + // TODO: Is this the right semantics for handler registration? | ||
131 | + handlers.putIfAbsent(type, handler); | ||
132 | + } | ||
133 | + | ||
134 | + public void unregisterHandler(String type) { | ||
135 | + handlers.remove(type); | ||
136 | + } | ||
137 | + | ||
138 | + private MessageHandler getMessageHandler(String type) { | ||
139 | + return handlers.get(type); | ||
140 | + } | ||
141 | + | ||
142 | + private void startAcceptingConnections() throws InterruptedException { | ||
143 | + ServerBootstrap b = new ServerBootstrap(); | ||
144 | + b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); | ||
145 | + b.group(bossGroup, workerGroup) | ||
146 | + .channel(NioServerSocketChannel.class) | ||
147 | + .childHandler(new OnosCommunicationChannelInitializer()) | ||
148 | + .option(ChannelOption.SO_BACKLOG, 128) | ||
149 | + .childOption(ChannelOption.SO_KEEPALIVE, true); | ||
150 | + | ||
151 | + // Bind and start to accept incoming connections. | ||
152 | + b.bind(port).sync(); | ||
153 | + } | ||
154 | + | ||
155 | + private class OnosCommunicationChannelFactory | ||
156 | + implements KeyedPoolableObjectFactory<Endpoint, Channel> { | ||
157 | + | ||
158 | + @Override | ||
159 | + public void activateObject(Endpoint endpoint, Channel channel) | ||
160 | + throws Exception { | ||
161 | + } | ||
162 | + | ||
163 | + @Override | ||
164 | + public void destroyObject(Endpoint ep, Channel channel) throws Exception { | ||
165 | + channel.close(); | ||
166 | + } | ||
167 | + | ||
168 | + @Override | ||
169 | + public Channel makeObject(Endpoint ep) throws Exception { | ||
170 | + Bootstrap b = new Bootstrap(); | ||
171 | + b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); | ||
172 | + b.group(workerGroup); | ||
173 | + // TODO: Make this faster: | ||
174 | + // http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#37.0 | ||
175 | + b.channel(NioSocketChannel.class); | ||
176 | + b.option(ChannelOption.SO_KEEPALIVE, true); | ||
177 | + b.handler(new OnosCommunicationChannelInitializer()); | ||
178 | + | ||
179 | + // Start the client. | ||
180 | + ChannelFuture f = b.connect(ep.host(), ep.port()).sync(); | ||
181 | + return f.channel(); | ||
182 | + } | ||
183 | + | ||
184 | + @Override | ||
185 | + public void passivateObject(Endpoint ep, Channel channel) | ||
186 | + throws Exception { | ||
187 | + } | ||
188 | + | ||
189 | + @Override | ||
190 | + public boolean validateObject(Endpoint ep, Channel channel) { | ||
191 | + return channel.isOpen(); | ||
192 | + } | ||
193 | + } | ||
194 | + | ||
195 | + private class OnosCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> { | ||
196 | + | ||
197 | + @Override | ||
198 | + protected void initChannel(SocketChannel channel) throws Exception { | ||
199 | + channel.pipeline() | ||
200 | + .addLast(new MessageEncoder(serializer)) | ||
201 | + .addLast(new MessageDecoder(NettyMessagingService.this, serializer)) | ||
202 | + .addLast(new NettyMessagingService.InboundMessageDispatcher()); | ||
203 | + } | ||
204 | + } | ||
205 | + | ||
206 | + private class WriteTask implements Runnable { | ||
207 | + | ||
208 | + private final Object message; | ||
209 | + private final Channel channel; | ||
210 | + | ||
211 | + public WriteTask(Channel channel, Object message) { | ||
212 | + this.message = message; | ||
213 | + this.channel = channel; | ||
214 | + } | ||
215 | + | ||
216 | + @Override | ||
217 | + public void run() { | ||
218 | + channel.writeAndFlush(message); | ||
219 | + } | ||
220 | + } | ||
221 | + | ||
222 | + private class InboundMessageDispatcher extends SimpleChannelInboundHandler<InternalMessage> { | ||
223 | + | ||
224 | + @Override | ||
225 | + protected void channelRead0(ChannelHandlerContext ctx, InternalMessage message) throws Exception { | ||
226 | + String type = message.type(); | ||
227 | + if (type.equals(InternalMessage.REPLY_MESSAGE_TYPE)) { | ||
228 | + try { | ||
229 | + AsyncResponse<?> futureResponse = | ||
230 | + NettyMessagingService.this.responseFutures.getIfPresent(message.id()); | ||
231 | + if (futureResponse != null) { | ||
232 | + futureResponse.setResponse(message.payload()); | ||
233 | + } | ||
234 | + log.warn("Received a reply. But was unable to locate the request handle"); | ||
235 | + } finally { | ||
236 | + NettyMessagingService.this.responseFutures.invalidate(message.id()); | ||
237 | + } | ||
238 | + return; | ||
239 | + } | ||
240 | + MessageHandler handler = NettyMessagingService.this.getMessageHandler(type); | ||
241 | + handler.handle(message); | ||
242 | + } | ||
243 | + } | ||
244 | +} |
1 | +package org.onlab.netty; | ||
2 | + | ||
3 | +import java.util.concurrent.TimeUnit; | ||
4 | +import java.util.concurrent.TimeoutException; | ||
5 | + | ||
6 | +/** | ||
7 | + * Response object returned when making synchronous requests. | ||
8 | + * Can you used to check is a response is ready and/or wait for a response | ||
9 | + * to become available. | ||
10 | + * | ||
11 | + * @param <T> type of response. | ||
12 | + */ | ||
13 | +public interface Response<T> { | ||
14 | + | ||
15 | + /** | ||
16 | + * Gets the response waiting for a designated timeout period. | ||
17 | + * @param timeout timeout period (since request was sent out) | ||
18 | + * @param tu unit of time. | ||
19 | + * @return response | ||
20 | + * @throws TimeoutException if the timeout expires before the response arrives. | ||
21 | + */ | ||
22 | + public T get(long timeout, TimeUnit tu) throws TimeoutException; | ||
23 | + | ||
24 | + /** | ||
25 | + * Gets the response waiting for indefinite timeout period. | ||
26 | + * @return response | ||
27 | + * @throws InterruptedException if the thread is interrupted before the response arrives. | ||
28 | + */ | ||
29 | + public T get() throws InterruptedException; | ||
30 | + | ||
31 | + /** | ||
32 | + * Checks if the response is ready without blocking. | ||
33 | + * @return true if response is ready, false otherwise. | ||
34 | + */ | ||
35 | + public boolean isReady(); | ||
36 | +} |
1 | +package org.onlab.netty; | ||
2 | + | ||
3 | +/** | ||
4 | + * Interface for encoding/decoding message payloads. | ||
5 | + */ | ||
6 | +public interface Serializer { | ||
7 | + | ||
8 | + /** | ||
9 | + * Decodes the specified byte array to a POJO. | ||
10 | + * | ||
11 | + * @param data byte array. | ||
12 | + * @return POJO | ||
13 | + */ | ||
14 | + Object decode(byte[] data); | ||
15 | + | ||
16 | + /** | ||
17 | + * Encodes the specified POJO into a byte array. | ||
18 | + * | ||
19 | + * @param data POJO to be encoded | ||
20 | + * @return byte array. | ||
21 | + */ | ||
22 | + byte[] encode(Object message); | ||
23 | + | ||
24 | +} |
1 | +package org.onlab.netty; | ||
2 | + | ||
3 | +import java.util.concurrent.TimeUnit; | ||
4 | + | ||
5 | +public final class SimpleClient { | ||
6 | + private SimpleClient() {} | ||
7 | + | ||
8 | + public static void main(String... args) throws Exception { | ||
9 | + NettyMessagingService messaging = new TestNettyMessagingService(9081); | ||
10 | + messaging.activate(); | ||
11 | + | ||
12 | + messaging.sendAsync(new Endpoint("localhost", 8080), "simple", "Hello World"); | ||
13 | + Response<String> response = messaging.sendAndReceive(new Endpoint("localhost", 8080), "echo", "Hello World"); | ||
14 | + System.out.println("Got back:" + response.get(2, TimeUnit.SECONDS)); | ||
15 | + } | ||
16 | + | ||
17 | + public static class TestNettyMessagingService extends NettyMessagingService { | ||
18 | + public TestNettyMessagingService(int port) throws Exception { | ||
19 | + super(port); | ||
20 | + Serializer serializer = new KryoSerializer(); | ||
21 | + this.serializer = serializer; | ||
22 | + } | ||
23 | + } | ||
24 | +} |
1 | +package org.onlab.netty; | ||
2 | + | ||
3 | +public final class SimpleServer { | ||
4 | + private SimpleServer() {} | ||
5 | + | ||
6 | + public static void main(String... args) throws Exception { | ||
7 | + NettyMessagingService server = new TestNettyMessagingService(); | ||
8 | + server.activate(); | ||
9 | + server.registerHandler("simple", new LoggingHandler()); | ||
10 | + server.registerHandler("echo", new EchoHandler()); | ||
11 | + } | ||
12 | + | ||
13 | + public static class TestNettyMessagingService extends NettyMessagingService { | ||
14 | + protected TestNettyMessagingService() { | ||
15 | + Serializer serializer = new KryoSerializer(); | ||
16 | + this.serializer = serializer; | ||
17 | + } | ||
18 | + } | ||
19 | +} |
-
Please register or login to post a comment