Madan Jampani

Added NettyMessagingService constructor that accepts both ip and port

...@@ -67,7 +67,7 @@ public class ClusterCommunicationManager ...@@ -67,7 +67,7 @@ public class ClusterCommunicationManager
67 @Activate 67 @Activate
68 public void activate() { 68 public void activate() {
69 ControllerNode localNode = clusterService.getLocalNode(); 69 ControllerNode localNode = clusterService.getLocalNode();
70 - NettyMessagingService netty = new NettyMessagingService(localNode.tcpPort()); 70 + NettyMessagingService netty = new NettyMessagingService(localNode.ip().toString(), localNode.tcpPort());
71 // FIXME: workaround until it becomes a service. 71 // FIXME: workaround until it becomes a service.
72 try { 72 try {
73 netty.activate(); 73 netty.activate();
......
...@@ -44,8 +44,7 @@ public final class InternalMessage implements Message { ...@@ -44,8 +44,7 @@ public final class InternalMessage implements Message {
44 public void respond(byte[] data) throws IOException { 44 public void respond(byte[] data) throws IOException {
45 Builder builder = new Builder(messagingService); 45 Builder builder = new Builder(messagingService);
46 InternalMessage message = builder.withId(this.id) 46 InternalMessage message = builder.withId(this.id)
47 - // FIXME: Sender should be messagingService.localEp. 47 + .withSender(messagingService.localEp())
48 - .withSender(this.sender)
49 .withPayload(data) 48 .withPayload(data)
50 .withType(REPLY_MESSAGE_TYPE) 49 .withType(REPLY_MESSAGE_TYPE)
51 .build(); 50 .build();
......
...@@ -42,7 +42,6 @@ public class NettyMessagingService implements MessagingService { ...@@ -42,7 +42,6 @@ public class NettyMessagingService implements MessagingService {
42 42
43 private final Logger log = LoggerFactory.getLogger(getClass()); 43 private final Logger log = LoggerFactory.getLogger(getClass());
44 44
45 - private final int port;
46 private final Endpoint localEp; 45 private final Endpoint localEp;
47 private final ConcurrentMap<String, MessageHandler> handlers = new ConcurrentHashMap<>(); 46 private final ConcurrentMap<String, MessageHandler> handlers = new ConcurrentHashMap<>();
48 private final Cache<Long, AsyncResponse> responseFutures = CacheBuilder.newBuilder() 47 private final Cache<Long, AsyncResponse> responseFutures = CacheBuilder.newBuilder()
...@@ -77,6 +76,10 @@ public class NettyMessagingService implements MessagingService { ...@@ -77,6 +76,10 @@ public class NettyMessagingService implements MessagingService {
77 clientChannelClass = NioSocketChannel.class; 76 clientChannelClass = NioSocketChannel.class;
78 } 77 }
79 78
79 + public NettyMessagingService(String ip, int port) {
80 + localEp = new Endpoint(ip, port);
81 + }
82 +
80 public NettyMessagingService() { 83 public NettyMessagingService() {
81 // TODO: Default port should be configurable. 84 // TODO: Default port should be configurable.
82 this(8080); 85 this(8080);
...@@ -84,7 +87,6 @@ public class NettyMessagingService implements MessagingService { ...@@ -84,7 +87,6 @@ public class NettyMessagingService implements MessagingService {
84 87
85 // FIXME: Constructor should not throw exceptions. 88 // FIXME: Constructor should not throw exceptions.
86 public NettyMessagingService(int port) { 89 public NettyMessagingService(int port) {
87 - this.port = port;
88 try { 90 try {
89 localEp = new Endpoint(java.net.InetAddress.getLocalHost().getHostName(), port); 91 localEp = new Endpoint(java.net.InetAddress.getLocalHost().getHostName(), port);
90 } catch (UnknownHostException e) { 92 } catch (UnknownHostException e) {
...@@ -106,6 +108,14 @@ public class NettyMessagingService implements MessagingService { ...@@ -106,6 +108,14 @@ public class NettyMessagingService implements MessagingService {
106 clientGroup.shutdownGracefully(); 108 clientGroup.shutdownGracefully();
107 } 109 }
108 110
111 + /**
112 + * Returns the local endpoint for this instance.
113 + * @return local end point.
114 + */
115 + public Endpoint localEp() {
116 + return localEp;
117 + }
118 +
109 @Override 119 @Override
110 public void sendAsync(Endpoint ep, String type, byte[] payload) throws IOException { 120 public void sendAsync(Endpoint ep, String type, byte[] payload) throws IOException {
111 InternalMessage message = new InternalMessage.Builder(this) 121 InternalMessage message = new InternalMessage.Builder(this)
...@@ -127,7 +137,7 @@ public class NettyMessagingService implements MessagingService { ...@@ -127,7 +137,7 @@ public class NettyMessagingService implements MessagingService {
127 channels.returnObject(ep, channel); 137 channels.returnObject(ep, channel);
128 } 138 }
129 } catch (Exception e) { 139 } catch (Exception e) {
130 - throw new IOException(e); 140 + throw new IOException("Failed to send message to " + ep.toString(), e);
131 } 141 }
132 } 142 }
133 143
...@@ -174,7 +184,7 @@ public class NettyMessagingService implements MessagingService { ...@@ -174,7 +184,7 @@ public class NettyMessagingService implements MessagingService {
174 .childOption(ChannelOption.SO_KEEPALIVE, true); 184 .childOption(ChannelOption.SO_KEEPALIVE, true);
175 185
176 // Bind and start to accept incoming connections. 186 // Bind and start to accept incoming connections.
177 - b.bind(port).sync(); 187 + b.bind(localEp.port()).sync();
178 } 188 }
179 189
180 private class OnosCommunicationChannelFactory 190 private class OnosCommunicationChannelFactory
......