Aaron Kruglikov
Committed by Gerrit Code Review

Makes establishing connections between onos nodes asynchronous, prevents threads…

… blocking when nodes are inaccessible.

Change-Id: I46ce54505e8c4c34b56009412ddb1d645c83aaa3
......@@ -96,8 +96,8 @@ public class NettyMessaging implements MessagingService {
})
.build();
private final GenericKeyedObjectPool<Endpoint, Channel> channels
= new GenericKeyedObjectPool<Endpoint, Channel>(new OnosCommunicationChannelFactory());
private final GenericKeyedObjectPool<Endpoint, Connection> channels
= new GenericKeyedObjectPool<Endpoint, Connection>(new OnosCommunicationChannelFactory());
private EventLoopGroup serverGroup;
private EventLoopGroup clientGroup;
......@@ -179,18 +179,13 @@ public class NettyMessaging implements MessagingService {
CompletableFuture<Void> future = new CompletableFuture<>();
try {
Channel channel = null;
Connection connection = null;
try {
channel = channels.borrowObject(ep);
channel.writeAndFlush(message).addListener(channelFuture -> {
if (!channelFuture.isSuccess()) {
future.completeExceptionally(channelFuture.cause());
} else {
future.complete(null);
}
});
connection = channels.borrowObject(ep);
connection.send(message, future);
} finally {
channels.returnObject(ep, channel);
channels.returnObject(ep, connection);
}
} catch (Exception e) {
future.completeExceptionally(e);
......@@ -292,21 +287,22 @@ public class NettyMessaging implements MessagingService {
}
private class OnosCommunicationChannelFactory
implements KeyedPoolableObjectFactory<Endpoint, Channel> {
implements KeyedPoolableObjectFactory<Endpoint, Connection> {
@Override
public void activateObject(Endpoint endpoint, Channel channel)
public void activateObject(Endpoint endpoint, Connection connection)
throws Exception {
}
@Override
public void destroyObject(Endpoint ep, Channel channel) throws Exception {
public void destroyObject(Endpoint ep, Connection connection) throws Exception {
log.debug("Closing connection to {}", ep);
channel.close();
//Is this the right way to destroy?
connection.destroy();
}
@Override
public Channel makeObject(Endpoint ep) throws Exception {
public Connection makeObject(Endpoint ep) throws Exception {
Bootstrap bootstrap = new Bootstrap();
bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
bootstrap.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 10 * 64 * 1024);
......@@ -324,19 +320,28 @@ public class NettyMessaging implements MessagingService {
bootstrap.handler(new OnosCommunicationChannelInitializer());
}
// Start the client.
ChannelFuture f = bootstrap.connect(ep.host().toString(), ep.port()).sync();
CompletableFuture<Channel> retFuture = new CompletableFuture<>();
ChannelFuture f = bootstrap.connect(ep.host().toString(), ep.port());
f.addListener(future -> {
if (future.isSuccess()) {
retFuture.complete(f.channel());
} else {
retFuture.completeExceptionally(future.cause());
}
});
log.debug("Established a new connection to {}", ep);
return f.channel();
return new Connection(retFuture);
}
@Override
public void passivateObject(Endpoint ep, Channel channel)
public void passivateObject(Endpoint ep, Connection connection)
throws Exception {
}
@Override
public boolean validateObject(Endpoint ep, Channel channel) {
return channel.isOpen();
public boolean validateObject(Endpoint ep, Connection connection) {
return connection.validate();
}
}
......@@ -486,4 +491,62 @@ public class NettyMessaging implements MessagingService {
executor.execute(() -> future.completeExceptionally(error));
}
}
private final class Connection {
private final CompletableFuture<Channel> internalFuture;
public Connection(CompletableFuture<Channel> internalFuture) {
this.internalFuture = internalFuture;
}
/**
* Sends a message out on its channel and associated the message with a
* completable future used for signaling.
* @param message the message to be sent
* @param future a future that is completed normally or exceptionally if
* message sending succeeds or fails respectively
*/
public void send(Object message, CompletableFuture<Void> future) {
internalFuture.whenComplete((channel, throwable) -> {
if (throwable == null) {
channel.writeAndFlush(message).addListener(channelFuture -> {
if (!channelFuture.isSuccess()) {
future.completeExceptionally(channelFuture.cause());
} else {
future.complete(null);
}
});
} else {
future.completeExceptionally(throwable);
}
});
}
/**
* Destroys a channel by closing its channel (if it exists) and
* cancelling its future.
*/
public void destroy() {
Channel channel = internalFuture.getNow(null);
if (channel != null) {
channel.close();
}
internalFuture.cancel(false);
}
/**
* Determines whether the connection is valid meaning it is either
* complete with and active channel
* or it has not yet completed.
* @return true if the channel has an active connection or has not
* yet completed
*/
public boolean validate() {
if (internalFuture.isCompletedExceptionally()) {
return false;
}
Channel channel = internalFuture.getNow(null);
return channel == null || channel.isActive();
}
}
}
......