tom

Integrated Kryo serializers with the communications manager and IO loop stuff.

...@@ -29,4 +29,8 @@ public interface ClusterCommunicationAdminService { ...@@ -29,4 +29,8 @@ public interface ClusterCommunicationAdminService {
29 */ 29 */
30 void startUp(DefaultControllerNode localNode, ClusterNodesDelegate delegate); 30 void startUp(DefaultControllerNode localNode, ClusterNodesDelegate delegate);
31 31
32 + /**
33 + * Clears all nodes and streams as part of leaving the cluster.
34 + */
35 + void clearAllNodesAndStreams();
32 } 36 }
......
...@@ -13,6 +13,7 @@ import org.onlab.onos.cluster.DefaultControllerNode; ...@@ -13,6 +13,7 @@ import org.onlab.onos.cluster.DefaultControllerNode;
13 import org.onlab.onos.cluster.NodeId; 13 import org.onlab.onos.cluster.NodeId;
14 import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService; 14 import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
15 import org.onlab.onos.store.cluster.messaging.ClusterMessage; 15 import org.onlab.onos.store.cluster.messaging.ClusterMessage;
16 +import org.onlab.onos.store.cluster.messaging.GoodbyeMessage;
16 import org.onlab.onos.store.cluster.messaging.HelloMessage; 17 import org.onlab.onos.store.cluster.messaging.HelloMessage;
17 import org.onlab.onos.store.cluster.messaging.MessageSubject; 18 import org.onlab.onos.store.cluster.messaging.MessageSubject;
18 import org.onlab.onos.store.cluster.messaging.MessageSubscriber; 19 import org.onlab.onos.store.cluster.messaging.MessageSubscriber;
...@@ -83,9 +84,11 @@ public class ClusterCommunicationManager ...@@ -83,9 +84,11 @@ public class ClusterCommunicationManager
83 84
84 private final Timer timer = new Timer("onos-comm-initiator"); 85 private final Timer timer = new Timer("onos-comm-initiator");
85 private final TimerTask connectionCustodian = new ConnectionCustodian(); 86 private final TimerTask connectionCustodian = new ConnectionCustodian();
87 + private GoodbyeSubscriber goodbyeSubscriber = new GoodbyeSubscriber();
86 88
87 @Activate 89 @Activate
88 public void activate() { 90 public void activate() {
91 + addSubscriber(MessageSubject.GOODBYE, goodbyeSubscriber);
89 log.info("Activated but waiting for delegate"); 92 log.info("Activated but waiting for delegate");
90 } 93 }
91 94
...@@ -102,9 +105,20 @@ public class ClusterCommunicationManager ...@@ -102,9 +105,20 @@ public class ClusterCommunicationManager
102 } 105 }
103 106
104 @Override 107 @Override
108 + public boolean send(ClusterMessage message) {
109 + boolean ok = true;
110 + for (DefaultControllerNode node : nodes) {
111 + if (!node.equals(localNode)) {
112 + ok = send(message, node.id()) && ok;
113 + }
114 + }
115 + return ok;
116 + }
117 +
118 + @Override
105 public boolean send(ClusterMessage message, NodeId toNodeId) { 119 public boolean send(ClusterMessage message, NodeId toNodeId) {
106 ClusterMessageStream stream = streams.get(toNodeId); 120 ClusterMessageStream stream = streams.get(toNodeId);
107 - if (stream != null) { 121 + if (stream != null && !toNodeId.equals(localNode.id())) {
108 try { 122 try {
109 stream.write(message); 123 stream.write(message);
110 return true; 124 return true;
...@@ -140,6 +154,7 @@ public class ClusterCommunicationManager ...@@ -140,6 +154,7 @@ public class ClusterCommunicationManager
140 154
141 @Override 155 @Override
142 public void removeNode(DefaultControllerNode node) { 156 public void removeNode(DefaultControllerNode node) {
157 + send(new GoodbyeMessage(node.id()));
143 nodes.remove(node); 158 nodes.remove(node);
144 ClusterMessageStream stream = streams.remove(node.id()); 159 ClusterMessageStream stream = streams.remove(node.id());
145 if (stream != null) { 160 if (stream != null) {
...@@ -159,6 +174,16 @@ public class ClusterCommunicationManager ...@@ -159,6 +174,16 @@ public class ClusterCommunicationManager
159 log.info("Started"); 174 log.info("Started");
160 } 175 }
161 176
177 + @Override
178 + public void clearAllNodesAndStreams() {
179 + nodes.clear();
180 + send(new GoodbyeMessage(localNode.id()));
181 + for (ClusterMessageStream stream : streams.values()) {
182 + stream.close();
183 + }
184 + streams.clear();
185 + }
186 +
162 /** 187 /**
163 * Dispatches the specified message to all subscribers to its subject. 188 * Dispatches the specified message to all subscribers to its subject.
164 * 189 *
...@@ -304,4 +329,11 @@ public class ClusterCommunicationManager ...@@ -304,4 +329,11 @@ public class ClusterCommunicationManager
304 } 329 }
305 } 330 }
306 331
332 + private class GoodbyeSubscriber implements MessageSubscriber {
333 + @Override
334 + public void receive(ClusterMessage message, NodeId fromNodeId) {
335 + log.info("Received goodbye message from {}", fromNodeId);
336 + nodesDelegate.nodeRemoved(fromNodeId);
337 + }
338 + }
307 } 339 }
......
...@@ -27,4 +27,11 @@ public interface ClusterNodesDelegate { ...@@ -27,4 +27,11 @@ public interface ClusterNodesDelegate {
27 */ 27 */
28 void nodeVanished(NodeId nodeId); 28 void nodeVanished(NodeId nodeId);
29 29
30 + /**
31 + * Notifies about remote request to remove node from cluster.
32 + *
33 + * @param nodeId identifier of the cluster node that was removed
34 + */
35 + void nodeRemoved(NodeId nodeId);
36 +
30 } 37 }
......
...@@ -127,9 +127,17 @@ public class DistributedClusterStore ...@@ -127,9 +127,17 @@ public class DistributedClusterStore
127 127
128 @Override 128 @Override
129 public void removeNode(NodeId nodeId) { 129 public void removeNode(NodeId nodeId) {
130 - DefaultControllerNode node = nodes.remove(nodeId); 130 + if (nodeId.equals(localNode.id())) {
131 - if (node != null) { 131 + // FIXME: this is still broken
132 - communicationAdminService.removeNode(node); 132 + // We are being ejected from the cluster, so remove all other nodes.
133 + communicationAdminService.clearAllNodesAndStreams();
134 + nodes.clear();
135 + } else {
136 + // Remove the other node.
137 + DefaultControllerNode node = nodes.remove(nodeId);
138 + if (node != null) {
139 + communicationAdminService.removeNode(node);
140 + }
133 } 141 }
134 } 142 }
135 143
...@@ -148,6 +156,11 @@ public class DistributedClusterStore ...@@ -148,6 +156,11 @@ public class DistributedClusterStore
148 public void nodeVanished(NodeId nodeId) { 156 public void nodeVanished(NodeId nodeId) {
149 states.put(nodeId, State.INACTIVE); 157 states.put(nodeId, State.INACTIVE);
150 } 158 }
159 +
160 + @Override
161 + public void nodeRemoved(NodeId nodeId) {
162 + removeNode(nodeId);
163 + }
151 } 164 }
152 165
153 } 166 }
......
1 package org.onlab.onos.store.cluster.impl; 1 package org.onlab.onos.store.cluster.impl;
2 2
3 +import de.javakaffee.kryoserializers.URISerializer;
4 +import org.apache.felix.scr.annotations.Activate;
3 import org.apache.felix.scr.annotations.Component; 5 import org.apache.felix.scr.annotations.Component;
6 +import org.apache.felix.scr.annotations.Deactivate;
4 import org.apache.felix.scr.annotations.Service; 7 import org.apache.felix.scr.annotations.Service;
8 +import org.onlab.onos.cluster.ControllerNode;
9 +import org.onlab.onos.cluster.DefaultControllerNode;
5 import org.onlab.onos.cluster.NodeId; 10 import org.onlab.onos.cluster.NodeId;
11 +import org.onlab.onos.net.ConnectPoint;
12 +import org.onlab.onos.net.DefaultDevice;
13 +import org.onlab.onos.net.DefaultLink;
14 +import org.onlab.onos.net.DefaultPort;
15 +import org.onlab.onos.net.Device;
16 +import org.onlab.onos.net.DeviceId;
17 +import org.onlab.onos.net.Element;
18 +import org.onlab.onos.net.Link;
19 +import org.onlab.onos.net.LinkKey;
20 +import org.onlab.onos.net.MastershipRole;
21 +import org.onlab.onos.net.Port;
22 +import org.onlab.onos.net.PortNumber;
23 +import org.onlab.onos.net.provider.ProviderId;
6 import org.onlab.onos.store.cluster.messaging.ClusterMessage; 24 import org.onlab.onos.store.cluster.messaging.ClusterMessage;
25 +import org.onlab.onos.store.cluster.messaging.EchoMessage;
26 +import org.onlab.onos.store.cluster.messaging.GoodbyeMessage;
7 import org.onlab.onos.store.cluster.messaging.HelloMessage; 27 import org.onlab.onos.store.cluster.messaging.HelloMessage;
8 import org.onlab.onos.store.cluster.messaging.MessageSubject; 28 import org.onlab.onos.store.cluster.messaging.MessageSubject;
9 import org.onlab.onos.store.cluster.messaging.SerializationService; 29 import org.onlab.onos.store.cluster.messaging.SerializationService;
30 +import org.onlab.onos.store.serializers.ConnectPointSerializer;
31 +import org.onlab.onos.store.serializers.DefaultLinkSerializer;
32 +import org.onlab.onos.store.serializers.DefaultPortSerializer;
33 +import org.onlab.onos.store.serializers.DeviceIdSerializer;
34 +import org.onlab.onos.store.serializers.IpPrefixSerializer;
35 +import org.onlab.onos.store.serializers.LinkKeySerializer;
36 +import org.onlab.onos.store.serializers.NodeIdSerializer;
37 +import org.onlab.onos.store.serializers.PortNumberSerializer;
38 +import org.onlab.onos.store.serializers.ProviderIdSerializer;
10 import org.onlab.packet.IpPrefix; 39 import org.onlab.packet.IpPrefix;
40 +import org.onlab.util.KryoPool;
11 import org.slf4j.Logger; 41 import org.slf4j.Logger;
12 import org.slf4j.LoggerFactory; 42 import org.slf4j.LoggerFactory;
13 43
44 +import java.net.URI;
14 import java.nio.ByteBuffer; 45 import java.nio.ByteBuffer;
46 +import java.util.ArrayList;
47 +import java.util.HashMap;
15 48
16 import static com.google.common.base.Preconditions.checkState; 49 import static com.google.common.base.Preconditions.checkState;
17 50
...@@ -24,11 +57,64 @@ public class MessageSerializer implements SerializationService { ...@@ -24,11 +57,64 @@ public class MessageSerializer implements SerializationService {
24 57
25 private final Logger log = LoggerFactory.getLogger(getClass()); 58 private final Logger log = LoggerFactory.getLogger(getClass());
26 59
27 - private static final int METADATA_LENGTH = 16; // 8 + 4 + 4 60 + private static final int METADATA_LENGTH = 12; // 8 + 4
28 - private static final int LENGTH_OFFSET = 12; 61 + private static final int LENGTH_OFFSET = 8;
29 62
30 private static final long MARKER = 0xfeedcafebeaddeadL; 63 private static final long MARKER = 0xfeedcafebeaddeadL;
31 64
65 + private KryoPool serializerPool;
66 +
67 + @Activate
68 + public void activate() {
69 + setupKryoPool();
70 + log.info("Started");
71 + }
72 +
73 + @Deactivate
74 + public void deactivate() {
75 + log.info("Stopped");
76 + }
77 +
78 + /**
79 + * Sets up the common serialzers pool.
80 + */
81 + protected void setupKryoPool() {
82 + // FIXME Slice out types used in common to separate pool/namespace.
83 + serializerPool = KryoPool.newBuilder()
84 + .register(ArrayList.class,
85 + HashMap.class,
86 +
87 + ControllerNode.State.class,
88 + Device.Type.class,
89 +
90 + DefaultControllerNode.class,
91 + DefaultDevice.class,
92 + MastershipRole.class,
93 + Port.class,
94 + Element.class,
95 +
96 + Link.Type.class,
97 +
98 + MessageSubject.class,
99 + HelloMessage.class,
100 + GoodbyeMessage.class,
101 + EchoMessage.class
102 + )
103 + .register(IpPrefix.class, new IpPrefixSerializer())
104 + .register(URI.class, new URISerializer())
105 + .register(NodeId.class, new NodeIdSerializer())
106 + .register(ProviderId.class, new ProviderIdSerializer())
107 + .register(DeviceId.class, new DeviceIdSerializer())
108 + .register(PortNumber.class, new PortNumberSerializer())
109 + .register(DefaultPort.class, new DefaultPortSerializer())
110 + .register(LinkKey.class, new LinkKeySerializer())
111 + .register(ConnectPoint.class, new ConnectPointSerializer())
112 + .register(DefaultLink.class, new DefaultLinkSerializer())
113 + .build()
114 + .populate(1);
115 + }
116 +
117 +
32 @Override 118 @Override
33 public ClusterMessage decode(ByteBuffer buffer) { 119 public ClusterMessage decode(ByteBuffer buffer) {
34 try { 120 try {
...@@ -47,18 +133,12 @@ public class MessageSerializer implements SerializationService { ...@@ -47,18 +133,12 @@ public class MessageSerializer implements SerializationService {
47 // At this point, we have enough data to read a complete message. 133 // At this point, we have enough data to read a complete message.
48 long marker = buffer.getLong(); 134 long marker = buffer.getLong();
49 checkState(marker == MARKER, "Incorrect message marker"); 135 checkState(marker == MARKER, "Incorrect message marker");
50 -
51 - int subjectOrdinal = buffer.getInt();
52 - MessageSubject subject = MessageSubject.values()[subjectOrdinal];
53 length = buffer.getInt(); 136 length = buffer.getInt();
54 137
55 // TODO: sanity checking for length 138 // TODO: sanity checking for length
56 byte[] data = new byte[length - METADATA_LENGTH]; 139 byte[] data = new byte[length - METADATA_LENGTH];
57 buffer.get(data); 140 buffer.get(data);
58 - 141 + return (ClusterMessage) serializerPool.deserialize(data);
59 - // TODO: add deserialization hook here; for now this hack
60 - String[] fields = new String(data).split(":");
61 - return new HelloMessage(new NodeId(fields[0]), IpPrefix.valueOf(fields[1]), Integer.parseInt(fields[2]));
62 142
63 } catch (Exception e) { 143 } catch (Exception e) {
64 // TODO: recover from exceptions by forwarding stream to next marker 144 // TODO: recover from exceptions by forwarding stream to next marker
...@@ -70,12 +150,8 @@ public class MessageSerializer implements SerializationService { ...@@ -70,12 +150,8 @@ public class MessageSerializer implements SerializationService {
70 @Override 150 @Override
71 public void encode(ClusterMessage message, ByteBuffer buffer) { 151 public void encode(ClusterMessage message, ByteBuffer buffer) {
72 try { 152 try {
73 - HelloMessage helloMessage = (HelloMessage) message; 153 + byte[] data = serializerPool.serialize(message);
74 buffer.putLong(MARKER); 154 buffer.putLong(MARKER);
75 - buffer.putInt(message.subject().ordinal());
76 -
77 - String str = helloMessage.nodeId() + ":" + helloMessage.ipAddress() + ":" + helloMessage.tcpPort();
78 - byte[] data = str.getBytes();
79 buffer.putInt(data.length + METADATA_LENGTH); 155 buffer.putInt(data.length + METADATA_LENGTH);
80 buffer.put(data); 156 buffer.put(data);
81 157
......
...@@ -10,6 +10,15 @@ import java.util.Set; ...@@ -10,6 +10,15 @@ import java.util.Set;
10 public interface ClusterCommunicationService { 10 public interface ClusterCommunicationService {
11 11
12 /** 12 /**
13 + * Sends a message to all controller nodes.
14 + *
15 + * @param message message to send
16 + * @return true if the message was sent sucessfully to all nodes; false
17 + * if there is no stream or if there was an error for some node
18 + */
19 + boolean send(ClusterMessage message);
20 +
21 + /**
13 * Sends a message to the specified controller node. 22 * Sends a message to the specified controller node.
14 * 23 *
15 * @param message message to send 24 * @param message message to send
......
1 +package org.onlab.onos.store.cluster.messaging;
2 +
3 +import org.onlab.onos.cluster.NodeId;
4 +
5 +/**
6 + * Goodbye message that nodes use to leave the cluster for good.
7 + */
8 +public class GoodbyeMessage extends ClusterMessage {
9 +
10 + private NodeId nodeId;
11 +
12 + // For serialization
13 + private GoodbyeMessage() {
14 + super(MessageSubject.GOODBYE);
15 + nodeId = null;
16 + }
17 +
18 + /**
19 + * Creates a new goodbye message.
20 + *
21 + * @param nodeId sending node identification
22 + */
23 + public GoodbyeMessage(NodeId nodeId) {
24 + super(MessageSubject.HELLO);
25 + this.nodeId = nodeId;
26 + }
27 +
28 + /**
29 + * Returns the sending node identifer.
30 + *
31 + * @return node identifier
32 + */
33 + public NodeId nodeId() {
34 + return nodeId;
35 + }
36 +
37 +}
...@@ -8,6 +8,9 @@ public enum MessageSubject { ...@@ -8,6 +8,9 @@ public enum MessageSubject {
8 /** Represents a first greeting message. */ 8 /** Represents a first greeting message. */
9 HELLO, 9 HELLO,
10 10
11 + /** Signifies node's intent to leave the cluster. */
12 + GOODBYE,
13 +
11 /** Signifies a heart-beat message. */ 14 /** Signifies a heart-beat message. */
12 ECHO 15 ECHO
13 16
......