Madan Jampani

Introduced a serializer interface and minimal kyro based implementation as a pre…

…cursor to moving netty messaging out to onos-utils
package org.onlab.onos.store.messaging.impl;
import org.onlab.util.KryoPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashMap;
/**
* Kryo Serializer.
*/
public class KryoSerializer implements Serializer {
private final Logger log = LoggerFactory.getLogger(getClass());
private KryoPool serializerPool;
public KryoSerializer() {
setupKryoPool();
}
/**
* Sets up the common serialzers pool.
*/
protected void setupKryoPool() {
// FIXME Slice out types used in common to separate pool/namespace.
serializerPool = KryoPool.newBuilder()
.register(ArrayList.class,
HashMap.class,
ArrayList.class
)
.build()
.populate(1);
}
@Override
public Object decode(byte[] data) {
return serializerPool.deserialize(data);
}
@Override
public byte[] encode(Object payload) {
return serializerPool.serialize(payload);
}
}
......@@ -5,7 +5,6 @@ import java.util.List;
import static com.google.common.base.Preconditions.checkState;
import org.onlab.onos.store.cluster.messaging.SerializationService;
import org.onlab.onos.store.messaging.Endpoint;
import io.netty.buffer.ByteBuf;
......@@ -18,11 +17,11 @@ import io.netty.handler.codec.ByteToMessageDecoder;
public class MessageDecoder extends ByteToMessageDecoder {
private final NettyMessagingService messagingService;
private final SerializationService serializationService;
private final Serializer serializer;
public MessageDecoder(NettyMessagingService messagingService, SerializationService serializationService) {
public MessageDecoder(NettyMessagingService messagingService, Serializer serializer) {
this.messagingService = messagingService;
this.serializationService = serializationService;
this.serializer = serializer;
}
@Override
......@@ -47,7 +46,7 @@ public class MessageDecoder extends ByteToMessageDecoder {
Endpoint sender = new Endpoint(host, port);
// read message payload; first read size and then bytes.
Object payload = serializationService.decode(in.readBytes(in.readInt()).array());
Object payload = serializer.decode(in.readBytes(in.readInt()).array());
InternalMessage message = new InternalMessage.Builder(messagingService)
.withId(id)
......
package org.onlab.onos.store.messaging.impl;
import org.onlab.onos.store.cluster.messaging.SerializationService;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
......@@ -14,10 +12,10 @@ public class MessageEncoder extends MessageToByteEncoder<InternalMessage> {
// onosiscool in ascii
public static final byte[] PREAMBLE = "onosiscool".getBytes();
private final SerializationService serializationService;
private final Serializer serializer;
public MessageEncoder(SerializationService serializationService) {
this.serializationService = serializationService;
public MessageEncoder(Serializer serializer) {
this.serializer = serializer;
}
@Override
......@@ -46,12 +44,12 @@ public class MessageEncoder extends MessageToByteEncoder<InternalMessage> {
out.writeInt(message.sender().port());
try {
serializationService.encode(message.payload());
serializer.encode(message.payload());
} catch (Exception e) {
e.printStackTrace();
}
byte[] payload = serializationService.encode(message.payload());
byte[] payload = serializer.encode(message.payload());
// write payload length.
out.writeInt(payload.length);
......
......@@ -31,7 +31,6 @@ import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.store.cluster.messaging.SerializationService;
import org.onlab.onos.store.messaging.Endpoint;
import org.onlab.onos.store.messaging.MessageHandler;
import org.onlab.onos.store.messaging.MessagingService;
......@@ -61,7 +60,7 @@ public class NettyMessagingService implements MessagingService {
private final Endpoint localEp;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected SerializationService serializationService;
protected Serializer serializer;
public NettyMessagingService() {
// TODO: Default port should be configurable.
......@@ -213,8 +212,8 @@ public class NettyMessagingService implements MessagingService {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline()
.addLast(new MessageEncoder(serializationService))
.addLast(new MessageDecoder(NettyMessagingService.this, serializationService))
.addLast(new MessageEncoder(serializer))
.addLast(new MessageDecoder(NettyMessagingService.this, serializer))
.addLast(new NettyMessagingService.InboundMessageDispatcher());
}
}
......
package org.onlab.onos.store.messaging.impl;
/**
* Interface for encoding/decoding message payloads.
*/
public interface Serializer {
/**
* Decodes the specified byte array to a POJO.
*
* @param data byte array.
* @return POJO
*/
Object decode(byte[] data);
/**
* Encodes the specified POJO into a byte array.
*
* @param data POJO to be encoded
* @return byte array.
*/
byte[] encode(Object message);
}
......@@ -2,7 +2,6 @@ package org.onlab.onos.store.messaging.impl;
import java.util.concurrent.TimeUnit;
import org.onlab.onos.store.cluster.impl.MessageSerializer;
import org.onlab.onos.store.messaging.Endpoint;
import org.onlab.onos.store.messaging.Response;
......@@ -21,9 +20,8 @@ public final class SimpleClient {
public static class TestNettyMessagingService extends NettyMessagingService {
public TestNettyMessagingService(int port) throws Exception {
super(port);
MessageSerializer mgr = new MessageSerializer();
mgr.activate();
this.serializationService = mgr;
Serializer serializer = new KryoSerializer();
this.serializer = serializer;
}
}
}
......
package org.onlab.onos.store.messaging.impl;
import org.onlab.onos.store.cluster.impl.MessageSerializer;
public final class SimpleServer {
private SimpleServer() {}
......@@ -14,9 +12,8 @@ public final class SimpleServer {
public static class TestNettyMessagingService extends NettyMessagingService {
protected TestNettyMessagingService() {
MessageSerializer mgr = new MessageSerializer();
mgr.activate();
this.serializationService = mgr;
Serializer serializer = new KryoSerializer();
this.serializer = serializer;
}
}
}
......