Madan Jampani

Simplified how message payloads get serialized/deserialized

Showing 21 changed files with 183 additions and 151 deletions
...@@ -11,7 +11,7 @@ public class ClusterMessage { ...@@ -11,7 +11,7 @@ public class ClusterMessage {
11 11
12 private final NodeId sender; 12 private final NodeId sender;
13 private final MessageSubject subject; 13 private final MessageSubject subject;
14 - private final Object payload; 14 + private final byte[] payload;
15 // TODO: add field specifying Serializer for payload 15 // TODO: add field specifying Serializer for payload
16 16
17 /** 17 /**
...@@ -19,7 +19,7 @@ public class ClusterMessage { ...@@ -19,7 +19,7 @@ public class ClusterMessage {
19 * 19 *
20 * @param subject message subject 20 * @param subject message subject
21 */ 21 */
22 - public ClusterMessage(NodeId sender, MessageSubject subject, Object payload) { 22 + public ClusterMessage(NodeId sender, MessageSubject subject, byte[] payload) {
23 this.sender = sender; 23 this.sender = sender;
24 this.subject = subject; 24 this.subject = subject;
25 this.payload = payload; 25 this.payload = payload;
...@@ -48,7 +48,7 @@ public class ClusterMessage { ...@@ -48,7 +48,7 @@ public class ClusterMessage {
48 * 48 *
49 * @return message payload. 49 * @return message payload.
50 */ 50 */
51 - public Object payload() { 51 + public byte[] payload() {
52 return payload; 52 return payload;
53 } 53 }
54 } 54 }
......
...@@ -23,6 +23,9 @@ import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService; ...@@ -23,6 +23,9 @@ import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
23 import org.onlab.onos.store.cluster.messaging.ClusterMessage; 23 import org.onlab.onos.store.cluster.messaging.ClusterMessage;
24 import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler; 24 import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
25 import org.onlab.onos.store.cluster.messaging.MessageSubject; 25 import org.onlab.onos.store.cluster.messaging.MessageSubject;
26 +import org.onlab.onos.store.serializers.KryoPoolUtil;
27 +import org.onlab.onos.store.serializers.KryoSerializer;
28 +import org.onlab.util.KryoPool;
26 import org.onlab.netty.Endpoint; 29 import org.onlab.netty.Endpoint;
27 import org.onlab.netty.Message; 30 import org.onlab.netty.Message;
28 import org.onlab.netty.MessageHandler; 31 import org.onlab.netty.MessageHandler;
...@@ -48,6 +51,18 @@ public class ClusterCommunicationManager ...@@ -48,6 +51,18 @@ public class ClusterCommunicationManager
48 //@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) 51 //@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
49 private MessagingService messagingService; 52 private MessagingService messagingService;
50 53
54 + private static final KryoSerializer SERIALIZER = new KryoSerializer() {
55 + protected void setupKryoPool() {
56 + serializerPool = KryoPool.newBuilder()
57 + .register(KryoPoolUtil.API)
58 + .register(ClusterMessage.class)
59 + .register(ClusterMembershipEvent.class)
60 + .build()
61 + .populate(1);
62 + }
63 +
64 + };
65 +
51 @Activate 66 @Activate
52 public void activate() { 67 public void activate() {
53 // TODO: initialize messagingService 68 // TODO: initialize messagingService
...@@ -92,7 +107,7 @@ public class ClusterCommunicationManager ...@@ -92,7 +107,7 @@ public class ClusterCommunicationManager
92 checkArgument(node != null, "Unknown nodeId: %s", toNodeId); 107 checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
93 Endpoint nodeEp = new Endpoint(node.ip().toString(), node.tcpPort()); 108 Endpoint nodeEp = new Endpoint(node.ip().toString(), node.tcpPort());
94 try { 109 try {
95 - messagingService.sendAsync(nodeEp, message.subject().value(), message); 110 + messagingService.sendAsync(nodeEp, message.subject().value(), SERIALIZER.encode(message));
96 return true; 111 return true;
97 } catch (IOException e) { 112 } catch (IOException e) {
98 log.error("Failed to send cluster message to nodeId: " + toNodeId, e); 113 log.error("Failed to send cluster message to nodeId: " + toNodeId, e);
...@@ -126,7 +141,7 @@ public class ClusterCommunicationManager ...@@ -126,7 +141,7 @@ public class ClusterCommunicationManager
126 broadcast(new ClusterMessage( 141 broadcast(new ClusterMessage(
127 localNode.id(), 142 localNode.id(),
128 new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"), 143 new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"),
129 - new ClusterMembershipEvent(ClusterMembershipEventType.LEAVING_MEMBER, node))); 144 + SERIALIZER.encode(new ClusterMembershipEvent(ClusterMembershipEventType.LEAVING_MEMBER, node))));
130 members.remove(node.id()); 145 members.remove(node.id());
131 } 146 }
132 147
...@@ -138,7 +153,7 @@ public class ClusterCommunicationManager ...@@ -138,7 +153,7 @@ public class ClusterCommunicationManager
138 broadcast(new ClusterMessage( 153 broadcast(new ClusterMessage(
139 localNode.id(), 154 localNode.id(),
140 new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"), 155 new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"),
141 - new ClusterMembershipEvent(ClusterMembershipEventType.HEART_BEAT, localNode))); 156 + SERIALIZER.encode(new ClusterMembershipEvent(ClusterMembershipEventType.HEART_BEAT, localNode))));
142 } 157 }
143 } 158 }
144 159
...@@ -147,7 +162,7 @@ public class ClusterCommunicationManager ...@@ -147,7 +162,7 @@ public class ClusterCommunicationManager
147 @Override 162 @Override
148 public void handle(ClusterMessage message) { 163 public void handle(ClusterMessage message) {
149 164
150 - ClusterMembershipEvent event = (ClusterMembershipEvent) message.payload(); 165 + ClusterMembershipEvent event = SERIALIZER.decode(message.payload());
151 ControllerNode node = event.node(); 166 ControllerNode node = event.node();
152 if (event.type() == ClusterMembershipEventType.HEART_BEAT) { 167 if (event.type() == ClusterMembershipEventType.HEART_BEAT) {
153 log.info("Node {} sent a hearbeat", node.id()); 168 log.info("Node {} sent a hearbeat", node.id());
...@@ -172,7 +187,8 @@ public class ClusterCommunicationManager ...@@ -172,7 +187,8 @@ public class ClusterCommunicationManager
172 187
173 @Override 188 @Override
174 public void handle(Message message) { 189 public void handle(Message message) {
175 - handler.handle((ClusterMessage) message.payload()); 190 + ClusterMessage clusterMessage = SERIALIZER.decode(message.payload());
191 + handler.handle(clusterMessage);
176 } 192 }
177 } 193 }
178 } 194 }
......
...@@ -13,6 +13,7 @@ import org.apache.felix.scr.annotations.Deactivate; ...@@ -13,6 +13,7 @@ import org.apache.felix.scr.annotations.Deactivate;
13 import org.apache.felix.scr.annotations.Reference; 13 import org.apache.felix.scr.annotations.Reference;
14 import org.apache.felix.scr.annotations.ReferenceCardinality; 14 import org.apache.felix.scr.annotations.ReferenceCardinality;
15 import org.apache.felix.scr.annotations.Service; 15 import org.apache.felix.scr.annotations.Service;
16 +import org.onlab.onos.cluster.ClusterService;
16 import org.onlab.onos.net.AnnotationsUtil; 17 import org.onlab.onos.net.AnnotationsUtil;
17 import org.onlab.onos.net.DefaultAnnotations; 18 import org.onlab.onos.net.DefaultAnnotations;
18 import org.onlab.onos.net.DefaultDevice; 19 import org.onlab.onos.net.DefaultDevice;
...@@ -37,7 +38,11 @@ import org.onlab.onos.store.Timestamp; ...@@ -37,7 +38,11 @@ import org.onlab.onos.store.Timestamp;
37 import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService; 38 import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
38 import org.onlab.onos.store.cluster.messaging.ClusterMessage; 39 import org.onlab.onos.store.cluster.messaging.ClusterMessage;
39 import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler; 40 import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
41 +import org.onlab.onos.store.common.impl.MastershipBasedTimestamp;
40 import org.onlab.onos.store.common.impl.Timestamped; 42 import org.onlab.onos.store.common.impl.Timestamped;
43 +import org.onlab.onos.store.serializers.KryoPoolUtil;
44 +import org.onlab.onos.store.serializers.KryoSerializer;
45 +import org.onlab.util.KryoPool;
41 import org.onlab.util.NewConcurrentHashMap; 46 import org.onlab.util.NewConcurrentHashMap;
42 import org.slf4j.Logger; 47 import org.slf4j.Logger;
43 48
...@@ -104,6 +109,24 @@ public class GossipDeviceStore ...@@ -104,6 +109,24 @@ public class GossipDeviceStore
104 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) 109 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
105 protected ClusterCommunicationService clusterCommunicator; 110 protected ClusterCommunicationService clusterCommunicator;
106 111
112 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
113 + protected ClusterService clusterService;
114 +
115 + private static final KryoSerializer SERIALIZER = new KryoSerializer() {
116 + protected void setupKryoPool() {
117 + serializerPool = KryoPool.newBuilder()
118 + .register(KryoPoolUtil.API)
119 + .register(InternalDeviceEvent.class)
120 + .register(InternalPortEvent.class)
121 + .register(InternalPortStatusEvent.class)
122 + .register(Timestamped.class)
123 + .register(MastershipBasedTimestamp.class)
124 + .build()
125 + .populate(1);
126 + }
127 +
128 + };
129 +
107 @Activate 130 @Activate
108 public void activate() { 131 public void activate() {
109 clusterCommunicator.addSubscriber( 132 clusterCommunicator.addSubscriber(
...@@ -779,17 +802,26 @@ public class GossipDeviceStore ...@@ -779,17 +802,26 @@ public class GossipDeviceStore
779 } 802 }
780 803
781 private void notifyPeers(InternalDeviceEvent event) throws IOException { 804 private void notifyPeers(InternalDeviceEvent event) throws IOException {
782 - ClusterMessage message = new ClusterMessage(null, GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, event); 805 + ClusterMessage message = new ClusterMessage(
806 + clusterService.getLocalNode().id(),
807 + GossipDeviceStoreMessageSubjects.DEVICE_UPDATE,
808 + SERIALIZER.encode(event));
783 clusterCommunicator.broadcast(message); 809 clusterCommunicator.broadcast(message);
784 } 810 }
785 811
786 private void notifyPeers(InternalPortEvent event) throws IOException { 812 private void notifyPeers(InternalPortEvent event) throws IOException {
787 - ClusterMessage message = new ClusterMessage(null, GossipDeviceStoreMessageSubjects.PORT_UPDATE, event); 813 + ClusterMessage message = new ClusterMessage(
814 + clusterService.getLocalNode().id(),
815 + GossipDeviceStoreMessageSubjects.PORT_UPDATE,
816 + SERIALIZER.encode(event));
788 clusterCommunicator.broadcast(message); 817 clusterCommunicator.broadcast(message);
789 } 818 }
790 819
791 private void notifyPeers(InternalPortStatusEvent event) throws IOException { 820 private void notifyPeers(InternalPortStatusEvent event) throws IOException {
792 - ClusterMessage message = new ClusterMessage(null, GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, event); 821 + ClusterMessage message = new ClusterMessage(
822 + clusterService.getLocalNode().id(),
823 + GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE,
824 + SERIALIZER.encode(event));
793 clusterCommunicator.broadcast(message); 825 clusterCommunicator.broadcast(message);
794 } 826 }
795 827
...@@ -797,7 +829,7 @@ public class GossipDeviceStore ...@@ -797,7 +829,7 @@ public class GossipDeviceStore
797 @Override 829 @Override
798 public void handle(ClusterMessage message) { 830 public void handle(ClusterMessage message) {
799 log.info("Received device update event from peer: {}", message.sender()); 831 log.info("Received device update event from peer: {}", message.sender());
800 - InternalDeviceEvent event = (InternalDeviceEvent) message.payload(); 832 + InternalDeviceEvent event = (InternalDeviceEvent) SERIALIZER.decode(message.payload());
801 ProviderId providerId = event.providerId(); 833 ProviderId providerId = event.providerId();
802 DeviceId deviceId = event.deviceId(); 834 DeviceId deviceId = event.deviceId();
803 Timestamped<DeviceDescription> deviceDescription = event.deviceDescription(); 835 Timestamped<DeviceDescription> deviceDescription = event.deviceDescription();
...@@ -810,7 +842,7 @@ public class GossipDeviceStore ...@@ -810,7 +842,7 @@ public class GossipDeviceStore
810 public void handle(ClusterMessage message) { 842 public void handle(ClusterMessage message) {
811 843
812 log.info("Received port update event from peer: {}", message.sender()); 844 log.info("Received port update event from peer: {}", message.sender());
813 - InternalPortEvent event = (InternalPortEvent) message.payload(); 845 + InternalPortEvent event = (InternalPortEvent) SERIALIZER.decode(message.payload());
814 846
815 ProviderId providerId = event.providerId(); 847 ProviderId providerId = event.providerId();
816 DeviceId deviceId = event.deviceId(); 848 DeviceId deviceId = event.deviceId();
...@@ -825,7 +857,7 @@ public class GossipDeviceStore ...@@ -825,7 +857,7 @@ public class GossipDeviceStore
825 public void handle(ClusterMessage message) { 857 public void handle(ClusterMessage message) {
826 858
827 log.info("Received port status update event from peer: {}", message.sender()); 859 log.info("Received port status update event from peer: {}", message.sender());
828 - InternalPortStatusEvent event = (InternalPortStatusEvent) message.payload(); 860 + InternalPortStatusEvent event = (InternalPortStatusEvent) SERIALIZER.decode(message.payload());
829 861
830 ProviderId providerId = event.providerId(); 862 ProviderId providerId = event.providerId();
831 DeviceId deviceId = event.deviceId(); 863 DeviceId deviceId = event.deviceId();
......
...@@ -17,27 +17,20 @@ public final class ClusterMessageSerializer extends Serializer<ClusterMessage> { ...@@ -17,27 +17,20 @@ public final class ClusterMessageSerializer extends Serializer<ClusterMessage> {
17 } 17 }
18 18
19 @Override 19 @Override
20 - public void write(Kryo kryo, Output output, ClusterMessage object) { 20 + public void write(Kryo kryo, Output output, ClusterMessage message) {
21 - kryo.writeClassAndObject(output, object.sender()); 21 + kryo.writeClassAndObject(output, message.sender());
22 - kryo.writeClassAndObject(output, object.subject()); 22 + kryo.writeClassAndObject(output, message.subject());
23 - // TODO: write bytes serialized using ClusterMessage specified serializer 23 + output.writeInt(message.payload().length);
24 - // write serialized payload size 24 + output.writeBytes(message.payload());
25 - //output.writeInt(...);
26 - // write serialized payload
27 - //output.writeBytes(...);
28 } 25 }
29 26
30 @Override 27 @Override
31 public ClusterMessage read(Kryo kryo, Input input, 28 public ClusterMessage read(Kryo kryo, Input input,
32 Class<ClusterMessage> type) { 29 Class<ClusterMessage> type) {
33 - // TODO Auto-generated method stub
34 NodeId sender = (NodeId) kryo.readClassAndObject(input); 30 NodeId sender = (NodeId) kryo.readClassAndObject(input);
35 MessageSubject subject = (MessageSubject) kryo.readClassAndObject(input); 31 MessageSubject subject = (MessageSubject) kryo.readClassAndObject(input);
36 - int size = input.readInt(); 32 + int payloadSize = input.readInt();
37 - byte[] payloadBytes = input.readBytes(size); 33 + byte[] payload = input.readBytes(payloadSize);
38 - // TODO: deserialize payload using ClusterMessage specified serializer
39 - Object payload = null;
40 return new ClusterMessage(sender, subject, payload); 34 return new ClusterMessage(sender, subject, payload);
41 } 35 }
42 -
43 } 36 }
...\ No newline at end of file ...\ No newline at end of file
......
...@@ -20,6 +20,11 @@ import org.junit.Before; ...@@ -20,6 +20,11 @@ import org.junit.Before;
20 import org.junit.BeforeClass; 20 import org.junit.BeforeClass;
21 import org.junit.Ignore; 21 import org.junit.Ignore;
22 import org.junit.Test; 22 import org.junit.Test;
23 +import org.onlab.onos.cluster.ClusterEventListener;
24 +import org.onlab.onos.cluster.ClusterService;
25 +import org.onlab.onos.cluster.ControllerNode;
26 +import org.onlab.onos.cluster.ControllerNode.State;
27 +import org.onlab.onos.cluster.DefaultControllerNode;
23 import org.onlab.onos.cluster.MastershipTerm; 28 import org.onlab.onos.cluster.MastershipTerm;
24 import org.onlab.onos.cluster.NodeId; 29 import org.onlab.onos.cluster.NodeId;
25 import org.onlab.onos.net.Annotations; 30 import org.onlab.onos.net.Annotations;
...@@ -42,6 +47,7 @@ import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService; ...@@ -42,6 +47,7 @@ import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
42 import org.onlab.onos.store.cluster.messaging.ClusterMessage; 47 import org.onlab.onos.store.cluster.messaging.ClusterMessage;
43 import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler; 48 import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
44 import org.onlab.onos.store.cluster.messaging.MessageSubject; 49 import org.onlab.onos.store.cluster.messaging.MessageSubject;
50 +import org.onlab.packet.IpPrefix;
45 51
46 import com.google.common.collect.Iterables; 52 import com.google.common.collect.Iterables;
47 import com.google.common.collect.Sets; 53 import com.google.common.collect.Sets;
...@@ -111,8 +117,9 @@ public class GossipDeviceStoreTest { ...@@ -111,8 +117,9 @@ public class GossipDeviceStoreTest {
111 deviceClockManager.setMastershipTerm(DID2, MastershipTerm.of(MYSELF, 2)); 117 deviceClockManager.setMastershipTerm(DID2, MastershipTerm.of(MYSELF, 2));
112 118
113 ClusterCommunicationService clusterCommunicator = new TestClusterCommunicationService(); 119 ClusterCommunicationService clusterCommunicator = new TestClusterCommunicationService();
120 + ClusterService clusterService = new TestClusterService();
114 121
115 - gossipDeviceStore = new TestGossipDeviceStore(clockService, clusterCommunicator); 122 + gossipDeviceStore = new TestGossipDeviceStore(clockService, clusterService, clusterCommunicator);
116 gossipDeviceStore.activate(); 123 gossipDeviceStore.activate();
117 deviceStore = gossipDeviceStore; 124 deviceStore = gossipDeviceStore;
118 } 125 }
...@@ -548,8 +555,12 @@ public class GossipDeviceStoreTest { ...@@ -548,8 +555,12 @@ public class GossipDeviceStoreTest {
548 555
549 private static final class TestGossipDeviceStore extends GossipDeviceStore { 556 private static final class TestGossipDeviceStore extends GossipDeviceStore {
550 557
551 - public TestGossipDeviceStore(ClockService clockService, ClusterCommunicationService clusterCommunicator) { 558 + public TestGossipDeviceStore(
559 + ClockService clockService,
560 + ClusterService clusterService,
561 + ClusterCommunicationService clusterCommunicator) {
552 this.clockService = clockService; 562 this.clockService = clockService;
563 + this.clusterService = clusterService;
553 this.clusterCommunicator = clusterCommunicator; 564 this.clusterCommunicator = clusterCommunicator;
554 } 565 }
555 } 566 }
...@@ -564,4 +575,45 @@ public class GossipDeviceStoreTest { ...@@ -564,4 +575,45 @@ public class GossipDeviceStoreTest {
564 @Override 575 @Override
565 public void addSubscriber(MessageSubject subject, ClusterMessageHandler subscriber) {} 576 public void addSubscriber(MessageSubject subject, ClusterMessageHandler subscriber) {}
566 } 577 }
578 +
579 + private static final class TestClusterService implements ClusterService {
580 +
581 + private static final ControllerNode ONOS1 =
582 + new DefaultControllerNode(new NodeId("N1"), IpPrefix.valueOf("127.0.0.1"));
583 + private final Map<NodeId, ControllerNode> nodes = new HashMap<>();
584 + private final Map<NodeId, ControllerNode.State> nodeStates = new HashMap<>();
585 +
586 + public TestClusterService() {
587 + nodes.put(new NodeId("N1"), ONOS1);
588 + nodeStates.put(new NodeId("N1"), ControllerNode.State.ACTIVE);
589 + }
590 +
591 + @Override
592 + public ControllerNode getLocalNode() {
593 + return ONOS1;
594 + }
595 +
596 + @Override
597 + public Set<ControllerNode> getNodes() {
598 + return Sets.newHashSet(nodes.values());
599 + }
600 +
601 + @Override
602 + public ControllerNode getNode(NodeId nodeId) {
603 + return nodes.get(nodeId);
604 + }
605 +
606 + @Override
607 + public State getState(NodeId nodeId) {
608 + return nodeStates.get(nodeId);
609 + }
610 +
611 + @Override
612 + public void addListener(ClusterEventListener listener) {
613 + }
614 +
615 + @Override
616 + public void removeListener(ClusterEventListener listener) {
617 + }
618 + }
567 } 619 }
......
...@@ -2,6 +2,7 @@ package org.onlab.onos.store.serializers; ...@@ -2,6 +2,7 @@ package org.onlab.onos.store.serializers;
2 2
3 import java.net.URI; 3 import java.net.URI;
4 import java.util.ArrayList; 4 import java.util.ArrayList;
5 +import java.util.Arrays;
5 import java.util.HashMap; 6 import java.util.HashMap;
6 7
7 import org.onlab.onos.cluster.ControllerNode; 8 import org.onlab.onos.cluster.ControllerNode;
...@@ -21,6 +22,8 @@ import org.onlab.onos.net.LinkKey; ...@@ -21,6 +22,8 @@ import org.onlab.onos.net.LinkKey;
21 import org.onlab.onos.net.MastershipRole; 22 import org.onlab.onos.net.MastershipRole;
22 import org.onlab.onos.net.Port; 23 import org.onlab.onos.net.Port;
23 import org.onlab.onos.net.PortNumber; 24 import org.onlab.onos.net.PortNumber;
25 +import org.onlab.onos.net.device.DefaultDeviceDescription;
26 +import org.onlab.onos.net.device.DefaultPortDescription;
24 import org.onlab.onos.net.provider.ProviderId; 27 import org.onlab.onos.net.provider.ProviderId;
25 import org.onlab.packet.IpAddress; 28 import org.onlab.packet.IpAddress;
26 import org.onlab.packet.IpPrefix; 29 import org.onlab.packet.IpPrefix;
...@@ -47,6 +50,7 @@ public final class KryoPoolUtil { ...@@ -47,6 +50,7 @@ public final class KryoPoolUtil {
47 .register( 50 .register(
48 // 51 //
49 ArrayList.class, 52 ArrayList.class,
53 + Arrays.asList().getClass(),
50 HashMap.class, 54 HashMap.class,
51 // 55 //
52 ControllerNode.State.class, 56 ControllerNode.State.class,
...@@ -54,8 +58,10 @@ public final class KryoPoolUtil { ...@@ -54,8 +58,10 @@ public final class KryoPoolUtil {
54 DefaultAnnotations.class, 58 DefaultAnnotations.class,
55 DefaultControllerNode.class, 59 DefaultControllerNode.class,
56 DefaultDevice.class, 60 DefaultDevice.class,
61 + DefaultDeviceDescription.class,
57 MastershipRole.class, 62 MastershipRole.class,
58 Port.class, 63 Port.class,
64 + DefaultPortDescription.class,
59 Element.class, 65 Element.class,
60 Link.Type.class 66 Link.Type.class
61 ) 67 )
......
...@@ -12,7 +12,7 @@ import java.nio.ByteBuffer; ...@@ -12,7 +12,7 @@ import java.nio.ByteBuffer;
12 public class KryoSerializer implements Serializer { 12 public class KryoSerializer implements Serializer {
13 13
14 private final Logger log = LoggerFactory.getLogger(getClass()); 14 private final Logger log = LoggerFactory.getLogger(getClass());
15 - private KryoPool serializerPool; 15 + protected KryoPool serializerPool;
16 16
17 17
18 public KryoSerializer() { 18 public KryoSerializer() {
......
...@@ -8,16 +8,15 @@ import java.util.concurrent.TimeoutException; ...@@ -8,16 +8,15 @@ import java.util.concurrent.TimeoutException;
8 * This class provides a base implementation of Response, with methods to retrieve the 8 * This class provides a base implementation of Response, with methods to retrieve the
9 * result and query to see if the result is ready. The result can only be retrieved when 9 * result and query to see if the result is ready. The result can only be retrieved when
10 * it is ready and the get methods will block if the result is not ready yet. 10 * it is ready and the get methods will block if the result is not ready yet.
11 - * @param <T> type of response.
12 */ 11 */
13 -public class AsyncResponse<T> implements Response<T> { 12 +public class AsyncResponse implements Response {
14 13
15 - private T value; 14 + private byte[] value;
16 private boolean done = false; 15 private boolean done = false;
17 private final long start = System.nanoTime(); 16 private final long start = System.nanoTime();
18 17
19 @Override 18 @Override
20 - public T get(long timeout, TimeUnit timeUnit) throws TimeoutException { 19 + public byte[] get(long timeout, TimeUnit timeUnit) throws TimeoutException {
21 timeout = timeUnit.toNanos(timeout); 20 timeout = timeUnit.toNanos(timeout);
22 boolean interrupted = false; 21 boolean interrupted = false;
23 try { 22 try {
...@@ -43,7 +42,7 @@ public class AsyncResponse<T> implements Response<T> { ...@@ -43,7 +42,7 @@ public class AsyncResponse<T> implements Response<T> {
43 } 42 }
44 43
45 @Override 44 @Override
46 - public T get() throws InterruptedException { 45 + public byte[] get() throws InterruptedException {
47 throw new UnsupportedOperationException(); 46 throw new UnsupportedOperationException();
48 } 47 }
49 48
...@@ -57,11 +56,10 @@ public class AsyncResponse<T> implements Response<T> { ...@@ -57,11 +56,10 @@ public class AsyncResponse<T> implements Response<T> {
57 * available. 56 * available.
58 * @param data response data. 57 * @param data response data.
59 */ 58 */
60 - @SuppressWarnings("unchecked") 59 + public synchronized void setResponse(byte[] data) {
61 - public synchronized void setResponse(Object data) {
62 if (!done) { 60 if (!done) {
63 done = true; 61 done = true;
64 - value = (T) data; 62 + value = data;
65 this.notifyAll(); 63 this.notifyAll();
66 } 64 }
67 } 65 }
......
...@@ -13,11 +13,9 @@ public final class InternalMessage implements Message { ...@@ -13,11 +13,9 @@ public final class InternalMessage implements Message {
13 private long id; 13 private long id;
14 private Endpoint sender; 14 private Endpoint sender;
15 private String type; 15 private String type;
16 - private Object payload; 16 + private byte[] payload;
17 -
18 private transient NettyMessagingService messagingService; 17 private transient NettyMessagingService messagingService;
19 - // TODO: add transient payload serializer or change payload type to 18 + public static final String REPLY_MESSAGE_TYPE = "NETTY_MESSAGIG_REQUEST_REPLY";
20 - // byte[], ByteBuffer, etc.
21 19
22 // Must be created using the Builder. 20 // Must be created using the Builder.
23 private InternalMessage() {} 21 private InternalMessage() {}
...@@ -35,7 +33,7 @@ public final class InternalMessage implements Message { ...@@ -35,7 +33,7 @@ public final class InternalMessage implements Message {
35 } 33 }
36 34
37 @Override 35 @Override
38 - public Object payload() { 36 + public byte[] payload() {
39 return payload; 37 return payload;
40 } 38 }
41 39
...@@ -44,7 +42,7 @@ public final class InternalMessage implements Message { ...@@ -44,7 +42,7 @@ public final class InternalMessage implements Message {
44 } 42 }
45 43
46 @Override 44 @Override
47 - public void respond(Object data) throws IOException { 45 + public void respond(byte[] data) throws IOException {
48 Builder builder = new Builder(messagingService); 46 Builder builder = new Builder(messagingService);
49 InternalMessage message = builder.withId(this.id) 47 InternalMessage message = builder.withId(this.id)
50 // FIXME: Sender should be messagingService.localEp. 48 // FIXME: Sender should be messagingService.localEp.
...@@ -81,7 +79,7 @@ public final class InternalMessage implements Message { ...@@ -81,7 +79,7 @@ public final class InternalMessage implements Message {
81 message.sender = sender; 79 message.sender = sender;
82 return this; 80 return this;
83 } 81 }
84 - public Builder withPayload(Object payload) { 82 + public Builder withPayload(byte[] payload) {
85 message.payload = payload; 83 message.payload = payload;
86 return this; 84 return this;
87 } 85 }
......
...@@ -10,7 +10,7 @@ import java.util.HashMap; ...@@ -10,7 +10,7 @@ import java.util.HashMap;
10 /** 10 /**
11 * Kryo Serializer. 11 * Kryo Serializer.
12 */ 12 */
13 -public class KryoSerializer implements PayloadSerializer { 13 +public class KryoSerializer {
14 14
15 private KryoPool serializerPool; 15 private KryoPool serializerPool;
16 16
...@@ -28,29 +28,26 @@ public class KryoSerializer implements PayloadSerializer { ...@@ -28,29 +28,26 @@ public class KryoSerializer implements PayloadSerializer {
28 HashMap.class, 28 HashMap.class,
29 ArrayList.class, 29 ArrayList.class,
30 InternalMessage.class, 30 InternalMessage.class,
31 - Endpoint.class 31 + Endpoint.class,
32 + byte[].class
32 ) 33 )
33 .build() 34 .build()
34 .populate(1); 35 .populate(1);
35 } 36 }
36 37
37 38
38 - @Override
39 public <T> T decode(byte[] data) { 39 public <T> T decode(byte[] data) {
40 return serializerPool.deserialize(data); 40 return serializerPool.deserialize(data);
41 } 41 }
42 42
43 - @Override
44 public byte[] encode(Object payload) { 43 public byte[] encode(Object payload) {
45 return serializerPool.serialize(payload); 44 return serializerPool.serialize(payload);
46 } 45 }
47 46
48 - @Override
49 public <T> T decode(ByteBuffer buffer) { 47 public <T> T decode(ByteBuffer buffer) {
50 return serializerPool.deserialize(buffer); 48 return serializerPool.deserialize(buffer);
51 } 49 }
52 50
53 - @Override
54 public void encode(Object obj, ByteBuffer buffer) { 51 public void encode(Object obj, ByteBuffer buffer) {
55 serializerPool.serialize(obj, buffer); 52 serializerPool.serialize(obj, buffer);
56 } 53 }
......
...@@ -12,6 +12,6 @@ public class LoggingHandler implements MessageHandler { ...@@ -12,6 +12,6 @@ public class LoggingHandler implements MessageHandler {
12 12
13 @Override 13 @Override
14 public void handle(Message message) { 14 public void handle(Message message) {
15 - log.info("Received message. Payload: " + message.payload()); 15 + log.info("Received message. Payload has {} bytes", message.payload().length);
16 } 16 }
17 } 17 }
......
...@@ -12,12 +12,12 @@ public interface Message { ...@@ -12,12 +12,12 @@ public interface Message {
12 * Returns the payload of this message. 12 * Returns the payload of this message.
13 * @return message payload. 13 * @return message payload.
14 */ 14 */
15 - public Object payload(); 15 + public byte[] payload();
16 16
17 /** 17 /**
18 - * Sends a reply back to the sender of this messge. 18 + * Sends a reply back to the sender of this message.
19 * @param data payload of the response. 19 * @param data payload of the response.
20 * @throws IOException if there is a communication error. 20 * @throws IOException if there is a communication error.
21 */ 21 */
22 - public void respond(Object data) throws IOException; 22 + public void respond(byte[] data) throws IOException;
23 } 23 }
......
...@@ -14,14 +14,14 @@ import java.util.List; ...@@ -14,14 +14,14 @@ import java.util.List;
14 public class MessageDecoder extends ReplayingDecoder<DecoderState> { 14 public class MessageDecoder extends ReplayingDecoder<DecoderState> {
15 15
16 private final NettyMessagingService messagingService; 16 private final NettyMessagingService messagingService;
17 - private final PayloadSerializer payloadSerializer; 17 +
18 + private static final KryoSerializer SERIALIZER = new KryoSerializer();
18 19
19 private int contentLength; 20 private int contentLength;
20 21
21 - public MessageDecoder(NettyMessagingService messagingService, PayloadSerializer payloadSerializer) { 22 + public MessageDecoder(NettyMessagingService messagingService) {
22 super(DecoderState.READ_HEADER_VERSION); 23 super(DecoderState.READ_HEADER_VERSION);
23 this.messagingService = messagingService; 24 this.messagingService = messagingService;
24 - this.payloadSerializer = payloadSerializer;
25 } 25 }
26 26
27 @Override 27 @Override
...@@ -48,7 +48,7 @@ public class MessageDecoder extends ReplayingDecoder<DecoderState> { ...@@ -48,7 +48,7 @@ public class MessageDecoder extends ReplayingDecoder<DecoderState> {
48 checkState(serializerVersion == MessageEncoder.SERIALIZER_VERSION, "Unexpected serializer version"); 48 checkState(serializerVersion == MessageEncoder.SERIALIZER_VERSION, "Unexpected serializer version");
49 checkpoint(DecoderState.READ_CONTENT); 49 checkpoint(DecoderState.READ_CONTENT);
50 case READ_CONTENT: 50 case READ_CONTENT:
51 - InternalMessage message = payloadSerializer.decode(buffer.readBytes(contentLength).nioBuffer()); 51 + InternalMessage message = SERIALIZER.decode(buffer.readBytes(contentLength).nioBuffer());
52 message.setMessagingService(messagingService); 52 message.setMessagingService(messagingService);
53 out.add(message); 53 out.add(message);
54 checkpoint(DecoderState.READ_HEADER_VERSION); 54 checkpoint(DecoderState.READ_HEADER_VERSION);
......
...@@ -17,11 +17,7 @@ public class MessageEncoder extends MessageToByteEncoder<InternalMessage> { ...@@ -17,11 +17,7 @@ public class MessageEncoder extends MessageToByteEncoder<InternalMessage> {
17 public static final int SERIALIZER_VERSION = 1; 17 public static final int SERIALIZER_VERSION = 1;
18 18
19 19
20 - private final PayloadSerializer payloadSerializer; 20 + private static final KryoSerializer SERIALIZER = new KryoSerializer();
21 -
22 - public MessageEncoder(PayloadSerializer payloadSerializer) {
23 - this.payloadSerializer = payloadSerializer;
24 - }
25 21
26 @Override 22 @Override
27 protected void encode( 23 protected void encode(
...@@ -35,7 +31,12 @@ public class MessageEncoder extends MessageToByteEncoder<InternalMessage> { ...@@ -35,7 +31,12 @@ public class MessageEncoder extends MessageToByteEncoder<InternalMessage> {
35 // write preamble 31 // write preamble
36 out.writeBytes(PREAMBLE); 32 out.writeBytes(PREAMBLE);
37 33
38 - byte[] payload = payloadSerializer.encode(message); 34 + try {
35 + SERIALIZER.encode(message);
36 + } catch (Exception e) {
37 + e.printStackTrace();
38 + }
39 + byte[] payload = SERIALIZER.encode(message);
39 40
40 // write payload length 41 // write payload length
41 out.writeInt(payload.length); 42 out.writeInt(payload.length);
......
...@@ -11,10 +11,10 @@ public interface MessagingService { ...@@ -11,10 +11,10 @@ public interface MessagingService {
11 * The message is specified using the type and payload. 11 * The message is specified using the type and payload.
12 * @param ep end point to send the message to. 12 * @param ep end point to send the message to.
13 * @param type type of message. 13 * @param type type of message.
14 - * @param payload message payload. 14 + * @param payload message payload bytes.
15 * @throws IOException 15 * @throws IOException
16 */ 16 */
17 - public void sendAsync(Endpoint ep, String type, Object payload) throws IOException; 17 + public void sendAsync(Endpoint ep, String type, byte[] payload) throws IOException;
18 18
19 /** 19 /**
20 * Sends a message synchronously and waits for a response. 20 * Sends a message synchronously and waits for a response.
...@@ -24,7 +24,7 @@ public interface MessagingService { ...@@ -24,7 +24,7 @@ public interface MessagingService {
24 * @return a response future 24 * @return a response future
25 * @throws IOException 25 * @throws IOException
26 */ 26 */
27 - public <T> Response<T> sendAndReceive(Endpoint ep, String type, Object payload) throws IOException; 27 + public Response sendAndReceive(Endpoint ep, String type, byte[] payload) throws IOException;
28 28
29 /** 29 /**
30 * Registers a new message handler for message type. 30 * Registers a new message handler for message type.
...@@ -38,12 +38,4 @@ public interface MessagingService { ...@@ -38,12 +38,4 @@ public interface MessagingService {
38 * @param type message type 38 * @param type message type
39 */ 39 */
40 public void unregisterHandler(String type); 40 public void unregisterHandler(String type);
41 -
42 - // FIXME: remove me and add PayloadSerializer to all other methods
43 - /**
44 - * Specify the serializer to use for encoding/decoding payload.
45 - *
46 - * @param payloadSerializer payloadSerializer to use
47 - */
48 - public void setPayloadSerializer(PayloadSerializer payloadSerializer);
49 } 41 }
...\ No newline at end of file ...\ No newline at end of file
......
...@@ -43,7 +43,7 @@ public class NettyMessagingService implements MessagingService { ...@@ -43,7 +43,7 @@ public class NettyMessagingService implements MessagingService {
43 private final EventLoopGroup bossGroup = new NioEventLoopGroup(); 43 private final EventLoopGroup bossGroup = new NioEventLoopGroup();
44 private final EventLoopGroup workerGroup = new NioEventLoopGroup(); 44 private final EventLoopGroup workerGroup = new NioEventLoopGroup();
45 private final ConcurrentMap<String, MessageHandler> handlers = new ConcurrentHashMap<>(); 45 private final ConcurrentMap<String, MessageHandler> handlers = new ConcurrentHashMap<>();
46 - private final Cache<Long, AsyncResponse<?>> responseFutures = CacheBuilder.newBuilder() 46 + private final Cache<Long, AsyncResponse> responseFutures = CacheBuilder.newBuilder()
47 .maximumSize(100000) 47 .maximumSize(100000)
48 .weakValues() 48 .weakValues()
49 // TODO: Once the entry expires, notify blocking threads (if any). 49 // TODO: Once the entry expires, notify blocking threads (if any).
...@@ -52,8 +52,6 @@ public class NettyMessagingService implements MessagingService { ...@@ -52,8 +52,6 @@ public class NettyMessagingService implements MessagingService {
52 private final GenericKeyedObjectPool<Endpoint, Channel> channels 52 private final GenericKeyedObjectPool<Endpoint, Channel> channels
53 = new GenericKeyedObjectPool<Endpoint, Channel>(new OnosCommunicationChannelFactory()); 53 = new GenericKeyedObjectPool<Endpoint, Channel>(new OnosCommunicationChannelFactory());
54 54
55 - protected PayloadSerializer payloadSerializer;
56 -
57 public NettyMessagingService() { 55 public NettyMessagingService() {
58 // TODO: Default port should be configurable. 56 // TODO: Default port should be configurable.
59 this(8080); 57 this(8080);
...@@ -83,7 +81,7 @@ public class NettyMessagingService implements MessagingService { ...@@ -83,7 +81,7 @@ public class NettyMessagingService implements MessagingService {
83 } 81 }
84 82
85 @Override 83 @Override
86 - public void sendAsync(Endpoint ep, String type, Object payload) throws IOException { 84 + public void sendAsync(Endpoint ep, String type, byte[] payload) throws IOException {
87 InternalMessage message = new InternalMessage.Builder(this) 85 InternalMessage message = new InternalMessage.Builder(this)
88 .withId(RandomUtils.nextLong()) 86 .withId(RandomUtils.nextLong())
89 .withSender(localEp) 87 .withSender(localEp)
...@@ -108,9 +106,9 @@ public class NettyMessagingService implements MessagingService { ...@@ -108,9 +106,9 @@ public class NettyMessagingService implements MessagingService {
108 } 106 }
109 107
110 @Override 108 @Override
111 - public <T> Response<T> sendAndReceive(Endpoint ep, String type, Object payload) 109 + public Response sendAndReceive(Endpoint ep, String type, byte[] payload)
112 throws IOException { 110 throws IOException {
113 - AsyncResponse<T> futureResponse = new AsyncResponse<T>(); 111 + AsyncResponse futureResponse = new AsyncResponse();
114 Long messageId = RandomUtils.nextLong(); 112 Long messageId = RandomUtils.nextLong();
115 responseFutures.put(messageId, futureResponse); 113 responseFutures.put(messageId, futureResponse);
116 InternalMessage message = new InternalMessage.Builder(this) 114 InternalMessage message = new InternalMessage.Builder(this)
...@@ -133,11 +131,6 @@ public class NettyMessagingService implements MessagingService { ...@@ -133,11 +131,6 @@ public class NettyMessagingService implements MessagingService {
133 handlers.remove(type); 131 handlers.remove(type);
134 } 132 }
135 133
136 - @Override
137 - public void setPayloadSerializer(PayloadSerializer payloadSerializer) {
138 - this.payloadSerializer = payloadSerializer;
139 - }
140 -
141 private MessageHandler getMessageHandler(String type) { 134 private MessageHandler getMessageHandler(String type) {
142 return handlers.get(type); 135 return handlers.get(type);
143 } 136 }
...@@ -202,13 +195,13 @@ public class NettyMessagingService implements MessagingService { ...@@ -202,13 +195,13 @@ public class NettyMessagingService implements MessagingService {
202 private class OnosCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> { 195 private class OnosCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
203 196
204 private final ChannelHandler dispatcher = new InboundMessageDispatcher(); 197 private final ChannelHandler dispatcher = new InboundMessageDispatcher();
205 - private final ChannelHandler encoder = new MessageEncoder(payloadSerializer); 198 + private final ChannelHandler encoder = new MessageEncoder();
206 199
207 @Override 200 @Override
208 protected void initChannel(SocketChannel channel) throws Exception { 201 protected void initChannel(SocketChannel channel) throws Exception {
209 channel.pipeline() 202 channel.pipeline()
210 .addLast("encoder", encoder) 203 .addLast("encoder", encoder)
211 - .addLast("decoder", new MessageDecoder(NettyMessagingService.this, payloadSerializer)) 204 + .addLast("decoder", new MessageDecoder(NettyMessagingService.this))
212 .addLast("handler", dispatcher); 205 .addLast("handler", dispatcher);
213 } 206 }
214 } 207 }
...@@ -237,7 +230,7 @@ public class NettyMessagingService implements MessagingService { ...@@ -237,7 +230,7 @@ public class NettyMessagingService implements MessagingService {
237 String type = message.type(); 230 String type = message.type();
238 if (type.equals(InternalMessage.REPLY_MESSAGE_TYPE)) { 231 if (type.equals(InternalMessage.REPLY_MESSAGE_TYPE)) {
239 try { 232 try {
240 - AsyncResponse<?> futureResponse = 233 + AsyncResponse futureResponse =
241 NettyMessagingService.this.responseFutures.getIfPresent(message.id()); 234 NettyMessagingService.this.responseFutures.getIfPresent(message.id());
242 if (futureResponse != null) { 235 if (futureResponse != null) {
243 futureResponse.setResponse(message.payload()); 236 futureResponse.setResponse(message.payload());
......
1 -package org.onlab.netty;
2 -
3 -import java.nio.ByteBuffer;
4 -
5 -/**
6 - * Interface for encoding/decoding message payloads.
7 - */
8 -public interface PayloadSerializer {
9 -
10 - /**
11 - * Decodes the specified byte array to a POJO.
12 - *
13 - * @param data byte array.
14 - * @return POJO
15 - */
16 - public <T> T decode(byte[] data);
17 -
18 - /**
19 - * Encodes the specified POJO into a byte array.
20 - *
21 - * @param data POJO to be encoded
22 - * @return byte array.
23 - */
24 - public byte[] encode(Object data);
25 -
26 - /**
27 - * Encodes the specified POJO into a byte buffer.
28 - *
29 - * @param data POJO to be encoded
30 - * @param buffer to write serialized bytes
31 - */
32 - public void encode(final Object data, ByteBuffer buffer);
33 -
34 - /**
35 - * Decodes the specified byte buffer to a POJO.
36 - *
37 - * @param buffer bytes to be decoded
38 - * @return POJO
39 - */
40 - public <T> T decode(final ByteBuffer buffer);
41 -}
...@@ -7,26 +7,24 @@ import java.util.concurrent.TimeoutException; ...@@ -7,26 +7,24 @@ import java.util.concurrent.TimeoutException;
7 * Response object returned when making synchronous requests. 7 * Response object returned when making synchronous requests.
8 * Can you used to check is a response is ready and/or wait for a response 8 * Can you used to check is a response is ready and/or wait for a response
9 * to become available. 9 * to become available.
10 - *
11 - * @param <T> type of response.
12 */ 10 */
13 -public interface Response<T> { 11 +public interface Response {
14 12
15 /** 13 /**
16 * Gets the response waiting for a designated timeout period. 14 * Gets the response waiting for a designated timeout period.
17 * @param timeout timeout period (since request was sent out) 15 * @param timeout timeout period (since request was sent out)
18 * @param tu unit of time. 16 * @param tu unit of time.
19 - * @return response 17 + * @return response payload
20 * @throws TimeoutException if the timeout expires before the response arrives. 18 * @throws TimeoutException if the timeout expires before the response arrives.
21 */ 19 */
22 - public T get(long timeout, TimeUnit tu) throws TimeoutException; 20 + public byte[] get(long timeout, TimeUnit tu) throws TimeoutException;
23 21
24 /** 22 /**
25 * Gets the response waiting for indefinite timeout period. 23 * Gets the response waiting for indefinite timeout period.
26 - * @return response 24 + * @return response payload
27 * @throws InterruptedException if the thread is interrupted before the response arrives. 25 * @throws InterruptedException if the thread is interrupted before the response arrives.
28 */ 26 */
29 - public T get() throws InterruptedException; 27 + public byte[] get() throws InterruptedException;
30 28
31 /** 29 /**
32 * Checks if the response is ready without blocking. 30 * Checks if the response is ready without blocking.
......
...@@ -24,7 +24,7 @@ public final class SimpleClient { ...@@ -24,7 +24,7 @@ public final class SimpleClient {
24 final int warmup = 100; 24 final int warmup = 100;
25 for (int i = 0; i < warmup; i++) { 25 for (int i = 0; i < warmup; i++) {
26 Timer.Context context = sendAsyncTimer.time(); 26 Timer.Context context = sendAsyncTimer.time();
27 - messaging.sendAsync(new Endpoint("localhost", 8080), "simple", "Hello World"); 27 + messaging.sendAsync(new Endpoint("localhost", 8080), "simple", "Hello World".getBytes());
28 context.stop(); 28 context.stop();
29 } 29 }
30 metrics.registerMetric(component, feature, "AsyncTimer", sendAsyncTimer); 30 metrics.registerMetric(component, feature, "AsyncTimer", sendAsyncTimer);
...@@ -33,10 +33,10 @@ public final class SimpleClient { ...@@ -33,10 +33,10 @@ public final class SimpleClient {
33 final int iterations = 1000000; 33 final int iterations = 1000000;
34 for (int i = 0; i < iterations; i++) { 34 for (int i = 0; i < iterations; i++) {
35 Timer.Context context = sendAndReceiveTimer.time(); 35 Timer.Context context = sendAndReceiveTimer.time();
36 - Response<String> response = messaging 36 + Response response = messaging
37 .sendAndReceive(new Endpoint("localhost", 8080), "echo", 37 .sendAndReceive(new Endpoint("localhost", 8080), "echo",
38 - "Hello World"); 38 + "Hello World".getBytes());
39 - System.out.println("Got back:" + response.get(2, TimeUnit.SECONDS)); 39 + System.out.println("Got back:" + new String(response.get(2, TimeUnit.SECONDS)));
40 context.stop(); 40 context.stop();
41 } 41 }
42 metrics.registerMetric(component, feature, "AsyncTimer", sendAndReceiveTimer); 42 metrics.registerMetric(component, feature, "AsyncTimer", sendAndReceiveTimer);
...@@ -45,8 +45,6 @@ public final class SimpleClient { ...@@ -45,8 +45,6 @@ public final class SimpleClient {
45 public static class TestNettyMessagingService extends NettyMessagingService { 45 public static class TestNettyMessagingService extends NettyMessagingService {
46 public TestNettyMessagingService(int port) throws Exception { 46 public TestNettyMessagingService(int port) throws Exception {
47 super(port); 47 super(port);
48 - PayloadSerializer payloadSerializer = new KryoSerializer();
49 - this.payloadSerializer = payloadSerializer;
50 } 48 }
51 } 49 }
52 } 50 }
......
...@@ -7,7 +7,6 @@ public final class SimpleServer { ...@@ -7,7 +7,6 @@ public final class SimpleServer {
7 public static void main(String... args) throws Exception { 7 public static void main(String... args) throws Exception {
8 NettyMessagingService server = new NettyMessagingService(8080); 8 NettyMessagingService server = new NettyMessagingService(8080);
9 server.activate(); 9 server.activate();
10 - server.setPayloadSerializer(new KryoSerializer());
11 server.registerHandler("simple", new LoggingHandler()); 10 server.registerHandler("simple", new LoggingHandler());
12 server.registerHandler("echo", new EchoHandler()); 11 server.registerHandler("echo", new EchoHandler());
13 } 12 }
......
...@@ -2,7 +2,8 @@ package org.onlab.netty; ...@@ -2,7 +2,8 @@ package org.onlab.netty;
2 2
3 import java.util.concurrent.TimeUnit; 3 import java.util.concurrent.TimeUnit;
4 4
5 -import org.junit.Assert; 5 +import org.apache.commons.lang3.RandomUtils;
6 +import static org.junit.Assert.*;
6 import org.junit.Test; 7 import org.junit.Test;
7 8
8 /** 9 /**
...@@ -17,11 +18,10 @@ public class PingPongTest { ...@@ -17,11 +18,10 @@ public class PingPongTest {
17 try { 18 try {
18 pinger.activate(); 19 pinger.activate();
19 ponger.activate(); 20 ponger.activate();
20 - pinger.setPayloadSerializer(new KryoSerializer());
21 - ponger.setPayloadSerializer(new KryoSerializer());
22 ponger.registerHandler("echo", new EchoHandler()); 21 ponger.registerHandler("echo", new EchoHandler());
23 - Response<String> response = pinger.sendAndReceive(new Endpoint("localhost", 9086), "echo", "hello"); 22 + byte[] payload = RandomUtils.nextBytes(100);
24 - Assert.assertEquals("hello", response.get(10000, TimeUnit.MILLISECONDS)); 23 + Response response = pinger.sendAndReceive(new Endpoint("localhost", 9086), "echo", payload);
24 + assertArrayEquals(payload, response.get(10000, TimeUnit.MILLISECONDS));
25 } finally { 25 } finally {
26 pinger.deactivate(); 26 pinger.deactivate();
27 ponger.deactivate(); 27 ponger.deactivate();
......