Madan Jampani

sendAndReceive now returns a Future instead of Reponse

......@@ -4,6 +4,7 @@ import static java.lang.Thread.sleep;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
......@@ -12,7 +13,6 @@ import org.onlab.metrics.MetricsFeature;
import org.onlab.metrics.MetricsManager;
import org.onlab.netty.Endpoint;
import org.onlab.netty.NettyMessagingService;
import org.onlab.netty.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -74,10 +74,10 @@ private static Logger log = LoggerFactory.getLogger(SimpleNettyClient.class);
for (int i = 0; i < warmup; i++) {
messaging.sendAsync(endpoint, "simple", "Hello World".getBytes());
Response response = messaging
Future<byte[]> responseFuture = messaging
.sendAndReceive(endpoint, "echo",
"Hello World".getBytes());
response.get(100000, TimeUnit.MILLISECONDS);
responseFuture.get(100000, TimeUnit.MILLISECONDS);
}
log.info("measuring round-trip send & receive");
......@@ -85,13 +85,13 @@ private static Logger log = LoggerFactory.getLogger(SimpleNettyClient.class);
int timeouts = 0;
for (int i = 0; i < iterations; i++) {
Response response;
Future<byte[]> responseFuture;
Timer.Context context = sendAndReceiveTimer.time();
try {
response = messaging
responseFuture = messaging
.sendAndReceive(endpoint, "echo",
"Hello World".getBytes());
response.get(10000, TimeUnit.MILLISECONDS);
responseFuture.get(10000, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
timeouts++;
log.info("timeout:" + timeouts + " at iteration:" + i);
......
......@@ -5,6 +5,8 @@ import java.util.Set;
import org.onlab.onos.cluster.NodeId;
import com.google.common.util.concurrent.ListenableFuture;
// TODO: remove IOExceptions?
/**
* Service for assisting communications between controller cluster nodes.
......@@ -40,10 +42,10 @@ public interface ClusterCommunicationService {
* Sends a message synchronously.
* @param message message to send
* @param toNodeId recipient node identifier
* @return ClusterMessageResponse which is reply future.
* @return reply future.
* @throws IOException
*/
ClusterMessageResponse sendAndReceive(ClusterMessage message, NodeId toNodeId) throws IOException;
ListenableFuture<byte[]> sendAndReceive(ClusterMessage message, NodeId toNodeId) throws IOException;
/**
* Adds a new subscriber for the specified message subject.
......
package org.onlab.onos.store.cluster.messaging;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.onlab.onos.cluster.NodeId;
public interface ClusterMessageResponse extends Future<byte[]> {
public NodeId sender();
// TODO InterruptedException, ExecutionException removed from original
// Future declaration. Revisit if we ever need those.
@Override
public byte[] get(long timeout, TimeUnit unit) throws TimeoutException;
}
......@@ -4,9 +4,7 @@ import static com.google.common.base.Preconditions.checkArgument;
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
......@@ -20,7 +18,6 @@ import org.onlab.onos.store.cluster.impl.ClusterMembershipEvent;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
import org.onlab.onos.store.cluster.messaging.ClusterMessageResponse;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
import org.onlab.onos.store.serializers.ClusterMessageSerializer;
import org.onlab.onos.store.serializers.KryoNamespaces;
......@@ -32,10 +29,11 @@ import org.onlab.netty.Message;
import org.onlab.netty.MessageHandler;
import org.onlab.netty.MessagingService;
import org.onlab.netty.NettyMessagingService;
import org.onlab.netty.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.util.concurrent.ListenableFuture;
@Component(immediate = true)
@Service
public class ClusterCommunicationManager
......@@ -133,14 +131,12 @@ public class ClusterCommunicationManager
}
@Override
public ClusterMessageResponse sendAndReceive(ClusterMessage message, NodeId toNodeId) throws IOException {
public ListenableFuture<byte[]> sendAndReceive(ClusterMessage message, NodeId toNodeId) throws IOException {
ControllerNode node = clusterService.getNode(toNodeId);
checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
Endpoint nodeEp = new Endpoint(node.ip().toString(), node.tcpPort());
try {
Response responseFuture =
messagingService.sendAndReceive(nodeEp, message.subject().value(), SERIALIZER.encode(message));
return new InternalClusterMessageResponse(toNodeId, responseFuture);
return messagingService.sendAndReceive(nodeEp, message.subject().value(), SERIALIZER.encode(message));
} catch (IOException e) {
log.error("Failed interaction with remote nodeId: " + toNodeId, e);
......@@ -188,60 +184,4 @@ public class ClusterCommunicationManager
rawMessage.respond(response);
}
}
private static final class InternalClusterMessageResponse
implements ClusterMessageResponse {
private final NodeId sender;
private final Response responseFuture;
private volatile boolean isCancelled = false;
private volatile boolean isDone = false;
public InternalClusterMessageResponse(NodeId sender, Response responseFuture) {
this.sender = sender;
this.responseFuture = responseFuture;
}
@Override
public NodeId sender() {
return sender;
}
@Override
public byte[] get(long timeout, TimeUnit timeunit)
throws TimeoutException {
final byte[] result = responseFuture.get(timeout, timeunit);
isDone = true;
return result;
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
if (isDone()) {
return false;
}
// doing nothing for now
// when onlab.netty Response support cancel, call them.
isCancelled = true;
return true;
}
@Override
public boolean isCancelled() {
return isCancelled;
}
@Override
public boolean isDone() {
return this.isDone || isCancelled();
}
@Override
public byte[] get() throws InterruptedException, ExecutionException {
// TODO: consider forbidding this call and force the use of timed get
// to enforce handling of remote peer failure scenario
final byte[] result = responseFuture.get();
isDone = true;
return result;
}
}
}
}
\ No newline at end of file
......
......@@ -12,6 +12,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
......@@ -49,7 +50,6 @@ import org.onlab.onos.store.AbstractStore;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
import org.onlab.onos.store.cluster.messaging.ClusterMessageResponse;
import org.onlab.onos.store.flow.ReplicaInfo;
import org.onlab.onos.store.flow.ReplicaInfoService;
import org.onlab.onos.store.serializers.DistributedStoreSerializers;
......@@ -57,6 +57,7 @@ import org.onlab.onos.store.serializers.KryoSerializer;
import org.onlab.util.KryoNamespace;
import org.slf4j.Logger;
import com.google.common.base.Function;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
......@@ -213,9 +214,9 @@ public class DistributedFlowRuleStore
SERIALIZER.encode(rule));
try {
ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
return SERIALIZER.decode(response.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
} catch (IOException | TimeoutException e) {
Future<byte[]> responseFuture = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
return SERIALIZER.decode(responseFuture.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
} catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
// FIXME: throw a FlowStoreException
throw new RuntimeException(e);
}
......@@ -247,9 +248,9 @@ public class DistributedFlowRuleStore
SERIALIZER.encode(deviceId));
try {
ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
return SERIALIZER.decode(response.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
} catch (IOException | TimeoutException e) {
Future<byte[]> responseFuture = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
return SERIALIZER.decode(responseFuture.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
} catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
// FIXME: throw a FlowStoreException
throw new RuntimeException(e);
}
......@@ -291,14 +292,17 @@ public class DistributedFlowRuleStore
SERIALIZER.encode(operation));
try {
ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
response.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
} catch (IOException | TimeoutException e) {
// FIXME: throw a FlowStoreException
throw new RuntimeException(e);
ListenableFuture<byte[]> responseFuture =
clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
return Futures.transform(responseFuture, new Function<byte[], CompletedBatchOperation>() {
@Override
public CompletedBatchOperation apply(byte[] input) {
return SERIALIZER.decode(input);
}
});
} catch (IOException e) {
return Futures.immediateFailedFuture(e);
}
return null;
}
private ListenableFuture<CompletedBatchOperation> storeBatchInternal(FlowRuleBatchOperation operation) {
......
......@@ -4,6 +4,7 @@ import static org.onlab.onos.store.statistic.impl.StatisticStoreMessageSubjects.
import static org.slf4j.LoggerFactory.getLogger;
import com.google.common.collect.Sets;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
......@@ -21,7 +22,6 @@ import org.onlab.onos.net.statistic.StatisticStore;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
import org.onlab.onos.store.cluster.messaging.ClusterMessageResponse;
import org.onlab.onos.store.flow.ReplicaInfo;
import org.onlab.onos.store.flow.ReplicaInfoService;
import org.onlab.onos.store.serializers.KryoNamespaces;
......@@ -34,6 +34,8 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
......@@ -184,11 +186,11 @@ public class DistributedStatisticStore implements StatisticStore {
SERIALIZER.encode(connectPoint));
try {
ClusterMessageResponse response =
Future<byte[]> response =
clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
return SERIALIZER.decode(response.get(STATISTIC_STORE_TIMEOUT_MILLIS,
TimeUnit.MILLISECONDS));
} catch (IOException | TimeoutException e) {
} catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
// FIXME: throw a StatsStoreException
throw new RuntimeException(e);
}
......@@ -212,11 +214,11 @@ public class DistributedStatisticStore implements StatisticStore {
SERIALIZER.encode(connectPoint));
try {
ClusterMessageResponse response =
Future<byte[]> response =
clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
return SERIALIZER.decode(response.get(STATISTIC_STORE_TIMEOUT_MILLIS,
TimeUnit.MILLISECONDS));
} catch (IOException | TimeoutException e) {
} catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
// FIXME: throw a StatsStoreException
throw new RuntimeException(e);
}
......
package org.onlab.netty;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* An asynchronous response.
* This class provides a base implementation of Response, with methods to retrieve the
* result and query to see if the result is ready. The result can only be retrieved when
* it is ready and the get methods will block if the result is not ready yet.
*/
public class AsyncResponse implements Response {
private byte[] value;
private boolean done = false;
private final long start = System.nanoTime();
@Override
public byte[] get(long timeout, TimeUnit timeUnit) throws TimeoutException {
timeout = timeUnit.toNanos(timeout);
boolean interrupted = false;
try {
synchronized (this) {
while (!done) {
try {
long timeRemaining = timeout - (System.nanoTime() - start);
if (timeRemaining <= 0) {
throw new TimeoutException("Operation timed out.");
}
TimeUnit.NANOSECONDS.timedWait(this, timeRemaining);
} catch (InterruptedException e) {
interrupted = true;
}
}
}
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
return value;
}
@Override
public byte[] get() throws InterruptedException {
throw new UnsupportedOperationException();
}
@Override
public boolean isReady() {
return done;
}
/**
* Sets response value and unblocks any thread blocking on the response to become
* available.
* @param data response data.
*/
public synchronized void setResponse(byte[] data) {
if (!done) {
done = true;
value = data;
this.notifyAll();
}
}
}
......@@ -2,6 +2,8 @@ package org.onlab.netty;
import java.io.IOException;
import com.google.common.util.concurrent.ListenableFuture;
/**
* Interface for low level messaging primitives.
*/
......@@ -24,7 +26,7 @@ public interface MessagingService {
* @return a response future
* @throws IOException
*/
public Response sendAndReceive(Endpoint ep, String type, byte[] payload) throws IOException;
public ListenableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload) throws IOException;
/**
* Registers a new message handler for message type.
......
......@@ -5,6 +5,7 @@ import java.net.UnknownHostException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
......@@ -26,7 +27,6 @@ import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.commons.lang.math.RandomUtils;
import org.apache.commons.pool.KeyedPoolableObjectFactory;
import org.apache.commons.pool.impl.GenericKeyedObjectPool;
import org.slf4j.Logger;
......@@ -34,6 +34,8 @@ import org.slf4j.LoggerFactory;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
/**
* A Netty based implementation of MessagingService.
......@@ -44,7 +46,8 @@ public class NettyMessagingService implements MessagingService {
private final Endpoint localEp;
private final ConcurrentMap<String, MessageHandler> handlers = new ConcurrentHashMap<>();
private final Cache<Long, AsyncResponse> responseFutures = CacheBuilder.newBuilder()
private final AtomicLong messageIdGenerator = new AtomicLong(0);
private final Cache<Long, SettableFuture<byte[]>> responseFutures = CacheBuilder.newBuilder()
.maximumSize(100000)
.weakValues()
// TODO: Once the entry expires, notify blocking threads (if any).
......@@ -119,7 +122,7 @@ public class NettyMessagingService implements MessagingService {
@Override
public void sendAsync(Endpoint ep, String type, byte[] payload) throws IOException {
InternalMessage message = new InternalMessage.Builder(this)
.withId(RandomUtils.nextLong())
.withId(messageIdGenerator.incrementAndGet())
.withSender(localEp)
.withType(type)
.withPayload(payload)
......@@ -142,10 +145,10 @@ public class NettyMessagingService implements MessagingService {
}
@Override
public Response sendAndReceive(Endpoint ep, String type, byte[] payload)
public ListenableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload)
throws IOException {
AsyncResponse futureResponse = new AsyncResponse();
Long messageId = RandomUtils.nextLong();
SettableFuture<byte[]> futureResponse = SettableFuture.create();
Long messageId = messageIdGenerator.incrementAndGet();
responseFutures.put(messageId, futureResponse);
InternalMessage message = new InternalMessage.Builder(this)
.withId(messageId)
......@@ -267,10 +270,10 @@ public class NettyMessagingService implements MessagingService {
String type = message.type();
if (type.equals(InternalMessage.REPLY_MESSAGE_TYPE)) {
try {
AsyncResponse futureResponse =
SettableFuture<byte[]> futureResponse =
NettyMessagingService.this.responseFutures.getIfPresent(message.id());
if (futureResponse != null) {
futureResponse.setResponse(message.payload());
futureResponse.set(message.payload());
} else {
log.warn("Received a reply. But was unable to locate the request handle");
}
......
package org.onlab.netty;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* Response object returned when making synchronous requests.
* Can you used to check is a response is ready and/or wait for a response
* to become available.
*/
public interface Response {
/**
* Gets the response waiting for a designated timeout period.
* @param timeout timeout period (since request was sent out)
* @param tu unit of time.
* @return response payload
* @throws TimeoutException if the timeout expires before the response arrives.
*/
public byte[] get(long timeout, TimeUnit tu) throws TimeoutException;
/**
* Gets the response waiting for indefinite timeout period.
* @return response payload
* @throws InterruptedException if the thread is interrupted before the response arrives.
*/
public byte[] get() throws InterruptedException;
/**
* Checks if the response is ready without blocking.
* @return true if response is ready, false otherwise.
*/
public boolean isReady();
}
package org.onlab.netty;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.RandomUtils;
import static org.junit.Assert.*;
import org.junit.Test;
/**
......@@ -20,8 +23,8 @@ public class PingPongTest {
ponger.activate();
ponger.registerHandler("echo", new EchoHandler());
byte[] payload = RandomUtils.nextBytes(100);
Response response = pinger.sendAndReceive(new Endpoint("localhost", 9086), "echo", payload);
assertArrayEquals(payload, response.get(10000, TimeUnit.MILLISECONDS));
Future<byte[]> responseFuture = pinger.sendAndReceive(new Endpoint("localhost", 9086), "echo", payload);
assertArrayEquals(payload, responseFuture.get(10000, TimeUnit.MILLISECONDS));
} finally {
pinger.deactivate();
ponger.deactivate();
......