Madan Jampani
Committed by Gerrit Code Review

Changed netty message type to String from Long to avoid potential collisions

Change-Id: I42014a920917a8022744ae15a9fefa6bae6890a7
......@@ -23,6 +23,7 @@ public enum DecoderState {
READ_SENDER_IP_VERSION,
READ_SENDER_IP,
READ_SENDER_PORT,
READ_MESSAGE_TYPE_LENGTH,
READ_MESSAGE_TYPE,
READ_CONTENT_LENGTH,
READ_CONTENT
......
......@@ -27,19 +27,18 @@ import com.google.common.base.MoreObjects;
*/
public final class InternalMessage implements Message {
public static final long REPLY_MESSAGE_TYPE =
NettyMessagingService.hashToLong("NETTY_MESSAGING_REQUEST_REPLY");
public static final String REPLY_MESSAGE_TYPE = "NETTY_MESSAGING_REQUEST_REPLY";
private long id;
private Endpoint sender;
private long type;
private String type;
private byte[] payload;
private transient NettyMessagingService messagingService;
// Must be created using the Builder.
private InternalMessage() {}
InternalMessage(long id, Endpoint sender, long type, byte[] payload) {
InternalMessage(long id, Endpoint sender, String type, byte[] payload) {
this.id = id;
this.sender = sender;
this.type = type;
......@@ -50,7 +49,7 @@ public final class InternalMessage implements Message {
return id;
}
public long type() {
public String type() {
return type;
}
......@@ -104,7 +103,7 @@ public final class InternalMessage implements Message {
return this;
}
public Builder withType(long type) {
public Builder withType(String type) {
message.type = type;
return this;
}
......
......@@ -27,6 +27,8 @@ import org.onlab.packet.IpAddress.Version;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Charsets;
/**
* Decoder for inbound messages.
*/
......@@ -40,8 +42,9 @@ public class MessageDecoder extends ReplayingDecoder<DecoderState> {
private Version ipVersion;
private IpAddress senderIp;
private int senderPort;
private int messageTypeLength;
private String messageType;
private int contentLength;
private long messageType;
public MessageDecoder(NettyMessagingService messagingService) {
super(DecoderState.READ_MESSAGE_ID);
......@@ -68,9 +71,14 @@ public class MessageDecoder extends ReplayingDecoder<DecoderState> {
checkpoint(DecoderState.READ_SENDER_PORT);
case READ_SENDER_PORT:
senderPort = buffer.readInt();
checkpoint(DecoderState.READ_MESSAGE_TYPE_LENGTH);
case READ_MESSAGE_TYPE_LENGTH:
messageTypeLength = buffer.readInt();
checkpoint(DecoderState.READ_MESSAGE_TYPE);
case READ_MESSAGE_TYPE:
messageType = buffer.readLong();
byte[] messageTypeBytes = new byte[messageTypeLength];
buffer.readBytes(messageTypeBytes);
messageType = new String(messageTypeBytes, Charsets.UTF_8);
checkpoint(DecoderState.READ_CONTENT_LENGTH);
case READ_CONTENT_LENGTH:
contentLength = buffer.readInt();
......
......@@ -27,6 +27,8 @@ import org.onlab.packet.IpAddress.Version;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Charsets;
/**
* Encode InternalMessage out into a byte buffer.
*/
......@@ -57,8 +59,13 @@ public class MessageEncoder extends MessageToByteEncoder<InternalMessage> {
// write sender port
out.writeInt(sender.port());
// write message type.
out.writeLong(message.type());
byte[] messageTypeBytes = message.type().getBytes(Charsets.UTF_8);
// write length of message type
out.writeInt(messageTypeBytes.length);
// write message type bytes
out.writeBytes(messageTypeBytes);
byte[] payload = message.payload();
......
......@@ -52,14 +52,10 @@ import org.onlab.packet.IpAddress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Charsets;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.google.common.hash.Hashing;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
......@@ -71,7 +67,7 @@ public class NettyMessagingService implements MessagingService {
private final Logger log = LoggerFactory.getLogger(getClass());
private final Endpoint localEp;
private final ConcurrentMap<Long, MessageHandler> handlers = new ConcurrentHashMap<>();
private final ConcurrentMap<String, MessageHandler> handlers = new ConcurrentHashMap<>();
private final AtomicLong messageIdGenerator = new AtomicLong(0);
private final Cache<Long, SettableFuture<byte[]>> responseFutures = CacheBuilder.newBuilder()
.maximumSize(100000)
......@@ -86,14 +82,6 @@ public class NettyMessagingService implements MessagingService {
})
.build();
private final LoadingCache<String, Long> messageTypeLookupCache = CacheBuilder.newBuilder()
.build(new CacheLoader<String, Long>() {
@Override
public Long load(String type) {
return hashToLong(type);
}
});
private final GenericKeyedObjectPool<Endpoint, Channel> channels
= new GenericKeyedObjectPool<Endpoint, Channel>(new OnosCommunicationChannelFactory());
......@@ -162,7 +150,7 @@ public class NettyMessagingService implements MessagingService {
InternalMessage message = new InternalMessage.Builder(this)
.withId(messageIdGenerator.incrementAndGet())
.withSender(localEp)
.withType(messageTypeLookupCache.getUnchecked(type))
.withType(type)
.withPayload(payload)
.build();
sendAsync(ep, message);
......@@ -198,7 +186,7 @@ public class NettyMessagingService implements MessagingService {
InternalMessage message = new InternalMessage.Builder(this)
.withId(messageId)
.withSender(localEp)
.withType(messageTypeLookupCache.getUnchecked(type))
.withType(type)
.withPayload(payload)
.build();
try {
......@@ -212,12 +200,12 @@ public class NettyMessagingService implements MessagingService {
@Override
public void registerHandler(String type, MessageHandler handler) {
handlers.putIfAbsent(hashToLong(type), handler);
handlers.putIfAbsent(type, handler);
}
@Override
public void registerHandler(String type, MessageHandler handler, ExecutorService executor) {
handlers.putIfAbsent(hashToLong(type), new MessageHandler() {
handlers.putIfAbsent(type, new MessageHandler() {
@Override
public void handle(Message message) throws IOException {
executor.submit(() -> {
......@@ -233,10 +221,10 @@ public class NettyMessagingService implements MessagingService {
@Override
public void unregisterHandler(String type) {
handlers.remove(hashToLong(type));
handlers.remove(type);
}
private MessageHandler getMessageHandler(long type) {
private MessageHandler getMessageHandler(String type) {
return handlers.get(type);
}
......@@ -342,8 +330,8 @@ public class NettyMessagingService implements MessagingService {
}
private void dispatchLocally(InternalMessage message) throws IOException {
long type = message.type();
if (type == InternalMessage.REPLY_MESSAGE_TYPE) {
String type = message.type();
if (InternalMessage.REPLY_MESSAGE_TYPE.equals(type)) {
try {
SettableFuture<byte[]> futureResponse =
NettyMessagingService.this.responseFutures.getIfPresent(message.id());
......@@ -366,13 +354,4 @@ public class NettyMessagingService implements MessagingService {
log.debug("No handler registered for {}", type);
}
}
/**
* Returns the md5 hash of the specified input string as a long.
* @param input input string.
* @return md5 hash as long.
*/
public static long hashToLong(String input) {
return Hashing.md5().hashBytes(input.getBytes(Charsets.UTF_8)).asLong();
}
}
\ No newline at end of file
......