Aaron Kruglikov
Committed by Gerrit Code Review

Adding fingerprints to avoid interference between clusters.

Change-Id: I5e5278916f8b9b900d7d403b6d08f1f66a866fb2
...@@ -48,7 +48,8 @@ public class NettyMessagingManager extends NettyMessaging { ...@@ -48,7 +48,8 @@ public class NettyMessagingManager extends NettyMessaging {
48 public void activate() throws Exception { 48 public void activate() throws Exception {
49 ControllerNode localNode = clusterMetadataService.getLocalNode(); 49 ControllerNode localNode = clusterMetadataService.getLocalNode();
50 getTlsParameters(); 50 getTlsParameters();
51 - super.start(new Endpoint(localNode.ip(), localNode.tcpPort())); 51 + super.start(clusterMetadataService.getClusterMetadata().getName().hashCode(),
52 + new Endpoint(localNode.ip(), localNode.tcpPort()));
52 log.info("Started"); 53 log.info("Started");
53 } 54 }
54 55
......
1 #!/usr/bin/env python 1 #!/usr/bin/env python
2 -''' 2 +"""
3 Generate the partitions json file from the $OC* environment variables 3 Generate the partitions json file from the $OC* environment variables
4 4
5 Usage: onos-gen-partitions [output file] 5 Usage: onos-gen-partitions [output file]
6 If output file is not provided, the json is written to stdout. 6 If output file is not provided, the json is written to stdout.
7 -''' 7 +"""
8 8
9 from os import environ 9 from os import environ
10 from collections import deque, OrderedDict 10 from collections import deque, OrderedDict
11 import re 11 import re
12 import json 12 import json
13 import sys 13 import sys
14 +import random
15 +import string
14 16
15 convert = lambda text: int(text) if text.isdigit() else text.lower() 17 convert = lambda text: int(text) if text.isdigit() else text.lower()
16 alphanum_key = lambda key: [convert(c) for c in re.split('([0-9]+)', key)] 18 alphanum_key = lambda key: [convert(c) for c in re.split('([0-9]+)', key)]
...@@ -42,8 +44,11 @@ if __name__ == '__main__': ...@@ -42,8 +44,11 @@ if __name__ == '__main__':
42 vars = get_OC_vars() 44 vars = get_OC_vars()
43 nodes = get_nodes(vars) 45 nodes = get_nodes(vars)
44 partitions = generate_permutations([v.get('id') for v in nodes], 3) 46 partitions = generate_permutations([v.get('id') for v in nodes], 3)
47 + name = 0
48 + for node in nodes:
49 + name = name ^ node['ip']
45 data = { 50 data = {
46 - 'name': 'default', 51 + 'name': name,
47 'nodes': nodes, 52 'nodes': nodes,
48 'partitions': partitions 53 'partitions': partitions
49 } 54 }
......
...@@ -19,6 +19,7 @@ package org.onlab.netty; ...@@ -19,6 +19,7 @@ package org.onlab.netty;
19 * State transitions a decoder goes through as it is decoding an incoming message. 19 * State transitions a decoder goes through as it is decoding an incoming message.
20 */ 20 */
21 public enum DecoderState { 21 public enum DecoderState {
22 + READ_MESSAGE_PREAMBLE,
22 READ_MESSAGE_ID, 23 READ_MESSAGE_ID,
23 READ_SENDER_IP_VERSION, 24 READ_SENDER_IP_VERSION,
24 READ_SENDER_IP, 25 READ_SENDER_IP,
......
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
16 package org.onlab.netty; 16 package org.onlab.netty;
17 17
18 import static com.google.common.base.Preconditions.checkState; 18 import static com.google.common.base.Preconditions.checkState;
19 +
19 import io.netty.buffer.ByteBuf; 20 import io.netty.buffer.ByteBuf;
20 import io.netty.channel.ChannelHandlerContext; 21 import io.netty.channel.ChannelHandlerContext;
21 import io.netty.handler.codec.ReplayingDecoder; 22 import io.netty.handler.codec.ReplayingDecoder;
...@@ -37,7 +38,9 @@ public class MessageDecoder extends ReplayingDecoder<DecoderState> { ...@@ -37,7 +38,9 @@ public class MessageDecoder extends ReplayingDecoder<DecoderState> {
37 38
38 private final Logger log = LoggerFactory.getLogger(getClass()); 39 private final Logger log = LoggerFactory.getLogger(getClass());
39 40
41 + private final int correctPreamble;
40 private long messageId; 42 private long messageId;
43 + private int preamble;
41 private Version ipVersion; 44 private Version ipVersion;
42 private IpAddress senderIp; 45 private IpAddress senderIp;
43 private int senderPort; 46 private int senderPort;
...@@ -45,8 +48,9 @@ public class MessageDecoder extends ReplayingDecoder<DecoderState> { ...@@ -45,8 +48,9 @@ public class MessageDecoder extends ReplayingDecoder<DecoderState> {
45 private String messageType; 48 private String messageType;
46 private int contentLength; 49 private int contentLength;
47 50
48 - public MessageDecoder() { 51 + public MessageDecoder(int correctPreamble) {
49 - super(DecoderState.READ_MESSAGE_ID); 52 + super(DecoderState.READ_MESSAGE_PREAMBLE);
53 + this.correctPreamble = correctPreamble;
50 } 54 }
51 55
52 @Override 56 @Override
...@@ -56,6 +60,12 @@ public class MessageDecoder extends ReplayingDecoder<DecoderState> { ...@@ -56,6 +60,12 @@ public class MessageDecoder extends ReplayingDecoder<DecoderState> {
56 List<Object> out) throws Exception { 60 List<Object> out) throws Exception {
57 61
58 switch (state()) { 62 switch (state()) {
63 + case READ_MESSAGE_PREAMBLE:
64 + preamble = buffer.readInt();
65 + if (preamble != correctPreamble) {
66 + throw new IllegalStateException("This message had an incorrect preamble.");
67 + }
68 + checkpoint(DecoderState.READ_MESSAGE_ID);
59 case READ_MESSAGE_ID: 69 case READ_MESSAGE_ID:
60 messageId = buffer.readLong(); 70 messageId = buffer.readLong();
61 checkpoint(DecoderState.READ_SENDER_IP_VERSION); 71 checkpoint(DecoderState.READ_SENDER_IP_VERSION);
...@@ -63,9 +73,9 @@ public class MessageDecoder extends ReplayingDecoder<DecoderState> { ...@@ -63,9 +73,9 @@ public class MessageDecoder extends ReplayingDecoder<DecoderState> {
63 ipVersion = buffer.readByte() == 0x0 ? Version.INET : Version.INET6; 73 ipVersion = buffer.readByte() == 0x0 ? Version.INET : Version.INET6;
64 checkpoint(DecoderState.READ_SENDER_IP); 74 checkpoint(DecoderState.READ_SENDER_IP);
65 case READ_SENDER_IP: 75 case READ_SENDER_IP:
66 - byte[] octects = new byte[IpAddress.byteLength(ipVersion)]; 76 + byte[] octets = new byte[IpAddress.byteLength(ipVersion)];
67 - buffer.readBytes(octects); 77 + buffer.readBytes(octets);
68 - senderIp = IpAddress.valueOf(ipVersion, octects); 78 + senderIp = IpAddress.valueOf(ipVersion, octets);
69 checkpoint(DecoderState.READ_SENDER_PORT); 79 checkpoint(DecoderState.READ_SENDER_PORT);
70 case READ_SENDER_PORT: 80 case READ_SENDER_PORT:
71 senderPort = buffer.readInt(); 81 senderPort = buffer.readInt();
...@@ -82,15 +92,15 @@ public class MessageDecoder extends ReplayingDecoder<DecoderState> { ...@@ -82,15 +92,15 @@ public class MessageDecoder extends ReplayingDecoder<DecoderState> {
82 contentLength = buffer.readInt(); 92 contentLength = buffer.readInt();
83 checkpoint(DecoderState.READ_CONTENT); 93 checkpoint(DecoderState.READ_CONTENT);
84 case READ_CONTENT: 94 case READ_CONTENT:
95 + //TODO Perform a sanity check on the size before allocating
85 byte[] payload = new byte[contentLength]; 96 byte[] payload = new byte[contentLength];
86 buffer.readBytes(payload); 97 buffer.readBytes(payload);
87 - InternalMessage message = new InternalMessage( 98 + InternalMessage message = new InternalMessage(messageId,
88 - messageId,
89 new Endpoint(senderIp, senderPort), 99 new Endpoint(senderIp, senderPort),
90 messageType, 100 messageType,
91 payload); 101 payload);
92 out.add(message); 102 out.add(message);
93 - checkpoint(DecoderState.READ_MESSAGE_ID); 103 + checkpoint(DecoderState.READ_MESSAGE_PREAMBLE);
94 break; 104 break;
95 default: 105 default:
96 checkState(false, "Must not be here"); 106 checkState(false, "Must not be here");
......
...@@ -36,6 +36,13 @@ import com.google.common.base.Charsets; ...@@ -36,6 +36,13 @@ import com.google.common.base.Charsets;
36 @Sharable 36 @Sharable
37 public class MessageEncoder extends MessageToByteEncoder<InternalMessage> { 37 public class MessageEncoder extends MessageToByteEncoder<InternalMessage> {
38 38
39 + private final int preamble;
40 +
41 + public MessageEncoder(int preamble) {
42 + super();
43 + this.preamble = preamble;
44 + }
45 +
39 private final Logger log = LoggerFactory.getLogger(getClass()); 46 private final Logger log = LoggerFactory.getLogger(getClass());
40 47
41 @Override 48 @Override
...@@ -44,6 +51,8 @@ public class MessageEncoder extends MessageToByteEncoder<InternalMessage> { ...@@ -44,6 +51,8 @@ public class MessageEncoder extends MessageToByteEncoder<InternalMessage> {
44 InternalMessage message, 51 InternalMessage message,
45 ByteBuf out) throws Exception { 52 ByteBuf out) throws Exception {
46 53
54 + out.writeInt(this.preamble);
55 +
47 // write message id 56 // write message id
48 out.writeLong(message.id()); 57 out.writeLong(message.id());
49 58
......
...@@ -74,6 +74,7 @@ public class NettyMessaging implements MessagingService { ...@@ -74,6 +74,7 @@ public class NettyMessaging implements MessagingService {
74 private static final String REPLY_MESSAGE_TYPE = "NETTY_MESSAGING_REQUEST_REPLY"; 74 private static final String REPLY_MESSAGE_TYPE = "NETTY_MESSAGING_REQUEST_REPLY";
75 75
76 private Endpoint localEp; 76 private Endpoint localEp;
77 + private int preamble;
77 private final AtomicBoolean started = new AtomicBoolean(false); 78 private final AtomicBoolean started = new AtomicBoolean(false);
78 private final Map<String, Consumer<InternalMessage>> handlers = new ConcurrentHashMap<>(); 79 private final Map<String, Consumer<InternalMessage>> handlers = new ConcurrentHashMap<>();
79 private final AtomicLong messageIdGenerator = new AtomicLong(0); 80 private final AtomicLong messageIdGenerator = new AtomicLong(0);
...@@ -123,11 +124,12 @@ public class NettyMessaging implements MessagingService { ...@@ -123,11 +124,12 @@ public class NettyMessaging implements MessagingService {
123 clientChannelClass = NioSocketChannel.class; 124 clientChannelClass = NioSocketChannel.class;
124 } 125 }
125 126
126 - public void start(Endpoint localEp) throws Exception { 127 + public void start(int preamble, Endpoint localEp) throws Exception {
127 if (started.get()) { 128 if (started.get()) {
128 log.warn("Already running at local endpoint: {}", localEp); 129 log.warn("Already running at local endpoint: {}", localEp);
129 return; 130 return;
130 } 131 }
132 + this.preamble = preamble;
131 this.localEp = localEp; 133 this.localEp = localEp;
132 channels.setLifo(true); 134 channels.setLifo(true);
133 channels.setTestOnBorrow(true); 135 channels.setTestOnBorrow(true);
...@@ -324,7 +326,7 @@ public class NettyMessaging implements MessagingService { ...@@ -324,7 +326,7 @@ public class NettyMessaging implements MessagingService {
324 private class SslServerCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> { 326 private class SslServerCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
325 327
326 private final ChannelHandler dispatcher = new InboundMessageDispatcher(); 328 private final ChannelHandler dispatcher = new InboundMessageDispatcher();
327 - private final ChannelHandler encoder = new MessageEncoder(); 329 + private final ChannelHandler encoder = new MessageEncoder(preamble);
328 330
329 @Override 331 @Override
330 protected void initChannel(SocketChannel channel) throws Exception { 332 protected void initChannel(SocketChannel channel) throws Exception {
...@@ -351,7 +353,7 @@ public class NettyMessaging implements MessagingService { ...@@ -351,7 +353,7 @@ public class NettyMessaging implements MessagingService {
351 353
352 channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(serverSslEngine)) 354 channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(serverSslEngine))
353 .addLast("encoder", encoder) 355 .addLast("encoder", encoder)
354 - .addLast("decoder", new MessageDecoder()) 356 + .addLast("decoder", new MessageDecoder(preamble))
355 .addLast("handler", dispatcher); 357 .addLast("handler", dispatcher);
356 } 358 }
357 359
...@@ -360,7 +362,7 @@ public class NettyMessaging implements MessagingService { ...@@ -360,7 +362,7 @@ public class NettyMessaging implements MessagingService {
360 private class SslClientCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> { 362 private class SslClientCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
361 363
362 private final ChannelHandler dispatcher = new InboundMessageDispatcher(); 364 private final ChannelHandler dispatcher = new InboundMessageDispatcher();
363 - private final ChannelHandler encoder = new MessageEncoder(); 365 + private final ChannelHandler encoder = new MessageEncoder(preamble);
364 366
365 @Override 367 @Override
366 protected void initChannel(SocketChannel channel) throws Exception { 368 protected void initChannel(SocketChannel channel) throws Exception {
...@@ -386,7 +388,7 @@ public class NettyMessaging implements MessagingService { ...@@ -386,7 +388,7 @@ public class NettyMessaging implements MessagingService {
386 388
387 channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(clientSslEngine)) 389 channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(clientSslEngine))
388 .addLast("encoder", encoder) 390 .addLast("encoder", encoder)
389 - .addLast("decoder", new MessageDecoder()) 391 + .addLast("decoder", new MessageDecoder(preamble))
390 .addLast("handler", dispatcher); 392 .addLast("handler", dispatcher);
391 } 393 }
392 394
...@@ -395,13 +397,13 @@ public class NettyMessaging implements MessagingService { ...@@ -395,13 +397,13 @@ public class NettyMessaging implements MessagingService {
395 private class OnosCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> { 397 private class OnosCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
396 398
397 private final ChannelHandler dispatcher = new InboundMessageDispatcher(); 399 private final ChannelHandler dispatcher = new InboundMessageDispatcher();
398 - private final ChannelHandler encoder = new MessageEncoder(); 400 + private final ChannelHandler encoder = new MessageEncoder(preamble);
399 401
400 @Override 402 @Override
401 protected void initChannel(SocketChannel channel) throws Exception { 403 protected void initChannel(SocketChannel channel) throws Exception {
402 channel.pipeline() 404 channel.pipeline()
403 .addLast("encoder", encoder) 405 .addLast("encoder", encoder)
404 - .addLast("decoder", new MessageDecoder()) 406 + .addLast("decoder", new MessageDecoder(preamble))
405 .addLast("handler", dispatcher); 407 .addLast("handler", dispatcher);
406 } 408 }
407 } 409 }
...@@ -424,7 +426,6 @@ public class NettyMessaging implements MessagingService { ...@@ -424,7 +426,6 @@ public class NettyMessaging implements MessagingService {
424 context.close(); 426 context.close();
425 } 427 }
426 } 428 }
427 -
428 private void dispatchLocally(InternalMessage message) throws IOException { 429 private void dispatchLocally(InternalMessage message) throws IOException {
429 String type = message.type(); 430 String type = message.type();
430 if (REPLY_MESSAGE_TYPE.equals(type)) { 431 if (REPLY_MESSAGE_TYPE.equals(type)) {
......