alshabib

Merge branch 'master' of ssh://gerrit.onlab.us:29418/onos-next

1 package org.onlab.onos.store.cluster.messaging; 1 package org.onlab.onos.store.cluster.messaging;
2 2
3 +import java.io.IOException;
4 +
3 import org.onlab.onos.cluster.NodeId; 5 import org.onlab.onos.cluster.NodeId;
4 6
5 // TODO: Should payload type be ByteBuffer? 7 // TODO: Should payload type be ByteBuffer?
...@@ -49,4 +51,14 @@ public class ClusterMessage { ...@@ -49,4 +51,14 @@ public class ClusterMessage {
49 public byte[] payload() { 51 public byte[] payload() {
50 return payload; 52 return payload;
51 } 53 }
54 +
55 + /**
56 + * Sends a response to the sender.
57 + *
58 + * @param data payload response.
59 + * @throws IOException
60 + */
61 + public void respond(byte[] data) throws IOException {
62 + throw new IllegalStateException("One can only repond to message recived from others.");
63 + }
52 } 64 }
......
...@@ -158,7 +158,7 @@ public class ClusterCommunicationManager ...@@ -158,7 +158,7 @@ public class ClusterCommunicationManager
158 public void handle(Message message) { 158 public void handle(Message message) {
159 try { 159 try {
160 ClusterMessage clusterMessage = SERIALIZER.decode(message.payload()); 160 ClusterMessage clusterMessage = SERIALIZER.decode(message.payload());
161 - handler.handle(clusterMessage); 161 + handler.handle(new InternalClusterMessage(clusterMessage, message));
162 } catch (Exception e) { 162 } catch (Exception e) {
163 log.error("Exception caught during ClusterMessageHandler", e); 163 log.error("Exception caught during ClusterMessageHandler", e);
164 throw e; 164 throw e;
...@@ -166,6 +166,21 @@ public class ClusterCommunicationManager ...@@ -166,6 +166,21 @@ public class ClusterCommunicationManager
166 } 166 }
167 } 167 }
168 168
169 + public static final class InternalClusterMessage extends ClusterMessage {
170 +
171 + private final Message rawMessage;
172 +
173 + public InternalClusterMessage(ClusterMessage clusterMessage, Message rawMessage) {
174 + super(clusterMessage.sender(), clusterMessage.subject(), clusterMessage.payload());
175 + this.rawMessage = rawMessage;
176 + }
177 +
178 + @Override
179 + public void respond(byte[] response) throws IOException {
180 + rawMessage.respond(response);
181 + }
182 + }
183 +
169 private static final class InternalClusterMessageResponse implements ClusterMessageResponse { 184 private static final class InternalClusterMessageResponse implements ClusterMessageResponse {
170 185
171 private final NodeId sender; 186 private final NodeId sender;
......