tom

Initial check-in for new p2p cluster messaging; to be refactored.

...@@ -28,10 +28,6 @@ ...@@ -28,10 +28,6 @@
28 <version>${project.version}</version> 28 <version>${project.version}</version>
29 </dependency> 29 </dependency>
30 <dependency> 30 <dependency>
31 - <groupId>org.livetribe.slp</groupId>
32 - <artifactId>livetribe-slp</artifactId>
33 - </dependency>
34 - <dependency>
35 <groupId>org.apache.karaf.shell</groupId> 31 <groupId>org.apache.karaf.shell</groupId>
36 <artifactId>org.apache.karaf.shell.console</artifactId> 32 <artifactId>org.apache.karaf.shell.console</artifactId>
37 </dependency> 33 </dependency>
......
...@@ -26,6 +26,23 @@ ...@@ -26,6 +26,23 @@
26 <artifactId>onos-core-serializers</artifactId> 26 <artifactId>onos-core-serializers</artifactId>
27 <version>${project.version}</version> 27 <version>${project.version}</version>
28 </dependency> 28 </dependency>
29 +
30 +
31 + <dependency>
32 + <groupId>org.onlab.onos</groupId>
33 + <artifactId>onlab-nio</artifactId>
34 + <version>${project.version}</version>
35 + </dependency>
36 +
37 + <dependency>
38 + <groupId>com.fasterxml.jackson.core</groupId>
39 + <artifactId>jackson-databind</artifactId>
40 + </dependency>
41 + <dependency>
42 + <groupId>com.fasterxml.jackson.core</groupId>
43 + <artifactId>jackson-annotations</artifactId>
44 + </dependency>
45 +
29 <dependency> 46 <dependency>
30 <groupId>org.apache.felix</groupId> 47 <groupId>org.apache.felix</groupId>
31 <artifactId>org.apache.felix.scr.annotations</artifactId> 48 <artifactId>org.apache.felix.scr.annotations</artifactId>
......
1 +package org.onlab.onos.store.cluster.impl;
2 +
3 +import com.fasterxml.jackson.core.JsonEncoding;
4 +import com.fasterxml.jackson.core.JsonFactory;
5 +import com.fasterxml.jackson.databind.JsonNode;
6 +import com.fasterxml.jackson.databind.ObjectMapper;
7 +import com.fasterxml.jackson.databind.node.ArrayNode;
8 +import com.fasterxml.jackson.databind.node.ObjectNode;
9 +import org.onlab.onos.cluster.DefaultControllerNode;
10 +import org.onlab.onos.cluster.NodeId;
11 +import org.onlab.packet.IpPrefix;
12 +
13 +import java.io.File;
14 +import java.io.IOException;
15 +import java.util.HashSet;
16 +import java.util.Iterator;
17 +import java.util.Set;
18 +
19 +/**
 20 + * Allows reading and writing the cluster definition as a JSON file.
21 + */
22 +public class ClusterDefinitionStore {
23 +
24 + private final File file;
25 +
26 + /**
27 + * Creates a reader/writer of the cluster definition file.
28 + *
29 + * @param filePath location of the definition file
30 + */
31 + public ClusterDefinitionStore(String filePath) {
32 + file = new File(filePath);
33 + }
34 +
35 + /**
 36 + * Returns the set of controller nodes, including self.
37 + *
38 + * @return set of controller nodes
39 + */
40 + public Set<DefaultControllerNode> read() throws IOException {
41 + Set<DefaultControllerNode> nodes = new HashSet<>();
42 + ObjectMapper mapper = new ObjectMapper();
43 + ObjectNode clusterNodeDef = (ObjectNode) mapper.readTree(file);
44 + Iterator<JsonNode> it = ((ArrayNode) clusterNodeDef.get("nodes")).elements();
45 + while (it.hasNext()) {
46 + ObjectNode nodeDef = (ObjectNode) it.next();
47 + nodes.add(new DefaultControllerNode(new NodeId(nodeDef.get("id").asText()),
48 + IpPrefix.valueOf(nodeDef.get("ip").asText()),
49 + nodeDef.get("tcpPort").asInt(9876)));
50 + }
51 + return nodes;
52 + }
53 +
54 + /**
55 + * Writes the given set of the controller nodes.
56 + *
57 + * @param nodes set of controller nodes
58 + */
59 + public void write(Set<DefaultControllerNode> nodes) throws IOException {
60 + ObjectMapper mapper = new ObjectMapper();
61 + ObjectNode clusterNodeDef = mapper.createObjectNode();
62 + ArrayNode nodeDefs = mapper.createArrayNode();
63 + clusterNodeDef.set("nodes", nodeDefs);
64 + for (DefaultControllerNode node : nodes) {
65 + ObjectNode nodeDef = mapper.createObjectNode();
66 + nodeDef.put("id", node.id().toString())
67 + .put("ip", node.ip().toString())
68 + .put("tcpPort", node.tcpPort());
69 + nodeDefs.add(nodeDef);
70 + }
71 + mapper.writeTree(new JsonFactory().createGenerator(file, JsonEncoding.UTF8),
72 + clusterNodeDef);
73 + }
74 +
75 +}
1 -package org.onlab.onos.ccc; 1 +package org.onlab.onos.store.cluster.impl;
2 2
3 import com.google.common.collect.ImmutableSet; 3 import com.google.common.collect.ImmutableSet;
4 import org.apache.felix.scr.annotations.Activate; 4 import org.apache.felix.scr.annotations.Activate;
...@@ -21,12 +21,18 @@ import org.slf4j.LoggerFactory; ...@@ -21,12 +21,18 @@ import org.slf4j.LoggerFactory;
21 21
22 import java.io.IOException; 22 import java.io.IOException;
23 import java.net.InetSocketAddress; 23 import java.net.InetSocketAddress;
24 +import java.net.Socket;
25 +import java.net.SocketAddress;
24 import java.nio.channels.ByteChannel; 26 import java.nio.channels.ByteChannel;
27 +import java.nio.channels.SelectionKey;
25 import java.nio.channels.ServerSocketChannel; 28 import java.nio.channels.ServerSocketChannel;
29 +import java.nio.channels.SocketChannel;
26 import java.util.ArrayList; 30 import java.util.ArrayList;
27 import java.util.List; 31 import java.util.List;
28 import java.util.Map; 32 import java.util.Map;
29 import java.util.Set; 33 import java.util.Set;
34 +import java.util.Timer;
35 +import java.util.TimerTask;
30 import java.util.concurrent.ConcurrentHashMap; 36 import java.util.concurrent.ConcurrentHashMap;
31 import java.util.concurrent.ExecutorService; 37 import java.util.concurrent.ExecutorService;
32 import java.util.concurrent.Executors; 38 import java.util.concurrent.Executors;
...@@ -45,35 +51,88 @@ public class DistributedClusterStore ...@@ -45,35 +51,88 @@ public class DistributedClusterStore
45 extends AbstractStore<ClusterEvent, ClusterStoreDelegate> 51 extends AbstractStore<ClusterEvent, ClusterStoreDelegate>
46 implements ClusterStore { 52 implements ClusterStore {
47 53
54 + private static final int HELLO_MSG = 1;
55 + private static final int ECHO_MSG = 2;
56 +
48 private final Logger log = LoggerFactory.getLogger(getClass()); 57 private final Logger log = LoggerFactory.getLogger(getClass());
49 58
59 + private static final long CONNECTION_CUSTODIAN_DELAY = 1000L;
60 + private static final long CONNECTION_CUSTODIAN_FREQUENCY = 5000;
61 +
62 + private static final long START_TIMEOUT = 1000;
50 private static final long SELECT_TIMEOUT = 50; 63 private static final long SELECT_TIMEOUT = 50;
51 private static final int WORKERS = 3; 64 private static final int WORKERS = 3;
52 - private static final int COMM_BUFFER_SIZE = 16 * 1024; 65 + private static final int COMM_BUFFER_SIZE = 32 * 1024;
53 private static final int COMM_IDLE_TIME = 500; 66 private static final int COMM_IDLE_TIME = 500;
54 67
68 + private static final boolean SO_NO_DELAY = false;
69 + private static final int SO_SEND_BUFFER_SIZE = COMM_BUFFER_SIZE;
70 + private static final int SO_RCV_BUFFER_SIZE = COMM_BUFFER_SIZE;
71 +
55 private DefaultControllerNode self; 72 private DefaultControllerNode self;
56 private final Map<NodeId, DefaultControllerNode> nodes = new ConcurrentHashMap<>(); 73 private final Map<NodeId, DefaultControllerNode> nodes = new ConcurrentHashMap<>();
57 private final Map<NodeId, State> states = new ConcurrentHashMap<>(); 74 private final Map<NodeId, State> states = new ConcurrentHashMap<>();
58 75
76 + // Means to track message streams to other nodes.
77 + private final Map<NodeId, TLVMessageStream> streams = new ConcurrentHashMap<>();
78 + private final Map<SocketChannel, DefaultControllerNode> nodesByChannel = new ConcurrentHashMap<>();
79 +
80 + // Executor pools for listening and managing connections to other nodes.
59 private final ExecutorService listenExecutor = 81 private final ExecutorService listenExecutor =
60 - Executors.newSingleThreadExecutor(namedThreads("onos-listen")); 82 + Executors.newSingleThreadExecutor(namedThreads("onos-comm-listen"));
61 private final ExecutorService commExecutors = 83 private final ExecutorService commExecutors =
62 - Executors.newFixedThreadPool(WORKERS, namedThreads("onos-cluster")); 84 + Executors.newFixedThreadPool(WORKERS, namedThreads("onos-comm-cluster"));
63 private final ExecutorService heartbeatExecutor = 85 private final ExecutorService heartbeatExecutor =
64 - Executors.newSingleThreadExecutor(namedThreads("onos-heartbeat")); 86 + Executors.newSingleThreadExecutor(namedThreads("onos-comm-heartbeat"));
87 +
88 + private final Timer timer = new Timer("onos-comm-initiator");
89 + private final TimerTask connectionCustodian = new ConnectionCustodian();
65 90
66 private ListenLoop listenLoop; 91 private ListenLoop listenLoop;
67 private List<CommLoop> commLoops = new ArrayList<>(WORKERS); 92 private List<CommLoop> commLoops = new ArrayList<>(WORKERS);
68 93
69 @Activate 94 @Activate
70 public void activate() { 95 public void activate() {
71 - establishIdentity(); 96 + loadClusterDefinition();
72 startCommunications(); 97 startCommunications();
73 startListening(); 98 startListening();
99 + startInitiating();
74 log.info("Started"); 100 log.info("Started");
75 } 101 }
76 102
103 + @Deactivate
104 + public void deactivate() {
105 + listenLoop.shutdown();
106 + for (CommLoop loop : commLoops) {
107 + loop.shutdown();
108 + }
109 + log.info("Stopped");
110 + }
111 +
112 + // Loads the cluster definition file
113 + private void loadClusterDefinition() {
114 +// ClusterDefinitionStore cds = new ClusterDefinitionStore("../config/cluster.json");
115 +// try {
116 +// Set<DefaultControllerNode> storedNodes = cds.read();
117 +// for (DefaultControllerNode node : storedNodes) {
118 +// nodes.put(node.id(), node);
119 +// }
120 +// } catch (IOException e) {
121 +// log.error("Unable to read cluster definitions", e);
122 +// }
123 +
124 + // Establishes the controller's own identity.
125 + IpPrefix ip = valueOf(System.getProperty("onos.ip", "127.0.1.1"));
126 + self = nodes.get(new NodeId(ip.toString()));
127 +
128 + // As a fall-back, let's make sure we at least know who we are.
129 + if (self == null) {
130 + self = new DefaultControllerNode(new NodeId(ip.toString()), ip);
131 + nodes.put(self.id(), self);
132 + }
133 + }
134 +
135 + // Kicks off the IO loops.
77 private void startCommunications() { 136 private void startCommunications() {
78 for (int i = 0; i < WORKERS; i++) { 137 for (int i = 0; i < WORKERS; i++) {
79 try { 138 try {
...@@ -84,6 +143,13 @@ public class DistributedClusterStore ...@@ -84,6 +143,13 @@ public class DistributedClusterStore
84 log.warn("Unable to start comm IO loop", e); 143 log.warn("Unable to start comm IO loop", e);
85 } 144 }
86 } 145 }
146 +
147 + // Wait for the IO loops to start
148 + for (CommLoop loop : commLoops) {
149 + if (!loop.awaitStart(START_TIMEOUT)) {
150 + log.warn("Comm loop did not start on-time; moving on...");
151 + }
152 + }
87 } 153 }
88 154
89 // Starts listening for connections from peer cluster members. 155 // Starts listening for connections from peer cluster members.
...@@ -91,25 +157,34 @@ public class DistributedClusterStore ...@@ -91,25 +157,34 @@ public class DistributedClusterStore
91 try { 157 try {
92 listenLoop = new ListenLoop(self.ip(), self.tcpPort()); 158 listenLoop = new ListenLoop(self.ip(), self.tcpPort());
93 listenExecutor.execute(listenLoop); 159 listenExecutor.execute(listenLoop);
160 + if (!listenLoop.awaitStart(START_TIMEOUT)) {
161 + log.warn("Listen loop did not start on-time; moving on...");
162 + }
94 } catch (IOException e) { 163 } catch (IOException e) {
95 log.error("Unable to listen for cluster connections", e); 164 log.error("Unable to listen for cluster connections", e);
96 } 165 }
97 } 166 }
98 167
99 - // Establishes the controller's own identity. 168 + /**
100 - private void establishIdentity() { 169 + * Initiates open connection request and registers the pending socket
101 - // For now rely on env. variable. 170 + * channel with the given IO loop.
102 - IpPrefix ip = valueOf(System.getenv("ONOS_NIC")); 171 + *
103 - self = new DefaultControllerNode(new NodeId(ip.toString()), ip); 172 + * @param loop loop with which the channel should be registered
173 + * @throws java.io.IOException if the socket could not be open or connected
174 + */
175 + private void openConnection(DefaultControllerNode node, CommLoop loop) throws IOException {
176 + SocketAddress sa = new InetSocketAddress(getByAddress(node.ip().toOctets()), node.tcpPort());
177 + SocketChannel ch = SocketChannel.open();
178 + nodesByChannel.put(ch, node);
179 + ch.configureBlocking(false);
180 + ch.connect(sa);
181 + loop.connectStream(ch);
104 } 182 }
105 183
106 - @Deactivate 184 +
107 - public void deactivate() { 185 + // Attempts to connect to any nodes that do not have an associated connection.
108 - listenLoop.shutdown(); 186 + private void startInitiating() {
109 - for (CommLoop loop : commLoops) { 187 + timer.schedule(connectionCustodian, CONNECTION_CUSTODIAN_DELAY, CONNECTION_CUSTODIAN_FREQUENCY);
110 - loop.shutdown();
111 - }
112 - log.info("Stopped");
113 } 188 }
114 189
115 @Override 190 @Override
...@@ -144,6 +219,7 @@ public class DistributedClusterStore ...@@ -144,6 +219,7 @@ public class DistributedClusterStore
144 @Override 219 @Override
145 public void removeNode(NodeId nodeId) { 220 public void removeNode(NodeId nodeId) {
146 nodes.remove(nodeId); 221 nodes.remove(nodeId);
222 + streams.remove(nodeId);
147 } 223 }
148 224
149 // Listens and accepts inbound connections from other cluster nodes. 225 // Listens and accepts inbound connections from other cluster nodes.
...@@ -154,7 +230,15 @@ public class DistributedClusterStore ...@@ -154,7 +230,15 @@ public class DistributedClusterStore
154 230
155 @Override 231 @Override
156 protected void acceptConnection(ServerSocketChannel channel) throws IOException { 232 protected void acceptConnection(ServerSocketChannel channel) throws IOException {
233 + SocketChannel sc = channel.accept();
234 + sc.configureBlocking(false);
235 +
236 + Socket so = sc.socket();
237 + so.setTcpNoDelay(SO_NO_DELAY);
238 + so.setReceiveBufferSize(SO_RCV_BUFFER_SIZE);
239 + so.setSendBufferSize(SO_SEND_BUFFER_SIZE);
157 240
241 + findLeastUtilizedLoop().acceptStream(sc);
158 } 242 }
159 } 243 }
160 244
...@@ -170,7 +254,109 @@ public class DistributedClusterStore ...@@ -170,7 +254,109 @@ public class DistributedClusterStore
170 254
171 @Override 255 @Override
172 protected void processMessages(List<TLVMessage> messages, MessageStream<TLVMessage> stream) { 256 protected void processMessages(List<TLVMessage> messages, MessageStream<TLVMessage> stream) {
257 + TLVMessageStream tlvStream = (TLVMessageStream) stream;
258 + for (TLVMessage message : messages) {
259 + // TODO: add type-based dispatching here...
260 + log.info("Got message {}", message.type());
173 261
262 + // FIXME: hack to get going
263 + if (message.type() == HELLO_MSG) {
264 + processHello(message, tlvStream);
265 + }
266 + }
267 + }
268 +
269 + @Override
270 + public TLVMessageStream acceptStream(SocketChannel channel) {
271 + TLVMessageStream stream = super.acceptStream(channel);
272 + try {
273 + InetSocketAddress sa = (InetSocketAddress) channel.getRemoteAddress();
274 + log.info("Accepted a new connection from node {}", IpPrefix.valueOf(sa.getAddress().getAddress()));
275 + stream.write(createHello(self));
276 +
277 + } catch (IOException e) {
278 + log.warn("Unable to accept connection from an unknown end-point", e);
279 + }
280 + return stream;
281 + }
282 +
283 + @Override
284 + public TLVMessageStream connectStream(SocketChannel channel) {
285 + TLVMessageStream stream = super.connectStream(channel);
286 + DefaultControllerNode node = nodesByChannel.get(channel);
287 + if (node != null) {
288 + log.info("Opened connection to node {}", node.id());
289 + nodesByChannel.remove(channel);
290 + }
291 + return stream;
292 + }
293 +
294 + @Override
295 + protected void connect(SelectionKey key) {
296 + super.connect(key);
297 + TLVMessageStream stream = (TLVMessageStream) key.attachment();
298 + send(stream, createHello(self));
299 + }
300 + }
301 +
302 + // FIXME: pure hack for now
303 + private void processHello(TLVMessage message, TLVMessageStream stream) {
304 + String data = new String(message.data());
305 + log.info("Processing hello with data [{}]", data);
306 + String[] fields = new String(data).split(":");
307 + DefaultControllerNode node = new DefaultControllerNode(new NodeId(fields[0]),
308 + IpPrefix.valueOf(fields[1]),
309 + Integer.parseInt(fields[2]));
310 + stream.setNode(node);
311 + nodes.put(node.id(), node);
312 + streams.put(node.id(), stream);
313 + }
314 +
315 + // Sends message to the specified stream.
316 + private void send(TLVMessageStream stream, TLVMessage message) {
317 + try {
318 + stream.write(message);
319 + } catch (IOException e) {
320 + log.warn("Unable to send message to {}", stream.node().id());
321 + }
322 + }
323 +
324 + private TLVMessage createHello(DefaultControllerNode self) {
325 + return new TLVMessage(HELLO_MSG, (self.id() + ":" + self.ip() + ":" + self.tcpPort()).getBytes());
326 + }
327 +
328 + // Sweeps through all controller nodes and attempts to open connection to
329 + // those that presently do not have one.
330 + private class ConnectionCustodian extends TimerTask {
331 + @Override
332 + public void run() {
333 + for (DefaultControllerNode node : nodes.values()) {
334 + if (node != self && !streams.containsKey(node.id())) {
335 + try {
336 + openConnection(node, findLeastUtilizedLoop());
337 + } catch (IOException e) {
338 + log.warn("Unable to connect", e);
339 + }
340 + }
341 + }
342 + }
343 + }
344 +
 345 + // Finds the least utilized IO loop.
