Showing
16 changed files
with
0 additions
and
834 deletions
1 | -package org.onlab.netty; | ||
2 | - | ||
3 | -import java.util.concurrent.TimeUnit; | ||
4 | -import java.util.concurrent.TimeoutException; | ||
5 | - | ||
6 | -/** | ||
7 | - * An asynchronous response. | ||
8 | - * This class provides a base implementation of Response, with methods to retrieve the | ||
9 | - * result and query to see if the result is ready. The result can only be retrieved when | ||
10 | - * it is ready and the get methods will block if the result is not ready yet. | ||
11 | - * @param <T> type of response. | ||
12 | - */ | ||
13 | -public class AsyncResponse<T> implements Response<T> { | ||
14 | - | ||
15 | - private T value; | ||
16 | - private boolean done = false; | ||
17 | - private final long start = System.nanoTime(); | ||
18 | - | ||
19 | - @Override | ||
20 | - public T get(long timeout, TimeUnit tu) throws TimeoutException { | ||
21 | - timeout = tu.toNanos(timeout); | ||
22 | - boolean interrupted = false; | ||
23 | - try { | ||
24 | - synchronized (this) { | ||
25 | - while (!done) { | ||
26 | - try { | ||
27 | - long timeRemaining = timeout - (System.nanoTime() - start); | ||
28 | - if (timeRemaining <= 0) { | ||
29 | - throw new TimeoutException("Operation timed out."); | ||
30 | - } | ||
31 | - TimeUnit.NANOSECONDS.timedWait(this, timeRemaining); | ||
32 | - } catch (InterruptedException e) { | ||
33 | - interrupted = true; | ||
34 | - } | ||
35 | - } | ||
36 | - } | ||
37 | - } finally { | ||
38 | - if (interrupted) { | ||
39 | - Thread.currentThread().interrupt(); | ||
40 | - } | ||
41 | - } | ||
42 | - return value; | ||
43 | - } | ||
44 | - | ||
45 | - @Override | ||
46 | - public T get() throws InterruptedException { | ||
47 | - throw new UnsupportedOperationException(); | ||
48 | - } | ||
49 | - | ||
50 | - @Override | ||
51 | - public boolean isReady() { | ||
52 | - return done; | ||
53 | - } | ||
54 | - | ||
55 | - /** | ||
56 | - * Sets response value and unblocks any thread blocking on the response to become | ||
57 | - * available. | ||
58 | - * @param data response data. | ||
59 | - */ | ||
60 | - @SuppressWarnings("unchecked") | ||
61 | - public synchronized void setResponse(Object data) { | ||
62 | - if (!done) { | ||
63 | - done = true; | ||
64 | - value = (T) data; | ||
65 | - this.notifyAll(); | ||
66 | - } | ||
67 | - } | ||
68 | -} |
1 | -package org.onlab.netty; | ||
2 | - | ||
3 | -import java.io.IOException; | ||
4 | - | ||
5 | -/** | ||
6 | - * Message handler that echos the message back to the sender. | ||
7 | - */ | ||
8 | -public class EchoHandler implements MessageHandler { | ||
9 | - | ||
10 | - @Override | ||
11 | - public void handle(Message message) throws IOException { | ||
12 | - System.out.println("Received: " + message.payload() + ". Echoing it back to the sender."); | ||
13 | - message.respond(message.payload()); | ||
14 | - } | ||
15 | -} |
1 | -package org.onlab.netty; | ||
2 | - | ||
3 | -/** | ||
4 | - * Representation of a TCP/UDP communication end point. | ||
5 | - */ | ||
6 | -public class Endpoint { | ||
7 | - | ||
8 | - private final int port; | ||
9 | - private final String host; | ||
10 | - | ||
11 | - public Endpoint(String host, int port) { | ||
12 | - this.host = host; | ||
13 | - this.port = port; | ||
14 | - } | ||
15 | - | ||
16 | - public String host() { | ||
17 | - return host; | ||
18 | - } | ||
19 | - | ||
20 | - public int port() { | ||
21 | - return port; | ||
22 | - } | ||
23 | - | ||
24 | - @Override | ||
25 | - public String toString() { | ||
26 | - return "Endpoint [port=" + port + ", host=" + host + "]"; | ||
27 | - } | ||
28 | - | ||
29 | - @Override | ||
30 | - public int hashCode() { | ||
31 | - final int prime = 31; | ||
32 | - int result = 1; | ||
33 | - result = prime * result + ((host == null) ? 0 : host.hashCode()); | ||
34 | - result = prime * result + port; | ||
35 | - return result; | ||
36 | - } | ||
37 | - | ||
38 | - @Override | ||
39 | - public boolean equals(Object obj) { | ||
40 | - if (this == obj) { | ||
41 | - return true; | ||
42 | - } | ||
43 | - if (obj == null) { | ||
44 | - return false; | ||
45 | - } | ||
46 | - if (getClass() != obj.getClass()) { | ||
47 | - return false; | ||
48 | - } | ||
49 | - Endpoint other = (Endpoint) obj; | ||
50 | - if (host == null) { | ||
51 | - if (other.host != null) { | ||
52 | - return false; | ||
53 | - } | ||
54 | - } else if (!host.equals(other.host)) { | ||
55 | - return false; | ||
56 | - } | ||
57 | - if (port != other.port) { | ||
58 | - return false; | ||
59 | - } | ||
60 | - return true; | ||
61 | - } | ||
62 | -} |
1 | -package org.onlab.netty; | ||
2 | - | ||
3 | -import java.io.IOException; | ||
4 | - | ||
5 | -/** | ||
6 | - * Internal message representation with additional attributes | ||
7 | - * for supporting, synchronous request/reply behavior. | ||
8 | - */ | ||
9 | -public final class InternalMessage implements Message { | ||
10 | - | ||
11 | - private long id; | ||
12 | - private Endpoint sender; | ||
13 | - private String type; | ||
14 | - private Object payload; | ||
15 | - private transient NettyMessagingService messagingService; | ||
16 | - public static final String REPLY_MESSAGE_TYPE = "NETTY_MESSAGIG_REQUEST_REPLY"; | ||
17 | - | ||
18 | - // Must be created using the Builder. | ||
19 | - private InternalMessage() {} | ||
20 | - | ||
21 | - public long id() { | ||
22 | - return id; | ||
23 | - } | ||
24 | - | ||
25 | - public String type() { | ||
26 | - return type; | ||
27 | - } | ||
28 | - | ||
29 | - public Endpoint sender() { | ||
30 | - return sender; | ||
31 | - } | ||
32 | - | ||
33 | - @Override | ||
34 | - public Object payload() { | ||
35 | - return payload; | ||
36 | - } | ||
37 | - | ||
38 | - @Override | ||
39 | - public void respond(Object data) throws IOException { | ||
40 | - Builder builder = new Builder(messagingService); | ||
41 | - InternalMessage message = builder.withId(this.id) | ||
42 | - // FIXME: Sender should be messagingService.localEp. | ||
43 | - .withSender(this.sender) | ||
44 | - .withPayload(data) | ||
45 | - .withType(REPLY_MESSAGE_TYPE) | ||
46 | - .build(); | ||
47 | - messagingService.sendAsync(sender, message); | ||
48 | - } | ||
49 | - | ||
50 | - | ||
51 | - /** | ||
52 | - * Builder for InternalMessages. | ||
53 | - */ | ||
54 | - public static class Builder { | ||
55 | - private InternalMessage message; | ||
56 | - | ||
57 | - public Builder(NettyMessagingService messagingService) { | ||
58 | - message = new InternalMessage(); | ||
59 | - message.messagingService = messagingService; | ||
60 | - } | ||
61 | - | ||
62 | - public Builder withId(long id) { | ||
63 | - message.id = id; | ||
64 | - return this; | ||
65 | - } | ||
66 | - | ||
67 | - public Builder withType(String type) { | ||
68 | - message.type = type; | ||
69 | - return this; | ||
70 | - } | ||
71 | - | ||
72 | - public Builder withSender(Endpoint sender) { | ||
73 | - message.sender = sender; | ||
74 | - return this; | ||
75 | - } | ||
76 | - public Builder withPayload(Object payload) { | ||
77 | - message.payload = payload; | ||
78 | - return this; | ||
79 | - } | ||
80 | - | ||
81 | - public InternalMessage build() { | ||
82 | - return message; | ||
83 | - } | ||
84 | - } | ||
85 | -} |
1 | -package org.onlab.netty; | ||
2 | - | ||
3 | -import org.onlab.util.KryoPool; | ||
4 | -import org.slf4j.Logger; | ||
5 | -import org.slf4j.LoggerFactory; | ||
6 | - | ||
7 | -import java.util.ArrayList; | ||
8 | -import java.util.HashMap; | ||
9 | - | ||
10 | -/** | ||
11 | - * Kryo Serializer. | ||
12 | - */ | ||
13 | -public class KryoSerializer implements Serializer { | ||
14 | - | ||
15 | - private final Logger log = LoggerFactory.getLogger(getClass()); | ||
16 | - | ||
17 | - private KryoPool serializerPool; | ||
18 | - | ||
19 | - public KryoSerializer() { | ||
20 | - setupKryoPool(); | ||
21 | - } | ||
22 | - | ||
23 | - /** | ||
24 | - * Sets up the common serialzers pool. | ||
25 | - */ | ||
26 | - protected void setupKryoPool() { | ||
27 | - // FIXME Slice out types used in common to separate pool/namespace. | ||
28 | - serializerPool = KryoPool.newBuilder() | ||
29 | - .register(ArrayList.class, | ||
30 | - HashMap.class, | ||
31 | - ArrayList.class | ||
32 | - ) | ||
33 | - .build() | ||
34 | - .populate(1); | ||
35 | - } | ||
36 | - | ||
37 | - | ||
38 | - @Override | ||
39 | - public Object decode(byte[] data) { | ||
40 | - return serializerPool.deserialize(data); | ||
41 | - } | ||
42 | - | ||
43 | - @Override | ||
44 | - public byte[] encode(Object payload) { | ||
45 | - return serializerPool.serialize(payload); | ||
46 | - } | ||
47 | -} |
1 | -package org.onlab.netty; | ||
2 | - | ||
3 | -import java.io.IOException; | ||
4 | - | ||
5 | -/** | ||
6 | - * A unit of communication. | ||
7 | - * Has a payload. Also supports a feature to respond back to the sender. | ||
8 | - */ | ||
9 | -public interface Message { | ||
10 | - | ||
11 | - /** | ||
12 | - * Returns the payload of this message. | ||
13 | - * @return message payload. | ||
14 | - */ | ||
15 | - public Object payload(); | ||
16 | - | ||
17 | - /** | ||
18 | - * Sends a reply back to the sender of this messge. | ||
19 | - * @param data payload of the response. | ||
20 | - * @throws IOException if there is a communication error. | ||
21 | - */ | ||
22 | - public void respond(Object data) throws IOException; | ||
23 | -} |
1 | -package org.onlab.netty; | ||
2 | - | ||
3 | -import java.util.Arrays; | ||
4 | -import java.util.List; | ||
5 | - | ||
6 | -import static com.google.common.base.Preconditions.checkState; | ||
7 | - | ||
8 | -import io.netty.buffer.ByteBuf; | ||
9 | -import io.netty.channel.ChannelHandlerContext; | ||
10 | -import io.netty.handler.codec.ByteToMessageDecoder; | ||
11 | - | ||
12 | -/** | ||
13 | - * Decode bytes into a InternalMessage. | ||
14 | - */ | ||
15 | -public class MessageDecoder extends ByteToMessageDecoder { | ||
16 | - | ||
17 | - private final NettyMessagingService messagingService; | ||
18 | - private final Serializer serializer; | ||
19 | - | ||
20 | - public MessageDecoder(NettyMessagingService messagingService, Serializer serializer) { | ||
21 | - this.messagingService = messagingService; | ||
22 | - this.serializer = serializer; | ||
23 | - } | ||
24 | - | ||
25 | - @Override | ||
26 | - protected void decode(ChannelHandlerContext context, ByteBuf in, | ||
27 | - List<Object> messages) throws Exception { | ||
28 | - | ||
29 | - byte[] preamble = in.readBytes(MessageEncoder.PREAMBLE.length).array(); | ||
30 | - checkState(Arrays.equals(MessageEncoder.PREAMBLE, preamble), "Message has wrong preamble"); | ||
31 | - | ||
32 | - // read message Id. | ||
33 | - long id = in.readLong(); | ||
34 | - | ||
35 | - // read message type; first read size and then bytes. | ||
36 | - String type = new String(in.readBytes(in.readInt()).array()); | ||
37 | - | ||
38 | - // read sender host name; first read size and then bytes. | ||
39 | - String host = new String(in.readBytes(in.readInt()).array()); | ||
40 | - | ||
41 | - // read sender port. | ||
42 | - int port = in.readInt(); | ||
43 | - | ||
44 | - Endpoint sender = new Endpoint(host, port); | ||
45 | - | ||
46 | - // read message payload; first read size and then bytes. | ||
47 | - Object payload = serializer.decode(in.readBytes(in.readInt()).array()); | ||
48 | - | ||
49 | - InternalMessage message = new InternalMessage.Builder(messagingService) | ||
50 | - .withId(id) | ||
51 | - .withSender(sender) | ||
52 | - .withType(type) | ||
53 | - .withPayload(payload) | ||
54 | - .build(); | ||
55 | - | ||
56 | - messages.add(message); | ||
57 | - } | ||
58 | -} |
1 | -package org.onlab.netty; | ||
2 | - | ||
3 | -import io.netty.buffer.ByteBuf; | ||
4 | -import io.netty.channel.ChannelHandlerContext; | ||
5 | -import io.netty.handler.codec.MessageToByteEncoder; | ||
6 | - | ||
7 | -/** | ||
8 | - * Encode InternalMessage out into a byte buffer. | ||
9 | - */ | ||
10 | -public class MessageEncoder extends MessageToByteEncoder<InternalMessage> { | ||
11 | - | ||
12 | - // onosiscool in ascii | ||
13 | - public static final byte[] PREAMBLE = "onosiscool".getBytes(); | ||
14 | - | ||
15 | - private final Serializer serializer; | ||
16 | - | ||
17 | - public MessageEncoder(Serializer serializer) { | ||
18 | - this.serializer = serializer; | ||
19 | - } | ||
20 | - | ||
21 | - @Override | ||
22 | - protected void encode(ChannelHandlerContext context, InternalMessage message, | ||
23 | - ByteBuf out) throws Exception { | ||
24 | - | ||
25 | - // write preamble | ||
26 | - out.writeBytes(PREAMBLE); | ||
27 | - | ||
28 | - // write id | ||
29 | - out.writeLong(message.id()); | ||
30 | - | ||
31 | - // write type length | ||
32 | - out.writeInt(message.type().length()); | ||
33 | - | ||
34 | - // write type | ||
35 | - out.writeBytes(message.type().getBytes()); | ||
36 | - | ||
37 | - // write sender host name size | ||
38 | - out.writeInt(message.sender().host().length()); | ||
39 | - | ||
40 | - // write sender host name. | ||
41 | - out.writeBytes(message.sender().host().getBytes()); | ||
42 | - | ||
43 | - // write port | ||
44 | - out.writeInt(message.sender().port()); | ||
45 | - | ||
46 | - try { | ||
47 | - serializer.encode(message.payload()); | ||
48 | - } catch (Exception e) { | ||
49 | - e.printStackTrace(); | ||
50 | - } | ||
51 | - | ||
52 | - byte[] payload = serializer.encode(message.payload()); | ||
53 | - | ||
54 | - // write payload length. | ||
55 | - out.writeInt(payload.length); | ||
56 | - | ||
57 | - // write payload bytes | ||
58 | - out.writeBytes(payload); | ||
59 | - } | ||
60 | -} |
1 | -package org.onlab.netty; | ||
2 | - | ||
3 | -import java.io.IOException; | ||
4 | - | ||
5 | -/** | ||
6 | - * Handler for a message. | ||
7 | - */ | ||
8 | -public interface MessageHandler { | ||
9 | - | ||
10 | - /** | ||
11 | - * Handles the message. | ||
12 | - * @param message message. | ||
13 | - * @throws IOException. | ||
14 | - */ | ||
15 | - public void handle(Message message) throws IOException; | ||
16 | -} |
1 | -package org.onlab.netty; | ||
2 | - | ||
3 | -import java.io.IOException; | ||
4 | - | ||
5 | -/** | ||
6 | - * Interface for low level messaging primitives. | ||
7 | - */ | ||
8 | -public interface MessagingService { | ||
9 | - /** | ||
10 | - * Sends a message asynchronously to the specified communication end point. | ||
11 | - * The message is specified using the type and payload. | ||
12 | - * @param ep end point to send the message to. | ||
13 | - * @param type type of message. | ||
14 | - * @param payload message payload. | ||
15 | - * @throws IOException | ||
16 | - */ | ||
17 | - public void sendAsync(Endpoint ep, String type, Object payload) throws IOException; | ||
18 | - | ||
19 | - /** | ||
20 | - * Sends a message synchronously and waits for a response. | ||
21 | - * @param ep end point to send the message to. | ||
22 | - * @param type type of message. | ||
23 | - * @param payload message payload. | ||
24 | - * @return a response future | ||
25 | - * @throws IOException | ||
26 | - */ | ||
27 | - public <T> Response<T> sendAndReceive(Endpoint ep, String type, Object payload) throws IOException; | ||
28 | - | ||
29 | - /** | ||
30 | - * Registers a new message handler for message type. | ||
31 | - * @param type message type. | ||
32 | - * @param handler message handler | ||
33 | - */ | ||
34 | - public void registerHandler(String type, MessageHandler handler); | ||
35 | - | ||
36 | - /** | ||
37 | - * Unregister current handler, if one exists for message type. | ||
38 | - * @param type message type | ||
39 | - */ | ||
40 | - public void unregisterHandler(String type); | ||
41 | -} |
1 | -package org.onlab.netty; | ||
2 | - | ||
3 | -import java.io.IOException; | ||
4 | -import java.net.UnknownHostException; | ||
5 | -import java.util.concurrent.ConcurrentHashMap; | ||
6 | -import java.util.concurrent.ConcurrentMap; | ||
7 | -import java.util.concurrent.TimeUnit; | ||
8 | - | ||
9 | -import io.netty.bootstrap.Bootstrap; | ||
10 | -import io.netty.bootstrap.ServerBootstrap; | ||
11 | -import io.netty.buffer.PooledByteBufAllocator; | ||
12 | -import io.netty.channel.Channel; | ||
13 | -import io.netty.channel.ChannelFuture; | ||
14 | -import io.netty.channel.ChannelHandlerContext; | ||
15 | -import io.netty.channel.ChannelInitializer; | ||
16 | -import io.netty.channel.ChannelOption; | ||
17 | -import io.netty.channel.EventLoopGroup; | ||
18 | -import io.netty.channel.SimpleChannelInboundHandler; | ||
19 | -import io.netty.channel.nio.NioEventLoopGroup; | ||
20 | -import io.netty.channel.socket.SocketChannel; | ||
21 | -import io.netty.channel.socket.nio.NioServerSocketChannel; | ||
22 | -import io.netty.channel.socket.nio.NioSocketChannel; | ||
23 | - | ||
24 | -import org.apache.commons.lang.math.RandomUtils; | ||
25 | -import org.apache.commons.pool.KeyedObjectPool; | ||
26 | -import org.apache.commons.pool.KeyedPoolableObjectFactory; | ||
27 | -import org.apache.commons.pool.impl.GenericKeyedObjectPool; | ||
28 | -import org.slf4j.Logger; | ||
29 | -import org.slf4j.LoggerFactory; | ||
30 | - | ||
31 | -import com.google.common.cache.Cache; | ||
32 | -import com.google.common.cache.CacheBuilder; | ||
33 | - | ||
34 | -/** | ||
35 | - * A Netty based implementation of MessagingService. | ||
36 | - */ | ||
37 | -public class NettyMessagingService implements MessagingService { | ||
38 | - | ||
39 | - private final Logger log = LoggerFactory.getLogger(getClass()); | ||
40 | - | ||
41 | - private KeyedObjectPool<Endpoint, Channel> channels = | ||
42 | - new GenericKeyedObjectPool<Endpoint, Channel>(new OnosCommunicationChannelFactory()); | ||
43 | - private final int port; | ||
44 | - private final EventLoopGroup bossGroup = new NioEventLoopGroup(); | ||
45 | - private final EventLoopGroup workerGroup = new NioEventLoopGroup(); | ||
46 | - private final ConcurrentMap<String, MessageHandler> handlers = new ConcurrentHashMap<>(); | ||
47 | - private Cache<Long, AsyncResponse<?>> responseFutures; | ||
48 | - private final Endpoint localEp; | ||
49 | - | ||
50 | - protected Serializer serializer; | ||
51 | - | ||
52 | - public NettyMessagingService() { | ||
53 | - // TODO: Default port should be configurable. | ||
54 | - this(8080); | ||
55 | - } | ||
56 | - | ||
57 | - // FIXME: Constructor should not throw exceptions. | ||
58 | - public NettyMessagingService(int port) { | ||
59 | - this.port = port; | ||
60 | - try { | ||
61 | - localEp = new Endpoint(java.net.InetAddress.getLocalHost().getHostName(), port); | ||
62 | - } catch (UnknownHostException e) { | ||
63 | - // bailing out. | ||
64 | - throw new RuntimeException(e); | ||
65 | - } | ||
66 | - } | ||
67 | - | ||
68 | - public void activate() throws Exception { | ||
69 | - responseFutures = CacheBuilder.newBuilder() | ||
70 | - .maximumSize(100000) | ||
71 | - .weakValues() | ||
72 | - // TODO: Once the entry expires, notify blocking threads (if any). | ||
73 | - .expireAfterWrite(10, TimeUnit.MINUTES) | ||
74 | - .build(); | ||
75 | - startAcceptingConnections(); | ||
76 | - } | ||
77 | - | ||
78 | - public void deactivate() throws Exception { | ||
79 | - channels.close(); | ||
80 | - bossGroup.shutdownGracefully(); | ||
81 | - workerGroup.shutdownGracefully(); | ||
82 | - } | ||
83 | - | ||
84 | - @Override | ||
85 | - public void sendAsync(Endpoint ep, String type, Object payload) throws IOException { | ||
86 | - InternalMessage message = new InternalMessage.Builder(this) | ||
87 | - .withId(RandomUtils.nextLong()) | ||
88 | - .withSender(localEp) | ||
89 | - .withType(type) | ||
90 | - .withPayload(payload) | ||
91 | - .build(); | ||
92 | - sendAsync(ep, message); | ||
93 | - } | ||
94 | - | ||
95 | - protected void sendAsync(Endpoint ep, InternalMessage message) throws IOException { | ||
96 | - Channel channel = null; | ||
97 | - try { | ||
98 | - channel = channels.borrowObject(ep); | ||
99 | - channel.eventLoop().execute(new WriteTask(channel, message)); | ||
100 | - } catch (Exception e) { | ||
101 | - throw new IOException(e); | ||
102 | - } finally { | ||
103 | - try { | ||
104 | - channels.returnObject(ep, channel); | ||
105 | - } catch (Exception e) { | ||
106 | - log.warn("Error returning object back to the pool", e); | ||
107 | - // ignored. | ||
108 | - } | ||
109 | - } | ||
110 | - } | ||
111 | - | ||
112 | - @Override | ||
113 | - public <T> Response<T> sendAndReceive(Endpoint ep, String type, Object payload) | ||
114 | - throws IOException { | ||
115 | - AsyncResponse<T> futureResponse = new AsyncResponse<T>(); | ||
116 | - Long messageId = RandomUtils.nextLong(); | ||
117 | - responseFutures.put(messageId, futureResponse); | ||
118 | - InternalMessage message = new InternalMessage.Builder(this) | ||
119 | - .withId(messageId) | ||
120 | - .withSender(localEp) | ||
121 | - .withType(type) | ||
122 | - .withPayload(payload) | ||
123 | - .build(); | ||
124 | - sendAsync(ep, message); | ||
125 | - return futureResponse; | ||
126 | - } | ||
127 | - | ||
128 | - @Override | ||
129 | - public void registerHandler(String type, MessageHandler handler) { | ||
130 | - // TODO: Is this the right semantics for handler registration? | ||
131 | - handlers.putIfAbsent(type, handler); | ||
132 | - } | ||
133 | - | ||
134 | - public void unregisterHandler(String type) { | ||
135 | - handlers.remove(type); | ||
136 | - } | ||
137 | - | ||
138 | - private MessageHandler getMessageHandler(String type) { | ||
139 | - return handlers.get(type); | ||
140 | - } | ||
141 | - | ||
142 | - private void startAcceptingConnections() throws InterruptedException { | ||
143 | - ServerBootstrap b = new ServerBootstrap(); | ||
144 | - b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); | ||
145 | - b.group(bossGroup, workerGroup) | ||
146 | - .channel(NioServerSocketChannel.class) | ||
147 | - .childHandler(new OnosCommunicationChannelInitializer()) | ||
148 | - .option(ChannelOption.SO_BACKLOG, 128) | ||
149 | - .childOption(ChannelOption.SO_KEEPALIVE, true); | ||
150 | - | ||
151 | - // Bind and start to accept incoming connections. | ||
152 | - b.bind(port).sync(); | ||
153 | - } | ||
154 | - | ||
155 | - private class OnosCommunicationChannelFactory | ||
156 | - implements KeyedPoolableObjectFactory<Endpoint, Channel> { | ||
157 | - | ||
158 | - @Override | ||
159 | - public void activateObject(Endpoint endpoint, Channel channel) | ||
160 | - throws Exception { | ||
161 | - } | ||
162 | - | ||
163 | - @Override | ||
164 | - public void destroyObject(Endpoint ep, Channel channel) throws Exception { | ||
165 | - channel.close(); | ||
166 | - } | ||
167 | - | ||
168 | - @Override | ||
169 | - public Channel makeObject(Endpoint ep) throws Exception { | ||
170 | - Bootstrap b = new Bootstrap(); | ||
171 | - b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); | ||
172 | - b.group(workerGroup); | ||
173 | - // TODO: Make this faster: | ||
174 | - // http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#37.0 | ||
175 | - b.channel(NioSocketChannel.class); | ||
176 | - b.option(ChannelOption.SO_KEEPALIVE, true); | ||
177 | - b.handler(new OnosCommunicationChannelInitializer()); | ||
178 | - | ||
179 | - // Start the client. | ||
180 | - ChannelFuture f = b.connect(ep.host(), ep.port()).sync(); | ||
181 | - return f.channel(); | ||
182 | - } | ||
183 | - | ||
184 | - @Override | ||
185 | - public void passivateObject(Endpoint ep, Channel channel) | ||
186 | - throws Exception { | ||
187 | - } | ||
188 | - | ||
189 | - @Override | ||
190 | - public boolean validateObject(Endpoint ep, Channel channel) { | ||
191 | - return channel.isOpen(); | ||
192 | - } | ||
193 | - } | ||
194 | - | ||
195 | - private class OnosCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> { | ||
196 | - | ||
197 | - @Override | ||
198 | - protected void initChannel(SocketChannel channel) throws Exception { | ||
199 | - channel.pipeline() | ||
200 | - .addLast(new MessageEncoder(serializer)) | ||
201 | - .addLast(new MessageDecoder(NettyMessagingService.this, serializer)) | ||
202 | - .addLast(new NettyMessagingService.InboundMessageDispatcher()); | ||
203 | - } | ||
204 | - } | ||
205 | - | ||
206 | - private class WriteTask implements Runnable { | ||
207 | - | ||
208 | - private final Object message; | ||
209 | - private final Channel channel; | ||
210 | - | ||
211 | - public WriteTask(Channel channel, Object message) { | ||
212 | - this.message = message; | ||
213 | - this.channel = channel; | ||
214 | - } | ||
215 | - | ||
216 | - @Override | ||
217 | - public void run() { | ||
218 | - channel.writeAndFlush(message); | ||
219 | - } | ||
220 | - } | ||
221 | - | ||
222 | - private class InboundMessageDispatcher extends SimpleChannelInboundHandler<InternalMessage> { | ||
223 | - | ||
224 | - @Override | ||
225 | - protected void channelRead0(ChannelHandlerContext ctx, InternalMessage message) throws Exception { | ||
226 | - String type = message.type(); | ||
227 | - if (type.equals(InternalMessage.REPLY_MESSAGE_TYPE)) { | ||
228 | - try { | ||
229 | - AsyncResponse<?> futureResponse = | ||
230 | - NettyMessagingService.this.responseFutures.getIfPresent(message.id()); | ||
231 | - if (futureResponse != null) { | ||
232 | - futureResponse.setResponse(message.payload()); | ||
233 | - } | ||
234 | - log.warn("Received a reply. But was unable to locate the request handle"); | ||
235 | - } finally { | ||
236 | - NettyMessagingService.this.responseFutures.invalidate(message.id()); | ||
237 | - } | ||
238 | - return; | ||
239 | - } | ||
240 | - MessageHandler handler = NettyMessagingService.this.getMessageHandler(type); | ||
241 | - handler.handle(message); | ||
242 | - } | ||
243 | - } | ||
244 | -} |
1 | -package org.onlab.netty; | ||
2 | - | ||
3 | -import java.util.concurrent.TimeUnit; | ||
4 | -import java.util.concurrent.TimeoutException; | ||
5 | - | ||
6 | -/** | ||
7 | - * Response object returned when making synchronous requests. | ||
8 | - * Can you used to check is a response is ready and/or wait for a response | ||
9 | - * to become available. | ||
10 | - * | ||
11 | - * @param <T> type of response. | ||
12 | - */ | ||
13 | -public interface Response<T> { | ||
14 | - | ||
15 | - /** | ||
16 | - * Gets the response waiting for a designated timeout period. | ||
17 | - * @param timeout timeout period (since request was sent out) | ||
18 | - * @param tu unit of time. | ||
19 | - * @return response | ||
20 | - * @throws TimeoutException if the timeout expires before the response arrives. | ||
21 | - */ | ||
22 | - public T get(long timeout, TimeUnit tu) throws TimeoutException; | ||
23 | - | ||
24 | - /** | ||
25 | - * Gets the response waiting for indefinite timeout period. | ||
26 | - * @return response | ||
27 | - * @throws InterruptedException if the thread is interrupted before the response arrives. | ||
28 | - */ | ||
29 | - public T get() throws InterruptedException; | ||
30 | - | ||
31 | - /** | ||
32 | - * Checks if the response is ready without blocking. | ||
33 | - * @return true if response is ready, false otherwise. | ||
34 | - */ | ||
35 | - public boolean isReady(); | ||
36 | -} |
1 | -package org.onlab.netty; | ||
2 | - | ||
3 | -/** | ||
4 | - * Interface for encoding/decoding message payloads. | ||
5 | - */ | ||
6 | -public interface Serializer { | ||
7 | - | ||
8 | - /** | ||
9 | - * Decodes the specified byte array to a POJO. | ||
10 | - * | ||
11 | - * @param data byte array. | ||
12 | - * @return POJO | ||
13 | - */ | ||
14 | - Object decode(byte[] data); | ||
15 | - | ||
16 | - /** | ||
17 | - * Encodes the specified POJO into a byte array. | ||
18 | - * | ||
19 | - * @param data POJO to be encoded | ||
20 | - * @return byte array. | ||
21 | - */ | ||
22 | - byte[] encode(Object message); | ||
23 | - | ||
24 | -} |
1 | -package org.onlab.netty; | ||
2 | - | ||
3 | -import java.util.concurrent.TimeUnit; | ||
4 | - | ||
5 | -public final class SimpleClient { | ||
6 | - private SimpleClient() {} | ||
7 | - | ||
8 | - public static void main(String... args) throws Exception { | ||
9 | - NettyMessagingService messaging = new TestNettyMessagingService(9081); | ||
10 | - messaging.activate(); | ||
11 | - | ||
12 | - messaging.sendAsync(new Endpoint("localhost", 8080), "simple", "Hello World"); | ||
13 | - Response<String> response = messaging.sendAndReceive(new Endpoint("localhost", 8080), "echo", "Hello World"); | ||
14 | - System.out.println("Got back:" + response.get(2, TimeUnit.SECONDS)); | ||
15 | - } | ||
16 | - | ||
17 | - public static class TestNettyMessagingService extends NettyMessagingService { | ||
18 | - public TestNettyMessagingService(int port) throws Exception { | ||
19 | - super(port); | ||
20 | - Serializer serializer = new KryoSerializer(); | ||
21 | - this.serializer = serializer; | ||
22 | - } | ||
23 | - } | ||
24 | -} |
1 | -package org.onlab.netty; | ||
2 | - | ||
3 | -public final class SimpleServer { | ||
4 | - private SimpleServer() {} | ||
5 | - | ||
6 | - public static void main(String... args) throws Exception { | ||
7 | - NettyMessagingService server = new TestNettyMessagingService(); | ||
8 | - server.activate(); | ||
9 | - server.registerHandler("simple", new LoggingHandler()); | ||
10 | - server.registerHandler("echo", new EchoHandler()); | ||
11 | - } | ||
12 | - | ||
13 | - public static class TestNettyMessagingService extends NettyMessagingService { | ||
14 | - protected TestNettyMessagingService() { | ||
15 | - Serializer serializer = new KryoSerializer(); | ||
16 | - this.serializer = serializer; | ||
17 | - } | ||
18 | - } | ||
19 | -} |
-
Please register or login to post a comment