Madan Jampani

Added sendAndReceive API to ClusterCommunicationService

...@@ -37,6 +37,15 @@ public interface ClusterCommunicationService { ...@@ -37,6 +37,15 @@ public interface ClusterCommunicationService {
37 boolean multicast(ClusterMessage message, Set<NodeId> nodeIds) throws IOException; 37 boolean multicast(ClusterMessage message, Set<NodeId> nodeIds) throws IOException;
38 38
39 /** 39 /**
40 + * Sends a message synchronously.
41 + * @param message message to send
42 + * @param toNodeId recipient node identifier
43 + * @return ClusterMessageResponse which is reply future.
44 + * @throws IOException
45 + */
46 + ClusterMessageResponse sendAndReceive(ClusterMessage message, NodeId toNodeId) throws IOException;
47 +
48 + /**
40 * Adds a new subscriber for the specified message subject. 49 * Adds a new subscriber for the specified message subject.
41 * 50 *
42 * @param subject message subject 51 * @param subject message subject
......
1 +package org.onlab.onos.store.cluster.messaging;
2 +
3 +import java.util.concurrent.TimeUnit;
4 +import java.util.concurrent.TimeoutException;
5 +
6 +import org.onlab.onos.cluster.NodeId;
7 +
8 +public interface ClusterMessageResponse {
9 + public NodeId sender();
10 + public byte[] get(long timeout, TimeUnit timeunit) throws TimeoutException;
11 + public byte[] get(long timeout) throws InterruptedException;
12 +}
...@@ -4,6 +4,9 @@ import static com.google.common.base.Preconditions.checkArgument; ...@@ -4,6 +4,9 @@ import static com.google.common.base.Preconditions.checkArgument;
4 4
5 import java.io.IOException; 5 import java.io.IOException;
6 import java.util.Set; 6 import java.util.Set;
7 +import java.util.concurrent.TimeUnit;
8 +import java.util.concurrent.TimeoutException;
9 +
7 import org.apache.felix.scr.annotations.Activate; 10 import org.apache.felix.scr.annotations.Activate;
8 import org.apache.felix.scr.annotations.Component; 11 import org.apache.felix.scr.annotations.Component;
9 import org.apache.felix.scr.annotations.Deactivate; 12 import org.apache.felix.scr.annotations.Deactivate;
...@@ -17,6 +20,7 @@ import org.onlab.onos.store.cluster.impl.ClusterMembershipEvent; ...@@ -17,6 +20,7 @@ import org.onlab.onos.store.cluster.impl.ClusterMembershipEvent;
17 import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService; 20 import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
18 import org.onlab.onos.store.cluster.messaging.ClusterMessage; 21 import org.onlab.onos.store.cluster.messaging.ClusterMessage;
19 import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler; 22 import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
23 +import org.onlab.onos.store.cluster.messaging.ClusterMessageResponse;
20 import org.onlab.onos.store.cluster.messaging.MessageSubject; 24 import org.onlab.onos.store.cluster.messaging.MessageSubject;
21 import org.onlab.onos.store.serializers.ClusterMessageSerializer; 25 import org.onlab.onos.store.serializers.ClusterMessageSerializer;
22 import org.onlab.onos.store.serializers.KryoPoolUtil; 26 import org.onlab.onos.store.serializers.KryoPoolUtil;
...@@ -28,6 +32,7 @@ import org.onlab.netty.Message; ...@@ -28,6 +32,7 @@ import org.onlab.netty.Message;
28 import org.onlab.netty.MessageHandler; 32 import org.onlab.netty.MessageHandler;
29 import org.onlab.netty.MessagingService; 33 import org.onlab.netty.MessagingService;
30 import org.onlab.netty.NettyMessagingService; 34 import org.onlab.netty.NettyMessagingService;
35 +import org.onlab.netty.Response;
31 import org.slf4j.Logger; 36 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory; 37 import org.slf4j.LoggerFactory;
33 38
...@@ -120,6 +125,22 @@ public class ClusterCommunicationManager ...@@ -120,6 +125,22 @@ public class ClusterCommunicationManager
120 } 125 }
121 126
122 @Override 127 @Override
128 + public ClusterMessageResponse sendAndReceive(ClusterMessage message, NodeId toNodeId) throws IOException {
129 + ControllerNode node = clusterService.getNode(toNodeId);
130 + checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
131 + Endpoint nodeEp = new Endpoint(node.ip().toString(), node.tcpPort());
132 + try {
133 + Response responseFuture =
134 + messagingService.sendAndReceive(nodeEp, message.subject().value(), SERIALIZER.encode(message));
135 + return new InternalClusterMessageResponse(toNodeId, responseFuture);
136 +
137 + } catch (IOException e) {
138 + log.error("Failed interaction with remote nodeId: " + toNodeId, e);
139 + throw e;
140 + }
141 + }
142 +
143 + @Override
123 public void addSubscriber(MessageSubject subject, 144 public void addSubscriber(MessageSubject subject,
124 ClusterMessageHandler subscriber) { 145 ClusterMessageHandler subscriber) {
125 messagingService.registerHandler(subject.value(), new InternalClusterMessageHandler(subscriber)); 146 messagingService.registerHandler(subject.value(), new InternalClusterMessageHandler(subscriber));
...@@ -144,4 +165,30 @@ public class ClusterCommunicationManager ...@@ -144,4 +165,30 @@ public class ClusterCommunicationManager
144 } 165 }
145 } 166 }
146 } 167 }
168 +
169 + private static final class InternalClusterMessageResponse implements ClusterMessageResponse {
170 +
171 + private final NodeId sender;
172 + private final Response responseFuture;
173 +
174 + public InternalClusterMessageResponse(NodeId sender, Response responseFuture) {
175 + this.sender = sender;
176 + this.responseFuture = responseFuture;
177 + }
178 + @Override
179 + public NodeId sender() {
180 + return sender;
181 + }
182 +
183 + @Override
184 + public byte[] get(long timeout, TimeUnit timeunit)
185 + throws TimeoutException {
186 + return responseFuture.get(timeout, timeunit);
187 + }
188 +
189 + @Override
190 + public byte[] get(long timeout) throws InterruptedException {
191 + return responseFuture.get();
192 + }
193 + }
147 } 194 }
......