Madan Jampani

Change netty reply timeout to 2s and run a periodic cleanup task to timeout outstanding requests

Change-Id: Ie0381b9371bfd8a3d680872bfc5dce54a19aaca2
...@@ -84,6 +84,7 @@ import java.util.function.Consumer; ...@@ -84,6 +84,7 @@ import java.util.function.Consumer;
84 @Service 84 @Service
85 public class NettyMessagingManager implements MessagingService { 85 public class NettyMessagingManager implements MessagingService {
86 86
87 + private static final int REPLY_TIME_OUT_SEC = 2;
87 private static final short MIN_KS_LENGTH = 6; 88 private static final short MIN_KS_LENGTH = 6;
88 89
89 private final Logger log = LoggerFactory.getLogger(getClass()); 90 private final Logger log = LoggerFactory.getLogger(getClass());
...@@ -96,7 +97,7 @@ public class NettyMessagingManager implements MessagingService { ...@@ -96,7 +97,7 @@ public class NettyMessagingManager implements MessagingService {
96 private final Map<String, Consumer<InternalMessage>> handlers = new ConcurrentHashMap<>(); 97 private final Map<String, Consumer<InternalMessage>> handlers = new ConcurrentHashMap<>();
97 private final AtomicLong messageIdGenerator = new AtomicLong(0); 98 private final AtomicLong messageIdGenerator = new AtomicLong(0);
98 private final Cache<Long, Callback> callbacks = CacheBuilder.newBuilder() 99 private final Cache<Long, Callback> callbacks = CacheBuilder.newBuilder()
99 - .expireAfterWrite(10, TimeUnit.SECONDS) 100 + .expireAfterWrite(REPLY_TIME_OUT_SEC, TimeUnit.SECONDS)
100 .removalListener(new RemovalListener<Long, Callback>() { 101 .removalListener(new RemovalListener<Long, Callback>() {
101 @Override 102 @Override
102 public void onRemoval(RemovalNotification<Long, Callback> entry) { 103 public void onRemoval(RemovalNotification<Long, Callback> entry) {
...@@ -145,6 +146,7 @@ public class NettyMessagingManager implements MessagingService { ...@@ -145,6 +146,7 @@ public class NettyMessagingManager implements MessagingService {
145 initEventLoopGroup(); 146 initEventLoopGroup();
146 startAcceptingConnections(); 147 startAcceptingConnections();
147 started.set(true); 148 started.set(true);
149 + serverGroup.scheduleWithFixedDelay(callbacks::cleanUp, 0, REPLY_TIME_OUT_SEC, TimeUnit.SECONDS);
148 log.info("Started"); 150 log.info("Started");
149 } 151 }
150 152
......