Madan Jampani

Netty bug fix: Do not use weakValues in a cache where we track outstanding responses.

...@@ -8,7 +8,7 @@ export ONOS_ROOT=${ONOS_ROOT:-~/onos-next} ...@@ -8,7 +8,7 @@ export ONOS_ROOT=${ONOS_ROOT:-~/onos-next}
8 # Setup some environmental context for developers 8 # Setup some environmental context for developers
9 if [ -z "${JAVA_HOME}" ]; then 9 if [ -z "${JAVA_HOME}" ]; then
10 if [ -x /usr/libexec/java_home ]; then 10 if [ -x /usr/libexec/java_home ]; then
11 - export JAVA_HOME=$(/usr/libexec/java_home -v 1.7) 11 + export JAVA_HOME=$(/usr/libexec/java_home -v 1.8)
12 elif [ -d /usr/lib/jvm/java-7-openjdk-amd64 ]; then 12 elif [ -d /usr/lib/jvm/java-7-openjdk-amd64 ]; then
13 export JAVA_HOME="/usr/lib/jvm/java-7-openjdk-amd64" 13 export JAVA_HOME="/usr/lib/jvm/java-7-openjdk-amd64"
14 fi 14 fi
......
...@@ -55,8 +55,8 @@ public class Endpoint { ...@@ -55,8 +55,8 @@ public class Endpoint {
55 @Override 55 @Override
56 public String toString() { 56 public String toString() {
57 return MoreObjects.toStringHelper(getClass()) 57 return MoreObjects.toStringHelper(getClass())
58 - .add("port", port)
59 .add("host", host) 58 .add("host", host)
59 + .add("port", port)
60 .toString(); 60 .toString();
61 } 61 }
62 62
......
...@@ -26,7 +26,7 @@ import java.io.IOException; ...@@ -26,7 +26,7 @@ import java.io.IOException;
26 */ 26 */
27 public final class InternalMessage implements Message { 27 public final class InternalMessage implements Message {
28 28
29 - public static final String REPLY_MESSAGE_TYPE = "NETTY_MESSAGIG_REQUEST_REPLY"; 29 + public static final String REPLY_MESSAGE_TYPE = "NETTY_MESSAGING_REQUEST_REPLY";
30 30
31 private long id; 31 private long id;
32 private Endpoint sender; 32 private Endpoint sender;
......
...@@ -67,7 +67,6 @@ public class NettyMessagingService implements MessagingService { ...@@ -67,7 +67,6 @@ public class NettyMessagingService implements MessagingService {
67 private final AtomicLong messageIdGenerator = new AtomicLong(0); 67 private final AtomicLong messageIdGenerator = new AtomicLong(0);
68 private final Cache<Long, SettableFuture<byte[]>> responseFutures = CacheBuilder.newBuilder() 68 private final Cache<Long, SettableFuture<byte[]>> responseFutures = CacheBuilder.newBuilder()
69 .maximumSize(100000) 69 .maximumSize(100000)
70 - .weakValues()
71 // TODO: Once the entry expires, notify blocking threads (if any). 70 // TODO: Once the entry expires, notify blocking threads (if any).
72 .expireAfterWrite(10, TimeUnit.MINUTES) 71 .expireAfterWrite(10, TimeUnit.MINUTES)
73 .build(); 72 .build();
...@@ -174,7 +173,12 @@ public class NettyMessagingService implements MessagingService { ...@@ -174,7 +173,12 @@ public class NettyMessagingService implements MessagingService {
174 .withType(type) 173 .withType(type)
175 .withPayload(payload) 174 .withPayload(payload)
176 .build(); 175 .build();
176 + try {
177 sendAsync(ep, message); 177 sendAsync(ep, message);
178 + } catch (IOException e) {
179 + responseFutures.invalidate(messageId);
180 + throw e;
181 + }
178 return futureResponse; 182 return futureResponse;
179 } 183 }
180 184
...@@ -293,7 +297,8 @@ public class NettyMessagingService implements MessagingService { ...@@ -293,7 +297,8 @@ public class NettyMessagingService implements MessagingService {
293 if (futureResponse != null) { 297 if (futureResponse != null) {
294 futureResponse.set(message.payload()); 298 futureResponse.set(message.payload());
295 } else { 299 } else {
296 - log.warn("Received a reply. But was unable to locate the request handle"); 300 + log.warn("Received a reply for message id:[{}]. "
301 + + "But was unable to locate the request handle", message.id());
297 } 302 }
298 } finally { 303 } finally {
299 NettyMessagingService.this.responseFutures.invalidate(message.id()); 304 NettyMessagingService.this.responseFutures.invalidate(message.id());
......