Madan Jampani
Committed by Gerrit Code Review

Removed deprecated ClusterCommunicationService APIs

MessagingService::sendAsync now returns a CompletableFuture<Void> in place of boolean

Change-Id: I98134c4c0ea65b9c7e9ba705eebd1669067324ef
......@@ -24,61 +24,12 @@ import java.util.function.Function;
import org.onosproject.cluster.NodeId;
import com.google.common.util.concurrent.ListenableFuture;
/**
* Service for assisting communications between controller cluster nodes.
*/
public interface ClusterCommunicationService {
/**
* Broadcast a message to all controller nodes.
*
* @param message message to send
* @return true if the message was sent successfully to all nodes; false otherwise.
*/
@Deprecated
boolean broadcast(ClusterMessage message);
/**
* Broadcast a message to all controller nodes including self.
*
* @param message message to send
* @return true if the message was sent successfully to all nodes; false otherwise.
*/
@Deprecated
boolean broadcastIncludeSelf(ClusterMessage message);
/**
* Sends a message to the specified controller node.
*
* @param message message to send
* @param toNodeId node identifier
* @return true if the message was sent successfully; false otherwise.
*/
@Deprecated
boolean unicast(ClusterMessage message, NodeId toNodeId);
/**
* Multicast a message to a set of controller nodes.
*
* @param message message to send
* @param nodeIds recipient node identifiers
* @return true if the message was sent successfully to all nodes in the group; false otherwise.
*/
@Deprecated
boolean multicast(ClusterMessage message, Iterable<NodeId> nodeIds);
/**
* Sends a message synchronously.
* @param message message to send
* @param toNodeId recipient node identifier
* @return reply future.
*/
@Deprecated
ListenableFuture<byte[]> sendAndReceive(ClusterMessage message, NodeId toNodeId);
/**
* Adds a new subscriber for the specified message subject.
*
* @param subject message subject
......@@ -120,9 +71,9 @@ public interface ClusterCommunicationService {
* @param encoder function for encoding message to byte[]
* @param toNodeId destination node identifier
* @param <M> message type
* @return true if the message was sent successfully; false otherwise
* @return future that is completed when the message is sent
*/
<M> boolean unicast(M message,
<M> CompletableFuture<Void> unicast(M message,
MessageSubject subject,
Function<M, byte[]> encoder,
NodeId toNodeId);
......
......@@ -15,7 +15,6 @@
*/
package org.onosproject.store.cluster.messaging;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
......@@ -32,9 +31,9 @@ public interface MessagingService {
* @param ep end point to send the message to.
* @param type type of message.
* @param payload message payload bytes.
* @throws IOException when I/O exception of some sort has occurred
* @return future that is completed when the message is sent
*/
void sendAsync(Endpoint ep, String type, byte[] payload) throws IOException;
CompletableFuture<Void> sendAsync(Endpoint ep, String type, byte[] payload);
/**
* Sends a message synchronously and waits for a response.
......
......@@ -42,7 +42,6 @@ import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
import org.slf4j.Logger;
import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
......@@ -237,11 +236,11 @@ public class DistributedClusterStore
private void heartbeatToPeer(byte[] messagePayload, ControllerNode peer) {
Endpoint remoteEp = new Endpoint(peer.ip(), peer.tcpPort());
try {
messagingService.sendAsync(remoteEp, HEARTBEAT_MESSAGE, messagePayload);
} catch (IOException e) {
log.trace("Sending heartbeat to {} failed", remoteEp, e);
messagingService.sendAsync(remoteEp, HEARTBEAT_MESSAGE, messagePayload).whenComplete((result, error) -> {
if (error != null) {
log.trace("Sending heartbeat to {} failed", remoteEp, error);
}
});
}
private class HeartbeatMessageHandler implements Consumer<byte[]> {
......
......@@ -35,10 +35,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Objects;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
......@@ -62,8 +58,11 @@ public class ClusterCommunicationManager
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected MessagingService messagingService;
private NodeId localNodeId;
@Activate
public void activate() {
localNodeId = clusterService.getLocalNode().id();
log.info("Started");
}
......@@ -73,60 +72,6 @@ public class ClusterCommunicationManager
}
@Override
public boolean broadcast(ClusterMessage message) {
boolean ok = true;
final ControllerNode localNode = clusterService.getLocalNode();
byte[] payload = message.getBytes();
for (ControllerNode node : clusterService.getNodes()) {
if (!node.equals(localNode)) {
ok = unicastUnchecked(message.subject(), payload, node.id()) && ok;
}
}
return ok;
}
@Override
public boolean broadcastIncludeSelf(ClusterMessage message) {
boolean ok = true;
byte[] payload = message.getBytes();
for (ControllerNode node : clusterService.getNodes()) {
ok = unicastUnchecked(message.subject(), payload, node.id()) && ok;
}
return ok;
}
@Override
public boolean multicast(ClusterMessage message, Iterable<NodeId> nodes) {
boolean ok = true;
final ControllerNode localNode = clusterService.getLocalNode();
byte[] payload = message.getBytes();
for (NodeId nodeId : nodes) {
if (!nodeId.equals(localNode.id())) {
ok = unicastUnchecked(message.subject(), payload, nodeId) && ok;
}
}
return ok;
}
@Override
public boolean unicast(ClusterMessage message, NodeId toNodeId) {
return unicastUnchecked(message.subject(), message.getBytes(), toNodeId);
}
@Override
public ListenableFuture<byte[]> sendAndReceive(ClusterMessage message, NodeId toNodeId) {
SettableFuture<byte[]> response = SettableFuture.create();
sendAndReceive(message.subject(), message.getBytes(), toNodeId).whenComplete((r, e) -> {
if (e == null) {
response.set(r);
} else {
response.setException(e);
}
});
return response;
}
@Override
public <M> void broadcast(M message,
MessageSubject subject,
Function<M, byte[]> encoder) {
......@@ -154,15 +99,19 @@ public class ClusterCommunicationManager
}
@Override
public <M> boolean unicast(M message,
public <M> CompletableFuture<Void> unicast(M message,
MessageSubject subject,
Function<M, byte[]> encoder,
NodeId toNodeId) {
try {
byte[] payload = new ClusterMessage(
clusterService.getLocalNode().id(),
localNodeId,
subject,
encoder.apply(message)).getBytes();
return unicastUnchecked(subject, payload, toNodeId);
return doUnicast(subject, payload, toNodeId);
} catch (Exception e) {
return Tools.exceptionalFuture(e);
}
}
@Override
......@@ -171,10 +120,10 @@ public class ClusterCommunicationManager
Function<M, byte[]> encoder,
Set<NodeId> nodes) {
byte[] payload = new ClusterMessage(
clusterService.getLocalNode().id(),
localNodeId,
subject,
encoder.apply(message)).getBytes();
nodes.forEach(nodeId -> unicastUnchecked(subject, payload, nodeId));
nodes.forEach(nodeId -> doUnicast(subject, payload, nodeId));
}
@Override
......@@ -194,17 +143,11 @@ public class ClusterCommunicationManager
}
}
private boolean unicastUnchecked(MessageSubject subject, byte[] payload, NodeId toNodeId) {
private CompletableFuture<Void> doUnicast(MessageSubject subject, byte[] payload, NodeId toNodeId) {
ControllerNode node = clusterService.getNode(toNodeId);
checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort());
try {
messagingService.sendAsync(nodeEp, subject.value(), payload);
return true;
} catch (IOException e) {
log.debug("Failed to send cluster message to nodeId: " + toNodeId, e);
return false;
}
return messagingService.sendAsync(nodeEp, subject.value(), payload);
}
private CompletableFuture<byte[]> sendAndReceive(MessageSubject subject, byte[] payload, NodeId toNodeId) {
......
......@@ -66,27 +66,27 @@ public class EventuallyConsistentMapBuilderImpl<K, V>
}
@Override
public EventuallyConsistentMapBuilder withName(String name) {
public EventuallyConsistentMapBuilder<K, V> withName(String name) {
this.name = checkNotNull(name);
return this;
}
@Override
public EventuallyConsistentMapBuilder withSerializer(
public EventuallyConsistentMapBuilder<K, V> withSerializer(
KryoNamespace.Builder serializerBuilder) {
this.serializerBuilder = checkNotNull(serializerBuilder);
return this;
}
@Override
public EventuallyConsistentMapBuilder withClockService(
public EventuallyConsistentMapBuilder<K, V> withClockService(
ClockService<K, V> clockService) {
this.clockService = checkNotNull(clockService);
return this;
}
@Override
public EventuallyConsistentMapBuilder withEventExecutor(ExecutorService executor) {
public EventuallyConsistentMapBuilder<K, V> withEventExecutor(ExecutorService executor) {
this.eventExecutor = checkNotNull(executor);
return this;
}
......@@ -99,13 +99,13 @@ public class EventuallyConsistentMapBuilderImpl<K, V>
}
@Override
public EventuallyConsistentMapBuilder withBackgroundExecutor(ScheduledExecutorService executor) {
public EventuallyConsistentMapBuilder<K, V> withBackgroundExecutor(ScheduledExecutorService executor) {
this.backgroundExecutor = checkNotNull(executor);
return this;
}
@Override
public EventuallyConsistentMapBuilder withPeerUpdateFunction(
public EventuallyConsistentMapBuilder<K, V> withPeerUpdateFunction(
BiFunction<K, V, Collection<NodeId>> peerUpdateFunction) {
this.peerUpdateFunction = checkNotNull(peerUpdateFunction);
return this;
......
......@@ -509,12 +509,6 @@ public class EventuallyConsistentMapImpl<K, V>
);
}
private boolean unicastMessage(NodeId peer, MessageSubject subject, Object event) {
return clusterCommunicator.unicast(event, subject, serializer::encode, peer);
// Note: we had this flipped before...
// communicationExecutor.execute(() -> clusterCommunicator.unicast(message, peer));
}
private boolean underHighLoad() {
return counter.get(LOAD_WINDOW) > HIGH_LOAD_THRESHOLD;
}
......@@ -556,10 +550,14 @@ public class EventuallyConsistentMapImpl<K, V>
}
AntiEntropyAdvertisement<K> ad = createAdvertisement();
if (!unicastMessage(peer, antiEntropyAdvertisementSubject, ad)) {
log.debug("Failed to send anti-entropy advertisement to {}", peer);
NodeId destination = peer;
clusterCommunicator.unicast(ad, antiEntropyAdvertisementSubject, serializer::encode, peer)
.whenComplete((result, error) -> {
if (error != null) {
log.debug("Failed to send anti-entropy advertisement to {}", destination);
}
});
} catch (Exception e) {
// Catch all exceptions to avoid scheduled task being suppressed.
log.error("Exception thrown while sending advertisement", e);
......@@ -595,9 +593,14 @@ public class EventuallyConsistentMapImpl<K, V>
// Send the advertisement back if this peer is out-of-sync
final NodeId sender = ad.sender();
AntiEntropyAdvertisement<K> myAd = createAdvertisement();
if (!unicastMessage(sender, antiEntropyAdvertisementSubject, myAd)) {
log.debug("Failed to send reactive anti-entropy advertisement to {}", sender);
clusterCommunicator.unicast(myAd, antiEntropyAdvertisementSubject, serializer::encode, sender)
.whenComplete((result, error) -> {
if (error != null) {
log.debug("Failed to send reactive "
+ "anti-entropy advertisement to {}", sender);
}
});
break;
}
}
......@@ -801,12 +804,16 @@ public class EventuallyConsistentMapImpl<K, V>
)
);
communicationExecutor.submit(() -> {
try {
unicastMessage(peer, updateMessageSubject, Lists.newArrayList(map.values()));
} catch (Exception e) {
log.warn("broadcast error", e);
clusterCommunicator.unicast(Lists.newArrayList(map.values()),
updateMessageSubject,
serializer::encode,
peer)
.whenComplete((result, error) -> {
if (error != null) {
log.debug("Failed to send to {}", peer);
}
});
});
}
}
}
......
......@@ -407,21 +407,22 @@ public class NewDistributedFlowRuleStore
log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}",
master, deviceId);
if (!clusterCommunicator.unicast(operation,
clusterCommunicator.unicast(operation,
APPLY_BATCH_FLOWS,
SERIALIZER::encode,
master)) {
master)
.whenComplete((result, error) -> {
log.warn("Failed to storeBatch: {} to {}", operation, master);
Set<FlowRule> allFailures = operation.getOperations().stream()
Set<FlowRule> allFailures = operation.getOperations()
.stream()
.map(op -> op.target())
.collect(Collectors.toSet());
notifyDelegate(FlowRuleBatchEvent.completed(
new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
new CompletedBatchOperation(false, allFailures, deviceId)));
return;
}
});
}
private void storeBatchInternal(FlowRuleBatchOperation operation) {
......
......@@ -395,8 +395,7 @@ public class DistributedGroupStore
}
// Check if group to be created by a remote instance
if (mastershipService.getLocalRole(
groupDesc.deviceId()) != MastershipRole.MASTER) {
if (mastershipService.getLocalRole(groupDesc.deviceId()) != MastershipRole.MASTER) {
log.debug("storeGroupDescription: Device {} local role is not MASTER",
groupDesc.deviceId());
if (mastershipService.getMasterFor(groupDesc.deviceId()) == null) {
......@@ -410,19 +409,22 @@ public class DistributedGroupStore
createGroupAddRequestMsg(groupDesc.deviceId(),
groupDesc);
if (!clusterCommunicator.unicast(groupOp,
clusterCommunicator.unicast(groupOp,
GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
m -> kryoBuilder.build().serialize(m),
mastershipService.getMasterFor(groupDesc.deviceId()))) {
mastershipService.getMasterFor(groupDesc.deviceId())).whenComplete((result, error) -> {
if (error != null) {
log.warn("Failed to send request to master: {} to {}",
groupOp,
mastershipService.getMasterFor(groupDesc.deviceId()));
//TODO: Send Group operation failure event
return;
}
log.debug("Sent Group operation request for device {} to remote MASTER {}",
} else {
log.debug("Sent Group operation request for device {} "
+ "to remote MASTER {}",
groupDesc.deviceId(),
mastershipService.getMasterFor(groupDesc.deviceId()));
}
});
return;
}
......@@ -512,15 +514,17 @@ public class DistributedGroupStore
newBuckets,
newAppCookie);
if (!clusterCommunicator.unicast(groupOp,
clusterCommunicator.unicast(groupOp,
GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
m -> kryoBuilder.build().serialize(m),
mastershipService.getMasterFor(deviceId))) {
mastershipService.getMasterFor(deviceId)).whenComplete((result, error) -> {
if (error != null) {
log.warn("Failed to send request to master: {} to {}",
groupOp,
mastershipService.getMasterFor(deviceId));
//TODO: Send Group operation failure event
mastershipService.getMasterFor(deviceId), error);
}
//TODO: Send Group operation failure event
});
return;
}
log.debug("updateGroupDescription for device {} is getting handled locally",
......@@ -643,15 +647,17 @@ public class DistributedGroupStore
createGroupDeleteRequestMsg(deviceId,
appCookie);
if (!clusterCommunicator.unicast(groupOp,
clusterCommunicator.unicast(groupOp,
GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
m -> kryoBuilder.build().serialize(m),
mastershipService.getMasterFor(deviceId))) {
mastershipService.getMasterFor(deviceId)).whenComplete((result, error) -> {
if (error != null) {
log.warn("Failed to send request to master: {} to {}",
groupOp,
mastershipService.getMasterFor(deviceId));
//TODO: Send Group operation failure event
mastershipService.getMasterFor(deviceId), error);
}
//TODO: Send Group operation failure event
});
return;
}
log.debug("deleteGroupDescription in device {} is getting handled locally",
......
......@@ -18,7 +18,6 @@ package org.onosproject.store.ecmap;
import com.google.common.collect.ComparisonChain;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import org.junit.After;
......@@ -145,7 +144,7 @@ public class EventuallyConsistentMapImplTest {
.register(KryoNamespaces.API)
.register(TestTimestamp.class);
ecMap = new EventuallyConsistentMapBuilderImpl<>(
ecMap = new EventuallyConsistentMapBuilderImpl<String, String>(
clusterService, clusterCommunicator)
.withName(MAP_NAME)
.withSerializer(serializer)
......@@ -702,7 +701,7 @@ public class EventuallyConsistentMapImplTest {
anyObject(MessageSubject.class),
anyObject(Function.class),
anyObject(NodeId.class)))
.andReturn(true)
.andReturn(CompletableFuture.completedFuture(null))
.anyTimes();
replay(clusterCommunicator);
}
......@@ -761,9 +760,9 @@ public class EventuallyConsistentMapImplTest {
}
@Override
public <M> boolean unicast(M message, MessageSubject subject,
public <M> CompletableFuture<Void> unicast(M message, MessageSubject subject,
Function<M, byte[]> encoder, NodeId toNodeId) {
return false;
return null;
}
@Override
......@@ -795,33 +794,6 @@ public class EventuallyConsistentMapImplTest {
Function<byte[], M> decoder, Consumer<M> handler,
Executor executor) {
}
@Override
public boolean broadcast(ClusterMessage message) {
return false;
}
@Override
public boolean broadcastIncludeSelf(ClusterMessage message) {
return false;
}
@Override
public boolean unicast(ClusterMessage message, NodeId toNodeId) {
return false;
}
@Override
public boolean multicast(ClusterMessage message,
Iterable<NodeId> nodeIds) {
return false;
}
@Override
public ListenableFuture<byte[]> sendAndReceive(ClusterMessage message,
NodeId toNodeId) {
return null;
}
}
/**
......
......@@ -20,7 +20,6 @@ import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
......@@ -136,32 +135,39 @@ public class NettyMessaging implements MessagingService {
}
@Override
public void sendAsync(Endpoint ep, String type, byte[] payload) throws IOException {
public CompletableFuture<Void> sendAsync(Endpoint ep, String type, byte[] payload) {
InternalMessage message = new InternalMessage(messageIdGenerator.incrementAndGet(),
localEp,
type,
payload);
sendAsync(ep, message);
return sendAsync(ep, message);
}
protected void sendAsync(Endpoint ep, InternalMessage message) throws IOException {
protected CompletableFuture<Void> sendAsync(Endpoint ep, InternalMessage message) {
CompletableFuture<Void> future = new CompletableFuture<>();
try {
if (ep.equals(localEp)) {
dispatchLocally(message);
return;
}
future.complete(null);
} else {
Channel channel = null;
try {
try {
channel = channels.borrowObject(ep);
channel.writeAndFlush(message).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
channel.writeAndFlush(message).addListener(channelFuture -> {
if (!channelFuture.isSuccess()) {
future.completeExceptionally(channelFuture.cause());
} else {
future.complete(null);
}
});
} finally {
channels.returnObject(ep, channel);
}
} catch (IOException e) {
throw e;
}
} catch (Exception e) {
throw new IOException(e);
future.completeExceptionally(e);
}
return future;
}
@Override
......@@ -193,11 +199,11 @@ public class NettyMessaging implements MessagingService {
localEp,
REPLY_MESSAGE_TYPE,
responsePayload);
try {
sendAsync(message.sender(), response);
} catch (IOException e) {
log.debug("Failed to respond", e);
sendAsync(message.sender(), response).whenComplete((result, error) -> {
if (error != null) {
log.debug("Failed to respond", error);
}
});
}
}));
}
......@@ -211,11 +217,11 @@ public class NettyMessaging implements MessagingService {
localEp,
REPLY_MESSAGE_TYPE,
result);
try {
sendAsync(message.sender(), response);
} catch (IOException e) {
sendAsync(message.sender(), response).whenComplete((r, e) -> {
if (e != null) {
log.debug("Failed to respond", e);
}
});
}
});
});
......
......@@ -138,29 +138,30 @@ public class IOLoopMessaging implements MessagingService {
@Override
public void sendAsync(Endpoint ep, String type, byte[] payload) throws IOException {
public CompletableFuture<Void> sendAsync(Endpoint ep, String type, byte[] payload) {
DefaultMessage message = new DefaultMessage(
messageIdGenerator.incrementAndGet(),
localEp,
type,
payload);
sendAsync(ep, message);
return sendAsync(ep, message);
}
protected void sendAsync(Endpoint ep, DefaultMessage message) throws IOException {
protected CompletableFuture<Void> sendAsync(Endpoint ep, DefaultMessage message) {
CompletableFuture<Void> future = new CompletableFuture<>();
if (ep.equals(localEp)) {
dispatchLocally(message);
return;
future.complete(null);
return future;
}
DefaultMessageStream stream = null;
try {
stream = streams.borrowObject(ep);
} catch (Exception e) {
throw new IOException(e);
}
try {
stream.write(message);
future.complete(null);
} catch (Exception e) {
future.completeExceptionally(e);
} finally {
try {
streams.returnObject(ep, stream);
......@@ -168,6 +169,7 @@ public class IOLoopMessaging implements MessagingService {
log.warn("Failed to return stream to pool");
}
}
return future;
}
@Override
......@@ -202,30 +204,30 @@ public class IOLoopMessaging implements MessagingService {
localEp,
REPLY_MESSAGE_TYPE,
responsePayload);
try {
sendAsync(message.sender(), response);
} catch (IOException e) {
log.debug("Failed to respond", e);
}
sendAsync(message.sender(), response).whenComplete((result, error) -> {
log.debug("Failed to respond", error);
});
}
}));
}
@Override
public void registerHandler(String type, Function<byte[], CompletableFuture<byte[]>> handler) {
handlers.put(type, message -> handler.apply(message.payload()).whenComplete((result, error) -> {
handlers.put(type, message -> {
handler.apply(message.payload()).whenComplete((result, error) -> {
if (error == null) {
DefaultMessage response = new DefaultMessage(message.id(),
localEp,
REPLY_MESSAGE_TYPE,
result);
try {
sendAsync(message.sender(), response);
} catch (IOException e) {
sendAsync(message.sender(), response).whenComplete((r, e) -> {
if (e != null) {
log.debug("Failed to respond", e);
}
});
}
}));
});
});
}
@Override
......