346 + private CommLoop findLeastUtilizedLoop() {
347 + CommLoop leastUtilized = null;
348 + int minCount = Integer.MAX_VALUE;
349 + for (CommLoop loop : commLoops) {
350 + int count = loop.streamCount();
351 + if (count == 0) {
352 + return loop;
353 + }
354 +
355 + if (count < minCount) {
356 + leastUtilized = loop;
357 + minCount = count;
358 + }
174 } 359 }
360 + return leastUtilized;
175 } 361 }
176 } 362 }
......
1 -package org.onlab.onos.ccc; 1 +package org.onlab.onos.store.cluster.impl;
2 2
3 import org.onlab.nio.AbstractMessage; 3 import org.onlab.nio.AbstractMessage;
4 4
...@@ -12,17 +12,16 @@ import static com.google.common.base.MoreObjects.toStringHelper; ...@@ -12,17 +12,16 @@ import static com.google.common.base.MoreObjects.toStringHelper;
12 public class TLVMessage extends AbstractMessage { 12 public class TLVMessage extends AbstractMessage {
13 13
14 private final int type; 14 private final int type;
15 - private final Object data; 15 + private final byte[] data;
16 16
17 /** 17 /**
18 * Creates an immutable TLV message. 18 * Creates an immutable TLV message.
19 * 19 *
20 * @param type message type 20 * @param type message type
21 - * @param length message length 21 + * @param data message data bytes
22 - * @param data message data
23 */ 22 */
24 - public TLVMessage(int type, int length, Object data) { 23 + public TLVMessage(int type, byte[] data) {
25 - this.length = length; 24 + this.length = data.length + TLVMessageStream.METADATA_LENGTH;
26 this.type = type; 25 this.type = type;
27 this.data = data; 26 this.data = data;
28 } 27 }
...@@ -37,11 +36,11 @@ public class TLVMessage extends AbstractMessage { ...@@ -37,11 +36,11 @@ public class TLVMessage extends AbstractMessage {
37 } 36 }
38 37
39 /** 38 /**
40 - * Returns the data object. 39 + * Returns the data bytes.
41 * 40 *
42 * @return message data 41 * @return message data
43 */ 42 */
44 - public Object data() { 43 + public byte[] data() {
45 return data; 44 return data;
46 } 45 }
47 46
......
1 -package org.onlab.onos.ccc; 1 +package org.onlab.onos.store.cluster.impl;
2 2
3 import org.onlab.nio.IOLoop; 3 import org.onlab.nio.IOLoop;
4 import org.onlab.nio.MessageStream; 4 import org.onlab.nio.MessageStream;
5 +import org.onlab.onos.cluster.DefaultControllerNode;
5 6
6 import java.nio.ByteBuffer; 7 import java.nio.ByteBuffer;
7 import java.nio.channels.ByteChannel; 8 import java.nio.channels.ByteChannel;
...@@ -13,8 +14,13 @@ import static com.google.common.base.Preconditions.checkState; ...@@ -13,8 +14,13 @@ import static com.google.common.base.Preconditions.checkState;
13 */ 14 */
14 public class TLVMessageStream extends MessageStream<TLVMessage> { 15 public class TLVMessageStream extends MessageStream<TLVMessage> {
15 16
17 + public static final int METADATA_LENGTH = 16; // 8 + 4 + 4
18 +
19 + private static final int LENGTH_OFFSET = 12;
16 private static final long MARKER = 0xfeedcafecafefeedL; 20 private static final long MARKER = 0xfeedcafecafefeedL;
17 21
22 + private DefaultControllerNode node;
23 +
18 /** 24 /**
19 * Creates a message stream associated with the specified IO loop and 25 * Creates a message stream associated with the specified IO loop and
20 * backed by the given byte channel. 26 * backed by the given byte channel.
...@@ -29,17 +35,51 @@ public class TLVMessageStream extends MessageStream<TLVMessage> { ...@@ -29,17 +35,51 @@ public class TLVMessageStream extends MessageStream<TLVMessage> {
29 super(loop, byteChannel, bufferSize, maxIdleMillis); 35 super(loop, byteChannel, bufferSize, maxIdleMillis);
30 } 36 }
31 37
38 + /**
39 + * Returns the node with which this stream is associated.
40 + *
41 + * @return controller node
42 + */
43 + DefaultControllerNode node() {
44 + return node;
45 + }
46 +
47 + /**
48 + * Sets the node with which this stream is affiliated.
49 + *
50 + * @param node controller node
51 + */
52 + void setNode(DefaultControllerNode node) {
53 + checkState(this.node == null, "Stream is already bound to a node");
54 + this.node = node;
55 + }
56 +
32 @Override 57 @Override
33 protected TLVMessage read(ByteBuffer buffer) { 58 protected TLVMessage read(ByteBuffer buffer) {
59 + // Do we have enough bytes to read the header? If not, bail.
60 + if (buffer.remaining() < METADATA_LENGTH) {
61 + return null;
62 + }
63 +
64 + // Peek at the length and if we have enough to read the entire message
65 + // go ahead, otherwise bail.
66 + int length = buffer.getInt(buffer.position() + LENGTH_OFFSET);
67 + if (buffer.remaining() < length) {
68 + return null;
69 + }
70 +
71 + // At this point, we have enough data to read a complete message.
34 long marker = buffer.getLong(); 72 long marker = buffer.getLong();
35 checkState(marker == MARKER, "Incorrect message marker"); 73 checkState(marker == MARKER, "Incorrect message marker");
36 74
37 int type = buffer.getInt(); 75 int type = buffer.getInt();
38 - int length = buffer.getInt(); 76 + length = buffer.getInt();
39 77
40 // TODO: add deserialization hook here 78 // TODO: add deserialization hook here
79 + byte[] data = new byte[length - METADATA_LENGTH];
80 + buffer.get(data);
41 81
42 - return new TLVMessage(type, length, null); 82 + return new TLVMessage(type, data);
43 } 83 }
44 84
45 @Override 85 @Override
...@@ -49,5 +89,7 @@ public class TLVMessageStream extends MessageStream<TLVMessage> { ...@@ -49,5 +89,7 @@ public class TLVMessageStream extends MessageStream<TLVMessage> {
49 buffer.putInt(message.length()); 89 buffer.putInt(message.length());
50 90
51 // TODO: add serialization hook here 91 // TODO: add serialization hook here
92 + buffer.put(message.data());
52 } 93 }
94 +
53 } 95 }
......
...@@ -48,14 +48,11 @@ ...@@ -48,14 +48,11 @@
48 description="ONOS core components"> 48 description="ONOS core components">
49 <feature>onos-api</feature> 49 <feature>onos-api</feature>
50 <bundle>mvn:org.onlab.onos/onos-core-net/1.0.0-SNAPSHOT</bundle> 50 <bundle>mvn:org.onlab.onos/onos-core-net/1.0.0-SNAPSHOT</bundle>
51 - <bundle>mvn:org.onlab.onos/onos-core-hz-common/1.0.0-SNAPSHOT</bundle> 51 + <bundle>mvn:org.onlab.onos/onos-core-dist/1.0.0-SNAPSHOT</bundle>
52 - <bundle>mvn:org.onlab.onos/onos-core-serializers/1.0.0-SNAPSHOT</bundle>
53 - <bundle>mvn:org.onlab.onos/onos-core-hz-cluster/1.0.0-SNAPSHOT</bundle>
54 - <bundle>mvn:org.onlab.onos/onos-core-hz-net/1.0.0-SNAPSHOT</bundle>
55 </feature> 52 </feature>
56 53
57 - <feature name="onos-core-dist" version="1.0.0" 54 + <feature name="onos-core-hazelcast" version="1.0.0"
58 - description="ONOS core components"> 55 + description="ONOS core components built on hazelcast">
59 <feature>onos-api</feature> 56 <feature>onos-api</feature>
60 <bundle>mvn:org.onlab.onos/onos-core-net/1.0.0-SNAPSHOT</bundle> 57 <bundle>mvn:org.onlab.onos/onos-core-net/1.0.0-SNAPSHOT</bundle>
61 <bundle>mvn:org.onlab.onos/onos-core-hz-common/1.0.0-SNAPSHOT</bundle> 58 <bundle>mvn:org.onlab.onos/onos-core-hz-common/1.0.0-SNAPSHOT</bundle>
......
...@@ -170,7 +170,7 @@ public abstract class MessageStream<M extends Message> { ...@@ -170,7 +170,7 @@ public abstract class MessageStream<M extends Message> {
170 } 170 }
171 171
172 /** 172 /**
173 - * Reads, withouth blocking, a list of messages from the stream. 173 + * Reads, without blocking, a list of messages from the stream.
174 * The list will be empty if there were not messages pending. 174 * The list will be empty if there were not messages pending.
175 * 175 *
176 * @return list of messages or null if backing channel has been closed 176 * @return list of messages or null if backing channel has been closed
......