Madan Jampani

Netty Messaging changes:

1. Lowered the timeout value for expiring unanswered sendAndReceive calls to 10s.
2. Marking the future as complete (with exception) when a entry is evicted due to timeout.
3. Improved exception behavior.
...@@ -23,6 +23,7 @@ import java.net.UnknownHostException; ...@@ -23,6 +23,7 @@ import java.net.UnknownHostException;
23 import java.util.concurrent.ConcurrentHashMap; 23 import java.util.concurrent.ConcurrentHashMap;
24 import java.util.concurrent.ConcurrentMap; 24 import java.util.concurrent.ConcurrentMap;
25 import java.util.concurrent.TimeUnit; 25 import java.util.concurrent.TimeUnit;
26 +import java.util.concurrent.TimeoutException;
26 import java.util.concurrent.atomic.AtomicLong; 27 import java.util.concurrent.atomic.AtomicLong;
27 28
28 import io.netty.bootstrap.Bootstrap; 29 import io.netty.bootstrap.Bootstrap;
...@@ -30,6 +31,7 @@ import io.netty.bootstrap.ServerBootstrap; ...@@ -30,6 +31,7 @@ import io.netty.bootstrap.ServerBootstrap;
30 import io.netty.buffer.PooledByteBufAllocator; 31 import io.netty.buffer.PooledByteBufAllocator;
31 import io.netty.channel.Channel; 32 import io.netty.channel.Channel;
32 import io.netty.channel.ChannelFuture; 33 import io.netty.channel.ChannelFuture;
34 +import io.netty.channel.ChannelFutureListener;
33 import io.netty.channel.ChannelHandler; 35 import io.netty.channel.ChannelHandler;
34 import io.netty.channel.ChannelHandlerContext; 36 import io.netty.channel.ChannelHandlerContext;
35 import io.netty.channel.ChannelInitializer; 37 import io.netty.channel.ChannelInitializer;
...@@ -52,6 +54,8 @@ import org.slf4j.LoggerFactory; ...@@ -52,6 +54,8 @@ import org.slf4j.LoggerFactory;
52 54
53 import com.google.common.cache.Cache; 55 import com.google.common.cache.Cache;
54 import com.google.common.cache.CacheBuilder; 56 import com.google.common.cache.CacheBuilder;
57 +import com.google.common.cache.RemovalListener;
58 +import com.google.common.cache.RemovalNotification;
55 import com.google.common.util.concurrent.ListenableFuture; 59 import com.google.common.util.concurrent.ListenableFuture;
56 import com.google.common.util.concurrent.SettableFuture; 60 import com.google.common.util.concurrent.SettableFuture;
57 61
...@@ -67,8 +71,13 @@ public class NettyMessagingService implements MessagingService { ...@@ -67,8 +71,13 @@ public class NettyMessagingService implements MessagingService {
67 private final AtomicLong messageIdGenerator = new AtomicLong(0); 71 private final AtomicLong messageIdGenerator = new AtomicLong(0);
68 private final Cache<Long, SettableFuture<byte[]>> responseFutures = CacheBuilder.newBuilder() 72 private final Cache<Long, SettableFuture<byte[]>> responseFutures = CacheBuilder.newBuilder()
69 .maximumSize(100000) 73 .maximumSize(100000)
70 - // TODO: Once the entry expires, notify blocking threads (if any). 74 + .expireAfterWrite(10, TimeUnit.SECONDS)
71 - .expireAfterWrite(10, TimeUnit.MINUTES) 75 + .removalListener(new RemovalListener<Long, SettableFuture<byte[]>>() {
76 + @Override
77 + public void onRemoval(RemovalNotification<Long, SettableFuture<byte[]>> entry) {
78 + entry.getValue().setException(new TimeoutException("Timedout waiting for reply"));
79 + }
80 + })
72 .build(); 81 .build();
73 private final GenericKeyedObjectPool<Endpoint, Channel> channels 82 private final GenericKeyedObjectPool<Endpoint, Channel> channels
74 = new GenericKeyedObjectPool<Endpoint, Channel>(new OnosCommunicationChannelFactory()); 83 = new GenericKeyedObjectPool<Endpoint, Channel>(new OnosCommunicationChannelFactory());
...@@ -156,9 +165,12 @@ public class NettyMessagingService implements MessagingService { ...@@ -156,9 +165,12 @@ public class NettyMessagingService implements MessagingService {
156 } finally { 165 } finally {
157 channels.returnObject(ep, channel); 166 channels.returnObject(ep, channel);
158 } 167 }
168 + } catch (IOException e) {
169 + throw e;
159 } catch (Exception e) { 170 } catch (Exception e) {
160 - throw new IOException("Failed to send message to " + ep.toString(), e); 171 + throw new IOException(e);
161 } 172 }
173 +
162 } 174 }
163 175
164 @Override 176 @Override
...@@ -175,7 +187,7 @@ public class NettyMessagingService implements MessagingService { ...@@ -175,7 +187,7 @@ public class NettyMessagingService implements MessagingService {
175 .build(); 187 .build();
176 try { 188 try {
177 sendAsync(ep, message); 189 sendAsync(ep, message);
178 - } catch (IOException e) { 190 + } catch (Exception e) {
179 responseFutures.invalidate(messageId); 191 responseFutures.invalidate(messageId);
180 throw e; 192 throw e;
181 } 193 }
...@@ -280,7 +292,7 @@ public class NettyMessagingService implements MessagingService { ...@@ -280,7 +292,7 @@ public class NettyMessagingService implements MessagingService {
280 292
281 @Override 293 @Override
282 public void run() { 294 public void run() {
283 - channel.writeAndFlush(message, channel.voidPromise()); 295 + channel.writeAndFlush(message).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
284 } 296 }
285 } 297 }
286 298
......