Madan Jampani
Committed by Gerrit Code Review

Reply with error status on receiving message with incorrect preamble

Change-Id: I0d17dc74c817546f221fbcade1d5642c8f29b0fe
...@@ -49,4 +49,10 @@ public class MessagingException extends IOException { ...@@ -49,4 +49,10 @@ public class MessagingException extends IOException {
49 */ 49 */
50 public static class RemoteHandlerFailure extends MessagingException { 50 public static class RemoteHandlerFailure extends MessagingException {
51 } 51 }
52 +
53 + /**
54 + * Exception indicating failure due to invalid message strucuture such as an incorrect preamble.
55 + */
56 + public static class ProcotolException extends MessagingException {
57 + }
52 } 58 }
...\ No newline at end of file ...\ No newline at end of file
......
...@@ -42,24 +42,31 @@ public final class InternalMessage { ...@@ -42,24 +42,31 @@ public final class InternalMessage {
42 /** 42 /**
43 * Response status signifying an exception handling the message. 43 * Response status signifying an exception handling the message.
44 */ 44 */
45 - ERROR_HANDLER_EXCEPTION 45 + ERROR_HANDLER_EXCEPTION,
46 +
47 + /**
48 + * Reponse status signifying invalid message structure.
49 + */
50 + PROTOCOL_EXCEPTION
46 51
47 // NOTE: For backwards compatibility it important that new enum constants 52 // NOTE: For backwards compatibility it important that new enum constants
48 // be appended. 53 // be appended.
49 // FIXME: We should remove this restriction in the future. 54 // FIXME: We should remove this restriction in the future.
50 } 55 }
51 56
57 + private final int preamble;
52 private final long id; 58 private final long id;
53 private final Endpoint sender; 59 private final Endpoint sender;
54 private final String type; 60 private final String type;
55 private final byte[] payload; 61 private final byte[] payload;
56 private final Status status; 62 private final Status status;
57 63
58 - public InternalMessage(long id, Endpoint sender, String type, byte[] payload) { 64 + public InternalMessage(int preamble, long id, Endpoint sender, String type, byte[] payload) {
59 - this(id, sender, type, payload, Status.OK); 65 + this(preamble, id, sender, type, payload, Status.OK);
60 } 66 }
61 67
62 - public InternalMessage(long id, Endpoint sender, String type, byte[] payload, Status status) { 68 + public InternalMessage(int preamble, long id, Endpoint sender, String type, byte[] payload, Status status) {
69 + this.preamble = preamble;
63 this.id = id; 70 this.id = id;
64 this.sender = sender; 71 this.sender = sender;
65 this.type = type; 72 this.type = type;
...@@ -67,6 +74,10 @@ public final class InternalMessage { ...@@ -67,6 +74,10 @@ public final class InternalMessage {
67 this.status = status; 74 this.status = status;
68 } 75 }
69 76
77 + public int preamble() {
78 + return preamble;
79 + }
80 +
70 public long id() { 81 public long id() {
71 return id; 82 return id;
72 } 83 }
......
...@@ -39,7 +39,6 @@ public class MessageDecoder extends ReplayingDecoder<DecoderState> { ...@@ -39,7 +39,6 @@ public class MessageDecoder extends ReplayingDecoder<DecoderState> {
39 39
40 private final Logger log = LoggerFactory.getLogger(getClass()); 40 private final Logger log = LoggerFactory.getLogger(getClass());
41 41
42 - private final int correctPreamble;
43 private long messageId; 42 private long messageId;
44 private int preamble; 43 private int preamble;
45 private Version ipVersion; 44 private Version ipVersion;
...@@ -50,9 +49,8 @@ public class MessageDecoder extends ReplayingDecoder<DecoderState> { ...@@ -50,9 +49,8 @@ public class MessageDecoder extends ReplayingDecoder<DecoderState> {
50 private Status status; 49 private Status status;
51 private int contentLength; 50 private int contentLength;
52 51
53 - public MessageDecoder(int correctPreamble) { 52 + public MessageDecoder() {
54 super(DecoderState.READ_MESSAGE_PREAMBLE); 53 super(DecoderState.READ_MESSAGE_PREAMBLE);
55 - this.correctPreamble = correctPreamble;
56 } 54 }
57 55
58 @Override 56 @Override
...@@ -65,9 +63,6 @@ public class MessageDecoder extends ReplayingDecoder<DecoderState> { ...@@ -65,9 +63,6 @@ public class MessageDecoder extends ReplayingDecoder<DecoderState> {
65 switch (state()) { 63 switch (state()) {
66 case READ_MESSAGE_PREAMBLE: 64 case READ_MESSAGE_PREAMBLE:
67 preamble = buffer.readInt(); 65 preamble = buffer.readInt();
68 - if (preamble != correctPreamble) {
69 - throw new IllegalStateException("This message had an incorrect preamble.");
70 - }
71 checkpoint(DecoderState.READ_MESSAGE_ID); 66 checkpoint(DecoderState.READ_MESSAGE_ID);
72 case READ_MESSAGE_ID: 67 case READ_MESSAGE_ID:
73 messageId = buffer.readLong(); 68 messageId = buffer.readLong();
...@@ -106,7 +101,8 @@ public class MessageDecoder extends ReplayingDecoder<DecoderState> { ...@@ -106,7 +101,8 @@ public class MessageDecoder extends ReplayingDecoder<DecoderState> {
106 } else { 101 } else {
107 payload = new byte[0]; 102 payload = new byte[0];
108 } 103 }
109 - InternalMessage message = new InternalMessage(messageId, 104 + InternalMessage message = new InternalMessage(preamble,
105 + messageId,
110 new Endpoint(senderIp, senderPort), 106 new Endpoint(senderIp, senderPort),
111 messageType, 107 messageType,
112 payload, 108 payload,
......
...@@ -217,7 +217,8 @@ public class NettyMessagingManager implements MessagingService { ...@@ -217,7 +217,8 @@ public class NettyMessagingManager implements MessagingService {
217 @Override 217 @Override
218 public CompletableFuture<Void> sendAsync(Endpoint ep, String type, byte[] payload) { 218 public CompletableFuture<Void> sendAsync(Endpoint ep, String type, byte[] payload) {
219 checkPermission(CLUSTER_WRITE); 219 checkPermission(CLUSTER_WRITE);
220 - InternalMessage message = new InternalMessage(messageIdGenerator.incrementAndGet(), 220 + InternalMessage message = new InternalMessage(preamble,
221 + messageIdGenerator.incrementAndGet(),
221 localEp, 222 localEp,
222 type, 223 type,
223 payload); 224 payload);
...@@ -263,7 +264,7 @@ public class NettyMessagingManager implements MessagingService { ...@@ -263,7 +264,7 @@ public class NettyMessagingManager implements MessagingService {
263 Callback callback = new Callback(response, executor); 264 Callback callback = new Callback(response, executor);
264 Long messageId = messageIdGenerator.incrementAndGet(); 265 Long messageId = messageIdGenerator.incrementAndGet();
265 callbacks.put(messageId, callback); 266 callbacks.put(messageId, callback);
266 - InternalMessage message = new InternalMessage(messageId, localEp, type, payload); 267 + InternalMessage message = new InternalMessage(preamble, messageId, localEp, type, payload);
267 return sendAsync(ep, message).whenComplete((r, e) -> { 268 return sendAsync(ep, message).whenComplete((r, e) -> {
268 if (e != null) { 269 if (e != null) {
269 callbacks.invalidate(messageId); 270 callbacks.invalidate(messageId);
...@@ -425,7 +426,7 @@ public class NettyMessagingManager implements MessagingService { ...@@ -425,7 +426,7 @@ public class NettyMessagingManager implements MessagingService {
425 426
426 channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(serverSslEngine)) 427 channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(serverSslEngine))
427 .addLast("encoder", encoder) 428 .addLast("encoder", encoder)
428 - .addLast("decoder", new MessageDecoder(preamble)) 429 + .addLast("decoder", new MessageDecoder())
429 .addLast("handler", dispatcher); 430 .addLast("handler", dispatcher);
430 } 431 }
431 } 432 }
...@@ -459,7 +460,7 @@ public class NettyMessagingManager implements MessagingService { ...@@ -459,7 +460,7 @@ public class NettyMessagingManager implements MessagingService {
459 460
460 channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(clientSslEngine)) 461 channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(clientSslEngine))
461 .addLast("encoder", encoder) 462 .addLast("encoder", encoder)
462 - .addLast("decoder", new MessageDecoder(preamble)) 463 + .addLast("decoder", new MessageDecoder())
463 .addLast("handler", dispatcher); 464 .addLast("handler", dispatcher);
464 } 465 }
465 } 466 }
...@@ -473,7 +474,7 @@ public class NettyMessagingManager implements MessagingService { ...@@ -473,7 +474,7 @@ public class NettyMessagingManager implements MessagingService {
473 protected void initChannel(SocketChannel channel) throws Exception { 474 protected void initChannel(SocketChannel channel) throws Exception {
474 channel.pipeline() 475 channel.pipeline()
475 .addLast("encoder", encoder) 476 .addLast("encoder", encoder)
476 - .addLast("decoder", new MessageDecoder(preamble)) 477 + .addLast("decoder", new MessageDecoder())
477 .addLast("handler", dispatcher); 478 .addLast("handler", dispatcher);
478 } 479 }
479 } 480 }
...@@ -497,6 +498,10 @@ public class NettyMessagingManager implements MessagingService { ...@@ -497,6 +498,10 @@ public class NettyMessagingManager implements MessagingService {
497 } 498 }
498 } 499 }
499 private void dispatchLocally(InternalMessage message) throws IOException { 500 private void dispatchLocally(InternalMessage message) throws IOException {
501 + if (message.preamble() != preamble) {
502 + log.debug("Received {} with invalid preamble from {}", message.type(), message.sender());
503 + sendReply(message, Status.PROTOCOL_EXCEPTION, Optional.empty());
504 + }
500 String type = message.type(); 505 String type = message.type();
501 if (REPLY_MESSAGE_TYPE.equals(type)) { 506 if (REPLY_MESSAGE_TYPE.equals(type)) {
502 try { 507 try {
...@@ -509,6 +514,8 @@ public class NettyMessagingManager implements MessagingService { ...@@ -509,6 +514,8 @@ public class NettyMessagingManager implements MessagingService {
509 callback.completeExceptionally(new MessagingException.NoRemoteHandler()); 514 callback.completeExceptionally(new MessagingException.NoRemoteHandler());
510 } else if (message.status() == Status.ERROR_HANDLER_EXCEPTION) { 515 } else if (message.status() == Status.ERROR_HANDLER_EXCEPTION) {
511 callback.completeExceptionally(new MessagingException.RemoteHandlerFailure()); 516 callback.completeExceptionally(new MessagingException.RemoteHandlerFailure());
517 + } else if (message.status() == Status.PROTOCOL_EXCEPTION) {
518 + callback.completeExceptionally(new MessagingException.ProcotolException());
512 } 519 }
513 } else { 520 } else {
514 log.debug("Received a reply for message id:[{}]. " 521 log.debug("Received a reply for message id:[{}]. "
...@@ -530,7 +537,8 @@ public class NettyMessagingManager implements MessagingService { ...@@ -530,7 +537,8 @@ public class NettyMessagingManager implements MessagingService {
530 } 537 }
531 538
532 private void sendReply(InternalMessage message, Status status, Optional<byte[]> responsePayload) { 539 private void sendReply(InternalMessage message, Status status, Optional<byte[]> responsePayload) {
533 - InternalMessage response = new InternalMessage(message.id(), 540 + InternalMessage response = new InternalMessage(preamble,
541 + message.id(),
534 localEp, 542 localEp,
535 REPLY_MESSAGE_TYPE, 543 REPLY_MESSAGE_TYPE,
536 responsePayload.orElse(new byte[0]), 544 responsePayload.orElse(new byte[0]),
......