tom

Preparing for change in ClusterService/Store implementation.

1 +package org.onlab.onos.ccc;
2 +
3 +import com.google.common.collect.ImmutableSet;
4 +import org.apache.felix.scr.annotations.Activate;
5 +import org.apache.felix.scr.annotations.Component;
6 +import org.apache.felix.scr.annotations.Deactivate;
7 +import org.apache.felix.scr.annotations.Service;
8 +import org.onlab.nio.AcceptorLoop;
9 +import org.onlab.nio.IOLoop;
10 +import org.onlab.nio.MessageStream;
11 +import org.onlab.onos.cluster.ClusterEvent;
12 +import org.onlab.onos.cluster.ClusterStore;
13 +import org.onlab.onos.cluster.ClusterStoreDelegate;
14 +import org.onlab.onos.cluster.ControllerNode;
15 +import org.onlab.onos.cluster.DefaultControllerNode;
16 +import org.onlab.onos.cluster.NodeId;
17 +import org.onlab.onos.store.AbstractStore;
18 +import org.onlab.packet.IpPrefix;
19 +import org.slf4j.Logger;
20 +import org.slf4j.LoggerFactory;
21 +
22 +import java.io.IOException;
23 +import java.net.InetSocketAddress;
24 +import java.nio.channels.ByteChannel;
25 +import java.nio.channels.ServerSocketChannel;
26 +import java.util.ArrayList;
27 +import java.util.List;
28 +import java.util.Map;
29 +import java.util.Set;
30 +import java.util.concurrent.ConcurrentHashMap;
31 +import java.util.concurrent.ExecutorService;
32 +import java.util.concurrent.Executors;
33 +
34 +import static java.net.InetAddress.getByAddress;
35 +import static org.onlab.onos.cluster.ControllerNode.State;
36 +import static org.onlab.packet.IpPrefix.valueOf;
37 +import static org.onlab.util.Tools.namedThreads;
38 +
39 +/**
40 + * Distributed implementation of the cluster nodes store.
41 + */
42 +@Component(immediate = true)
43 +@Service
44 +public class DistributedClusterStore
45 + extends AbstractStore<ClusterEvent, ClusterStoreDelegate>
46 + implements ClusterStore {
47 +
48 + private final Logger log = LoggerFactory.getLogger(getClass());
49 +
50 + private static final long SELECT_TIMEOUT = 50;
51 + private static final int WORKERS = 3;
52 + private static final int COMM_BUFFER_SIZE = 16 * 1024;
53 + private static final int COMM_IDLE_TIME = 500;
54 +
55 + private DefaultControllerNode self;
56 + private final Map<NodeId, DefaultControllerNode> nodes = new ConcurrentHashMap<>();
57 + private final Map<NodeId, State> states = new ConcurrentHashMap<>();
58 +
59 + private final ExecutorService listenExecutor =
60 + Executors.newSingleThreadExecutor(namedThreads("onos-listen"));
61 + private final ExecutorService commExecutors =
62 + Executors.newFixedThreadPool(WORKERS, namedThreads("onos-cluster"));
63 + private final ExecutorService heartbeatExecutor =
64 + Executors.newSingleThreadExecutor(namedThreads("onos-heartbeat"));
65 +
66 + private ListenLoop listenLoop;
67 + private List<CommLoop> commLoops = new ArrayList<>(WORKERS);
68 +
69 + @Activate
70 + public void activate() {
71 + establishIdentity();
72 + startCommunications();
73 + startListening();
74 + log.info("Started");
75 + }
76 +
77 + private void startCommunications() {
78 + for (int i = 0; i < WORKERS; i++) {
79 + try {
80 + CommLoop loop = new CommLoop();
81 + commLoops.add(loop);
82 + commExecutors.execute(loop);
83 + } catch (IOException e) {
84 + log.warn("Unable to start comm IO loop", e);
85 + }
86 + }
87 + }
88 +
89 + // Starts listening for connections from peer cluster members.
90 + private void startListening() {
91 + try {
92 + listenLoop = new ListenLoop(self.ip(), self.tcpPort());
93 + listenExecutor.execute(listenLoop);
94 + } catch (IOException e) {
95 + log.error("Unable to listen for cluster connections", e);
96 + }
97 + }
98 +
99 + // Establishes the controller's own identity.
100 + private void establishIdentity() {
101 + // For now rely on env. variable.
102 + IpPrefix ip = valueOf(System.getenv("ONOS_NIC"));
103 + self = new DefaultControllerNode(new NodeId(ip.toString()), ip);
104 + }
105 +
106 + @Deactivate
107 + public void deactivate() {
108 + listenLoop.shutdown();
109 + for (CommLoop loop : commLoops) {
110 + loop.shutdown();
111 + }
112 + log.info("Stopped");
113 + }
114 +
115 + @Override
116 + public ControllerNode getLocalNode() {
117 + return self;
118 + }
119 +
120 + @Override
121 + public Set<ControllerNode> getNodes() {
122 + ImmutableSet.Builder<ControllerNode> builder = ImmutableSet.builder();
123 + return builder.addAll(nodes.values()).build();
124 + }
125 +
126 + @Override
127 + public ControllerNode getNode(NodeId nodeId) {
128 + return nodes.get(nodeId);
129 + }
130 +
131 + @Override
132 + public State getState(NodeId nodeId) {
133 + State state = states.get(nodeId);
134 + return state == null ? State.INACTIVE : state;
135 + }
136 +
137 + @Override
138 + public ControllerNode addNode(NodeId nodeId, IpPrefix ip, int tcpPort) {
139 + DefaultControllerNode node = new DefaultControllerNode(nodeId, ip, tcpPort);
140 + nodes.put(nodeId, node);
141 + return node;
142 + }
143 +
144 + @Override
145 + public void removeNode(NodeId nodeId) {
146 + nodes.remove(nodeId);
147 + }
148 +
149 + // Listens and accepts inbound connections from other cluster nodes.
150 + private class ListenLoop extends AcceptorLoop {
151 + ListenLoop(IpPrefix ip, int tcpPort) throws IOException {
152 + super(SELECT_TIMEOUT, new InetSocketAddress(getByAddress(ip.toOctets()), tcpPort));
153 + }
154 +
155 + @Override
156 + protected void acceptConnection(ServerSocketChannel channel) throws IOException {
157 +
158 + }
159 + }
160 +
161 + private class CommLoop extends IOLoop<TLVMessage, TLVMessageStream> {
162 + CommLoop() throws IOException {
163 + super(SELECT_TIMEOUT);
164 + }
165 +
166 + @Override
167 + protected TLVMessageStream createStream(ByteChannel byteChannel) {
168 + return new TLVMessageStream(this, byteChannel, COMM_BUFFER_SIZE, COMM_IDLE_TIME);
169 + }
170 +
171 + @Override
172 + protected void processMessages(List<TLVMessage> messages, MessageStream<TLVMessage> stream) {
173 +
174 + }
175 + }
176 +}
1 +package org.onlab.onos.ccc;
2 +
3 +import org.onlab.nio.AbstractMessage;
4 +
5 +import java.util.Objects;
6 +
7 +import static com.google.common.base.MoreObjects.toStringHelper;
8 +
9 +/**
10 + * Base message for cluster-wide communications using TLVs.
11 + */
12 +public class TLVMessage extends AbstractMessage {
13 +
14 + private final int type;
15 + private final Object data;
16 +
17 + /**
18 + * Creates an immutable TLV message.
19 + *
20 + * @param type message type
21 + * @param length message length
22 + * @param data message data
23 + */
24 + public TLVMessage(int type, int length, Object data) {
25 + this.length = length;
26 + this.type = type;
27 + this.data = data;
28 + }
29 +
30 + /**
31 + * Returns the message type indicator.
32 + *
33 + * @return message type
34 + */
35 + public int type() {
36 + return type;
37 + }
38 +
39 + /**
40 + * Returns the data object.
41 + *
42 + * @return message data
43 + */
44 + public Object data() {
45 + return data;
46 + }
47 +
48 + @Override
49 + public int hashCode() {
50 + return Objects.hash(type, data);
51 + }
52 +
53 + @Override
54 + public boolean equals(Object obj) {
55 + if (this == obj) {
56 + return true;
57 + }
58 + if (obj == null || getClass() != obj.getClass()) {
59 + return false;
60 + }
61 + final TLVMessage other = (TLVMessage) obj;
62 + return Objects.equals(this.type, other.type) &&
63 + Objects.equals(this.data, other.data);
64 + }
65 +
66 + @Override
67 + public String toString() {
68 + return toStringHelper(this).add("type", type).add("length", length).toString();
69 + }
70 +
71 +}
1 +package org.onlab.onos.ccc;
2 +
3 +import org.onlab.nio.IOLoop;
4 +import org.onlab.nio.MessageStream;
5 +
6 +import java.nio.ByteBuffer;
7 +import java.nio.channels.ByteChannel;
8 +
9 +import static com.google.common.base.Preconditions.checkState;
10 +
11 +/**
12 + * Stream for transferring TLV messages between cluster members.
13 + */
14 +public class TLVMessageStream extends MessageStream<TLVMessage> {
15 +
16 + private static final long MARKER = 0xfeedcafecafefeedL;
17 +
18 + /**
19 + * Creates a message stream associated with the specified IO loop and
20 + * backed by the given byte channel.
21 + *
22 + * @param loop IO loop
23 + * @param byteChannel backing byte channel
24 + * @param bufferSize size of the backing byte buffers
25 + * @param maxIdleMillis maximum number of millis the stream can be idle
26 + */
27 + protected TLVMessageStream(IOLoop<TLVMessage, ?> loop, ByteChannel byteChannel,
28 + int bufferSize, int maxIdleMillis) {
29 + super(loop, byteChannel, bufferSize, maxIdleMillis);
30 + }
31 +
32 + @Override
33 + protected TLVMessage read(ByteBuffer buffer) {
34 + long marker = buffer.getLong();
35 + checkState(marker == MARKER, "Incorrect message marker");
36 +
37 + int type = buffer.getInt();
38 + int length = buffer.getInt();
39 +
40 + // TODO: add deserialization hook here
41 +
42 + return new TLVMessage(type, length, null);
43 + }
44 +
45 + @Override
46 + protected void write(TLVMessage message, ByteBuffer buffer) {
47 + buffer.putLong(MARKER);
48 + buffer.putInt(message.type());
49 + buffer.putInt(message.length());
50 +
51 + // TODO: add serialization hook here
52 + }
53 +}