Yuta HIGUCHI

renamed Serializer -> (onlab.netty-layer) PayloadSerializer

- Added TODO memos to ClusterCommunicationService layer

Change-Id: I4c81a72d03cddd23637f9c6cbf102125ea448c01
......@@ -12,6 +12,7 @@ public class ClusterMessage {
private final NodeId sender;
private final MessageSubject subject;
private final Object payload;
// TODO: add field specifying Serializer for payload
/**
* Creates a cluster message.
......
......@@ -12,8 +12,6 @@ import java.util.TimerTask;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.NodeId;
......@@ -46,16 +44,23 @@ public class ClusterCommunicationManager
private final Timer timer = new Timer("onos-controller-heatbeats");
public static final long HEART_BEAT_INTERVAL_MILLIS = 1000L;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
// TODO: This probably should not be a OSGi service.
//@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private MessagingService messagingService;
@Activate
public void activate() {
// TODO: initialize messagingService
// TODO: setPayloadSerializer, which is capable of
// (1) serialize ClusterMessage - ClusterMessage.payload
// (2) serialize ClusterMessage.payload using user specified serializer
// messagingService.setPayloadSerializer(...);
log.info("Started");
}
@Deactivate
public void deactivate() {
// TODO: cleanup messageingService if needed.
log.info("Stopped");
}
......
package org.onlab.onos.store.serializers;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
public final class ClusterMessageSerializer extends Serializer<ClusterMessage> {
public ClusterMessageSerializer() {
// does not accept null
super(false);
}
@Override
public void write(Kryo kryo, Output output, ClusterMessage object) {
kryo.writeClassAndObject(output, object.sender());
kryo.writeClassAndObject(output, object.subject());
// TODO: write bytes serialized using ClusterMessage specified serializer
// write serialized payload size
//output.writeInt(...);
// write serialized payload
//output.writeBytes(...);
}
@Override
public ClusterMessage read(Kryo kryo, Input input,
Class<ClusterMessage> type) {
// TODO Auto-generated method stub
NodeId sender = (NodeId) kryo.readClassAndObject(input);
MessageSubject subject = (MessageSubject) kryo.readClassAndObject(input);
int size = input.readInt();
byte[] payloadBytes = input.readBytes(size);
// TODO: deserialize payload using ClusterMessage specified serializer
Object payload = null;
return new ClusterMessage(sender, subject, payload);
}
}
......@@ -9,7 +9,7 @@ import java.util.HashMap;
/**
* Kryo Serializer.
*/
public class KryoSerializer implements Serializer {
public class KryoSerializer implements PayloadSerializer {
private KryoPool serializerPool;
......
......@@ -14,14 +14,14 @@ import java.util.List;
public class MessageDecoder extends ReplayingDecoder<DecoderState> {
private final NettyMessagingService messagingService;
private final Serializer serializer;
private final PayloadSerializer payloadSerializer;
private int contentLength;
public MessageDecoder(NettyMessagingService messagingService, Serializer serializer) {
public MessageDecoder(NettyMessagingService messagingService, PayloadSerializer payloadSerializer) {
super(DecoderState.READ_HEADER_VERSION);
this.messagingService = messagingService;
this.serializer = serializer;
this.payloadSerializer = payloadSerializer;
}
@Override
......@@ -48,7 +48,7 @@ public class MessageDecoder extends ReplayingDecoder<DecoderState> {
checkState(serializerVersion == MessageEncoder.SERIALIZER_VERSION, "Unexpected serializer version");
checkpoint(DecoderState.READ_CONTENT);
case READ_CONTENT:
InternalMessage message = serializer.decode(buffer.readBytes(contentLength).nioBuffer());
InternalMessage message = payloadSerializer.decode(buffer.readBytes(contentLength).nioBuffer());
message.setMessagingService(messagingService);
out.add(message);
checkpoint(DecoderState.READ_HEADER_VERSION);
......
......@@ -17,10 +17,10 @@ public class MessageEncoder extends MessageToByteEncoder<InternalMessage> {
public static final int SERIALIZER_VERSION = 1;
private final Serializer serializer;
private final PayloadSerializer payloadSerializer;
public MessageEncoder(Serializer serializer) {
this.serializer = serializer;
public MessageEncoder(PayloadSerializer payloadSerializer) {
this.payloadSerializer = payloadSerializer;
}
@Override
......@@ -35,12 +35,12 @@ public class MessageEncoder extends MessageToByteEncoder<InternalMessage> {
// write preamble
out.writeBytes(PREAMBLE);
byte[] payload = serializer.encode(message);
byte[] payload = payloadSerializer.encode(message);
// write payload length
out.writeInt(payload.length);
// write serializer version
// write payloadSerializer version
out.writeInt(SERIALIZER_VERSION);
// write payload.
......
......@@ -38,4 +38,11 @@ public interface MessagingService {
* @param type message type
*/
public void unregisterHandler(String type);
/**
* Specify the serializer to use for encoding/decoding payload.
*
* @param payloadSerializer payloadSerializer to use
*/
public void setPayloadSerializer(PayloadSerializer payloadSerializer);
}
......
......@@ -52,7 +52,7 @@ public class NettyMessagingService implements MessagingService {
private final GenericKeyedObjectPool<Endpoint, Channel> channels
= new GenericKeyedObjectPool<Endpoint, Channel>(new OnosCommunicationChannelFactory());
protected Serializer serializer;
protected PayloadSerializer payloadSerializer;
public NettyMessagingService() {
// TODO: Default port should be configurable.
......@@ -133,8 +133,9 @@ public class NettyMessagingService implements MessagingService {
handlers.remove(type);
}
public void setSerializer(Serializer serializer) {
this.serializer = serializer;
@Override
public void setPayloadSerializer(PayloadSerializer payloadSerializer) {
this.payloadSerializer = payloadSerializer;
}
private MessageHandler getMessageHandler(String type) {
......@@ -201,13 +202,13 @@ public class NettyMessagingService implements MessagingService {
private class OnosCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
private final ChannelHandler dispatcher = new InboundMessageDispatcher();
private final ChannelHandler encoder = new MessageEncoder(serializer);
private final ChannelHandler encoder = new MessageEncoder(payloadSerializer);
@Override
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline()
.addLast("encoder", encoder)
.addLast("decoder", new MessageDecoder(NettyMessagingService.this, serializer))
.addLast("decoder", new MessageDecoder(NettyMessagingService.this, payloadSerializer))
.addLast("handler", dispatcher);
}
}
......
......@@ -5,7 +5,7 @@ import java.nio.ByteBuffer;
/**
* Interface for encoding/decoding message payloads.
*/
public interface Serializer {
public interface PayloadSerializer {
/**
* Decodes the specified byte array to a POJO.
......
......@@ -44,8 +44,8 @@ public final class SimpleClient {
public static class TestNettyMessagingService extends NettyMessagingService {
public TestNettyMessagingService(int port) throws Exception {
super(port);
Serializer serializer = new KryoSerializer();
this.serializer = serializer;
PayloadSerializer payloadSerializer = new KryoSerializer();
this.payloadSerializer = payloadSerializer;
}
}
}
......
......@@ -6,7 +6,7 @@ public final class SimpleServer {
public static void main(String... args) throws Exception {
NettyMessagingService server = new NettyMessagingService(8080);
server.activate();
server.setSerializer(new KryoSerializer());
server.setPayloadSerializer(new KryoSerializer());
server.registerHandler("simple", new LoggingHandler());
server.registerHandler("echo", new EchoHandler());
}
......