Merge branch 'master' of ssh://gerrit.onlab.us:29418/onos-next
Conflicts: apps/pom.xml Change-Id: I5dc2c751086118b5137ea8a0a2b81e9de9d48fac
Showing
29 changed files
with
1194 additions
and
400 deletions
apps/config/pom.xml
0 → 100644
1 | +<?xml version="1.0" encoding="UTF-8"?> | ||
2 | +<project xmlns="http://maven.apache.org/POM/4.0.0" | ||
3 | + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | ||
4 | + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> | ||
5 | + <modelVersion>4.0.0</modelVersion> | ||
6 | + | ||
7 | + <parent> | ||
8 | + <groupId>org.onlab.onos</groupId> | ||
9 | + <artifactId>onos-apps</artifactId> | ||
10 | + <version>1.0.0-SNAPSHOT</version> | ||
11 | + <relativePath>../pom.xml</relativePath> | ||
12 | + </parent> | ||
13 | + | ||
14 | + <artifactId>onos-app-config</artifactId> | ||
15 | + <packaging>bundle</packaging> | ||
16 | + | ||
17 | + <description>ONOS simple network configuration reader</description> | ||
18 | + | ||
19 | + <dependencies> | ||
20 | + <dependency> | ||
21 | + <groupId>org.codehaus.jackson</groupId> | ||
22 | + <artifactId>jackson-core-asl</artifactId> | ||
23 | + </dependency> | ||
24 | + <dependency> | ||
25 | + <groupId>org.codehaus.jackson</groupId> | ||
26 | + <artifactId>jackson-mapper-asl</artifactId> | ||
27 | + </dependency> | ||
28 | + <dependency> | ||
29 | + <groupId>com.fasterxml.jackson.core</groupId> | ||
30 | + <artifactId>jackson-annotations</artifactId> | ||
31 | + <version>2.4.2</version> | ||
32 | + <scope>provided</scope> | ||
33 | + </dependency> | ||
34 | + </dependencies> | ||
35 | + | ||
36 | +</project> |
1 | +package org.onlab.onos.config; | ||
2 | + | ||
3 | +import java.util.Collections; | ||
4 | +import java.util.List; | ||
5 | + | ||
6 | +import org.codehaus.jackson.annotate.JsonProperty; | ||
7 | + | ||
8 | +/** | ||
9 | + * Object to store address configuration read from a JSON file. | ||
10 | + */ | ||
11 | +public class AddressConfiguration { | ||
12 | + | ||
13 | + private List<AddressEntry> addresses; | ||
14 | + | ||
15 | + /** | ||
16 | + * Gets a list of addresses in the system. | ||
17 | + * | ||
18 | + * @return the list of addresses | ||
19 | + */ | ||
20 | + public List<AddressEntry> getAddresses() { | ||
21 | + return Collections.unmodifiableList(addresses); | ||
22 | + } | ||
23 | + | ||
24 | + /** | ||
25 | + * Sets a list of addresses in the system. | ||
26 | + * | ||
27 | + * @param addresses the list of addresses | ||
28 | + */ | ||
29 | + @JsonProperty("addresses") | ||
30 | + public void setAddresses(List<AddressEntry> addresses) { | ||
31 | + this.addresses = addresses; | ||
32 | + } | ||
33 | + | ||
34 | +} |
1 | +package org.onlab.onos.config; | ||
2 | + | ||
3 | +import java.util.List; | ||
4 | + | ||
5 | +import org.codehaus.jackson.annotate.JsonProperty; | ||
6 | +import org.onlab.packet.IpPrefix; | ||
7 | +import org.onlab.packet.MacAddress; | ||
8 | + | ||
9 | +/** | ||
10 | + * Represents a set of addresses bound to a port. | ||
11 | + */ | ||
12 | +public class AddressEntry { | ||
13 | + private String dpid; | ||
14 | + private short portNumber; | ||
15 | + private List<IpPrefix> ipAddresses; | ||
16 | + private MacAddress macAddress; | ||
17 | + | ||
18 | + public String getDpid() { | ||
19 | + return dpid; | ||
20 | + } | ||
21 | + | ||
22 | + @JsonProperty("dpid") | ||
23 | + public void setDpid(String strDpid) { | ||
24 | + this.dpid = strDpid; | ||
25 | + } | ||
26 | + | ||
27 | + public short getPortNumber() { | ||
28 | + return portNumber; | ||
29 | + } | ||
30 | + | ||
31 | + @JsonProperty("port") | ||
32 | + public void setPortNumber(short portNumber) { | ||
33 | + this.portNumber = portNumber; | ||
34 | + } | ||
35 | + | ||
36 | + public List<IpPrefix> getIpAddresses() { | ||
37 | + return ipAddresses; | ||
38 | + } | ||
39 | + | ||
40 | + @JsonProperty("ips") | ||
41 | + public void setIpAddresses(List<IpPrefix> ipAddresses) { | ||
42 | + this.ipAddresses = ipAddresses; | ||
43 | + } | ||
44 | + | ||
45 | + public MacAddress getMacAddress() { | ||
46 | + return macAddress; | ||
47 | + } | ||
48 | + | ||
49 | + @JsonProperty("mac") | ||
50 | + public void setMacAddress(MacAddress macAddress) { | ||
51 | + this.macAddress = macAddress; | ||
52 | + } | ||
53 | +} |
1 | +package org.onlab.onos.config; | ||
2 | + | ||
3 | +import static org.slf4j.LoggerFactory.getLogger; | ||
4 | + | ||
5 | +import java.io.File; | ||
6 | +import java.io.FileNotFoundException; | ||
7 | +import java.io.IOException; | ||
8 | + | ||
9 | +import org.apache.felix.scr.annotations.Activate; | ||
10 | +import org.apache.felix.scr.annotations.Component; | ||
11 | +import org.apache.felix.scr.annotations.Deactivate; | ||
12 | +import org.apache.felix.scr.annotations.Reference; | ||
13 | +import org.apache.felix.scr.annotations.ReferenceCardinality; | ||
14 | +import org.codehaus.jackson.map.ObjectMapper; | ||
15 | +import org.onlab.onos.net.ConnectPoint; | ||
16 | +import org.onlab.onos.net.DeviceId; | ||
17 | +import org.onlab.onos.net.PortNumber; | ||
18 | +import org.onlab.onos.net.host.HostAdminService; | ||
19 | +import org.onlab.onos.net.host.PortAddresses; | ||
20 | +import org.slf4j.Logger; | ||
21 | + | ||
22 | +import com.google.common.collect.Sets; | ||
23 | + | ||
24 | +/** | ||
25 | + * Simple configuration module to read in supplementary network configuration | ||
26 | + * from a file. | ||
27 | + */ | ||
28 | +@Component(immediate = true) | ||
29 | +public class NetworkConfigReader { | ||
30 | + | ||
31 | + private final Logger log = getLogger(getClass()); | ||
32 | + | ||
33 | + private static final String DEFAULT_CONFIG_FILE = "config/addresses.json"; | ||
34 | + private String configFileName = DEFAULT_CONFIG_FILE; | ||
35 | + | ||
36 | + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) | ||
37 | + protected HostAdminService hostAdminService; | ||
38 | + | ||
39 | + @Activate | ||
40 | + protected void activate() { | ||
41 | + log.info("Started network config reader"); | ||
42 | + | ||
43 | + log.info("Config file set to {}", configFileName); | ||
44 | + | ||
45 | + AddressConfiguration config = readNetworkConfig(); | ||
46 | + | ||
47 | + if (config != null) { | ||
48 | + for (AddressEntry entry : config.getAddresses()) { | ||
49 | + | ||
50 | + ConnectPoint cp = new ConnectPoint( | ||
51 | + DeviceId.deviceId(dpidToUri(entry.getDpid())), | ||
52 | + PortNumber.portNumber(entry.getPortNumber())); | ||
53 | + | ||
54 | + PortAddresses addresses = new PortAddresses(cp, | ||
55 | + Sets.newHashSet(entry.getIpAddresses()), | ||
56 | + entry.getMacAddress()); | ||
57 | + | ||
58 | + hostAdminService.bindAddressesToPort(addresses); | ||
59 | + } | ||
60 | + } | ||
61 | + } | ||
62 | + | ||
63 | + @Deactivate | ||
64 | + protected void deactivate() { | ||
65 | + log.info("Stopped"); | ||
66 | + } | ||
67 | + | ||
68 | + private AddressConfiguration readNetworkConfig() { | ||
69 | + File configFile = new File(configFileName); | ||
70 | + | ||
71 | + ObjectMapper mapper = new ObjectMapper(); | ||
72 | + | ||
73 | + try { | ||
74 | + AddressConfiguration config = | ||
75 | + mapper.readValue(configFile, AddressConfiguration.class); | ||
76 | + | ||
77 | + return config; | ||
78 | + } catch (FileNotFoundException e) { | ||
79 | + log.warn("Configuration file not found: {}", configFileName); | ||
80 | + } catch (IOException e) { | ||
81 | + log.error("Unable to read config from file:", e); | ||
82 | + } | ||
83 | + | ||
84 | + return null; | ||
85 | + } | ||
86 | + | ||
87 | + private static String dpidToUri(String dpid) { | ||
88 | + return "of:" + dpid.replace(":", ""); | ||
89 | + } | ||
90 | +} |
apps/config/src/main/resources/config.json
0 → 100644
1 | +{ | ||
2 | + "interfaces" : [ | ||
3 | + { | ||
4 | + "dpid" : "00:00:00:00:00:00:01", | ||
5 | + "port" : "1", | ||
6 | + "ips" : ["192.168.10.101/24"], | ||
7 | + "mac" : "00:00:00:11:22:33" | ||
8 | + }, | ||
9 | + { | ||
10 | + "dpid" : "00:00:00:00:00:00:02", | ||
11 | + "port" : "1", | ||
12 | + "ips" : ["192.168.20.101/24", "192.168.30.101/24"] | ||
13 | + }, | ||
14 | + { | ||
15 | + "dpid" : "00:00:00:00:00:00:03", | ||
16 | + "port" : "1", | ||
17 | + "ips" : ["10.1.0.1/16"], | ||
18 | + "mac" : "00:00:00:00:00:01" | ||
19 | + } | ||
20 | + ] | ||
21 | +} |
... | @@ -9,6 +9,8 @@ import org.onlab.onos.net.ConnectPoint; | ... | @@ -9,6 +9,8 @@ import org.onlab.onos.net.ConnectPoint; |
9 | import org.onlab.packet.IpPrefix; | 9 | import org.onlab.packet.IpPrefix; |
10 | import org.onlab.packet.MacAddress; | 10 | import org.onlab.packet.MacAddress; |
11 | 11 | ||
12 | +import com.google.common.base.MoreObjects; | ||
13 | + | ||
12 | /** | 14 | /** |
13 | * Represents address information bound to a port. | 15 | * Represents address information bound to a port. |
14 | */ | 16 | */ |
... | @@ -83,4 +85,13 @@ public class PortAddresses { | ... | @@ -83,4 +85,13 @@ public class PortAddresses { |
83 | public int hashCode() { | 85 | public int hashCode() { |
84 | return Objects.hash(connectPoint, ipAddresses, macAddress); | 86 | return Objects.hash(connectPoint, ipAddresses, macAddress); |
85 | } | 87 | } |
88 | + | ||
89 | + @Override | ||
90 | + public String toString() { | ||
91 | + return MoreObjects.toStringHelper(getClass()) | ||
92 | + .add("connect-point", connectPoint) | ||
93 | + .add("ip-addresses", ipAddresses) | ||
94 | + .add("mac-address", macAddress) | ||
95 | + .toString(); | ||
96 | + } | ||
86 | } | 97 | } | ... | ... |
core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterConnectionListener.java
0 → 100644
1 | +package org.onlab.onos.store.cluster.impl; | ||
2 | + | ||
3 | +import org.onlab.nio.AcceptorLoop; | ||
4 | +import org.onlab.packet.IpPrefix; | ||
5 | + | ||
6 | +import java.io.IOException; | ||
7 | +import java.net.InetSocketAddress; | ||
8 | +import java.net.Socket; | ||
9 | +import java.nio.channels.ServerSocketChannel; | ||
10 | +import java.nio.channels.SocketChannel; | ||
11 | + | ||
12 | +import static java.net.InetAddress.getByAddress; | ||
13 | + | ||
14 | +/** | ||
15 | + * Listens to inbound connection requests and accepts them. | ||
16 | + */ | ||
17 | +public class ClusterConnectionListener extends AcceptorLoop { | ||
18 | + | ||
19 | + private static final long SELECT_TIMEOUT = 50; | ||
20 | + private static final int COMM_BUFFER_SIZE = 32 * 1024; | ||
21 | + | ||
22 | + private static final boolean SO_NO_DELAY = false; | ||
23 | + private static final int SO_SEND_BUFFER_SIZE = COMM_BUFFER_SIZE; | ||
24 | + private static final int SO_RCV_BUFFER_SIZE = COMM_BUFFER_SIZE; | ||
25 | + | ||
26 | + private final WorkerFinder workerFinder; | ||
27 | + | ||
28 | + ClusterConnectionListener(IpPrefix ip, int tcpPort, | ||
29 | + WorkerFinder workerFinder) throws IOException { | ||
30 | + super(SELECT_TIMEOUT, new InetSocketAddress(getByAddress(ip.toOctets()), tcpPort)); | ||
31 | + this.workerFinder = workerFinder; | ||
32 | + } | ||
33 | + | ||
34 | + @Override | ||
35 | + protected void acceptConnection(ServerSocketChannel channel) throws IOException { | ||
36 | + SocketChannel sc = channel.accept(); | ||
37 | + sc.configureBlocking(false); | ||
38 | + | ||
39 | + Socket so = sc.socket(); | ||
40 | + so.setTcpNoDelay(SO_NO_DELAY); | ||
41 | + so.setReceiveBufferSize(SO_RCV_BUFFER_SIZE); | ||
42 | + so.setSendBufferSize(SO_SEND_BUFFER_SIZE); | ||
43 | + | ||
44 | + workerFinder.findWorker().acceptStream(sc); | ||
45 | + } | ||
46 | + | ||
47 | +} |
1 | +package org.onlab.onos.store.cluster.impl; | ||
2 | + | ||
3 | +import org.onlab.nio.IOLoop; | ||
4 | +import org.onlab.nio.MessageStream; | ||
5 | +import org.onlab.onos.cluster.DefaultControllerNode; | ||
6 | +import org.onlab.onos.store.cluster.messaging.ClusterMessage; | ||
7 | +import org.onlab.onos.store.cluster.messaging.ClusterMessageStream; | ||
8 | +import org.onlab.onos.store.cluster.messaging.SerializationService; | ||
9 | +import org.slf4j.Logger; | ||
10 | +import org.slf4j.LoggerFactory; | ||
11 | + | ||
12 | +import java.io.IOException; | ||
13 | +import java.net.InetSocketAddress; | ||
14 | +import java.nio.channels.ByteChannel; | ||
15 | +import java.nio.channels.SelectionKey; | ||
16 | +import java.nio.channels.SocketChannel; | ||
17 | +import java.util.List; | ||
18 | +import java.util.Objects; | ||
19 | + | ||
20 | +import static org.onlab.packet.IpPrefix.valueOf; | ||
21 | + | ||
22 | +/** | ||
23 | + * Performs the IO operations related to a cluster-wide communications. | ||
24 | + */ | ||
25 | +public class ClusterIOWorker extends | ||
26 | + IOLoop<ClusterMessage, ClusterMessageStream> { | ||
27 | + | ||
28 | + private final Logger log = LoggerFactory.getLogger(getClass()); | ||
29 | + | ||
30 | + private static final long SELECT_TIMEOUT = 50; | ||
31 | + | ||
32 | + private final ConnectionManager connectionManager; | ||
33 | + private final CommunicationsDelegate commsDelegate; | ||
34 | + private final SerializationService serializationService; | ||
35 | + private final ClusterMessage helloMessage; | ||
36 | + | ||
37 | + /** | ||
38 | + * Creates a new cluster IO worker. | ||
39 | + * | ||
40 | + * @param connectionManager parent connection manager | ||
41 | + * @param commsDelegate communications delegate for dispatching | ||
42 | + * @param serializationService serialization service for encode/decode | ||
43 | + * @param helloMessage hello message for greeting peers | ||
44 | + * @throws IOException if errors occur during IO loop ignition | ||
45 | + */ | ||
46 | + ClusterIOWorker(ConnectionManager connectionManager, | ||
47 | + CommunicationsDelegate commsDelegate, | ||
48 | + SerializationService serializationService, | ||
49 | + ClusterMessage helloMessage) throws IOException { | ||
50 | + super(SELECT_TIMEOUT); | ||
51 | + this.connectionManager = connectionManager; | ||
52 | + this.commsDelegate = commsDelegate; | ||
53 | + this.serializationService = serializationService; | ||
54 | + this.helloMessage = helloMessage; | ||
55 | + } | ||
56 | + | ||
57 | + @Override | ||
58 | + protected ClusterMessageStream createStream(ByteChannel byteChannel) { | ||
59 | + return new ClusterMessageStream(serializationService, this, byteChannel); | ||
60 | + } | ||
61 | + | ||
62 | + @Override | ||
63 | + protected void processMessages(List<ClusterMessage> messages, MessageStream<ClusterMessage> stream) { | ||
64 | + for (ClusterMessage message : messages) { | ||
65 | + commsDelegate.dispatch(message); | ||
66 | + } | ||
67 | + } | ||
68 | + | ||
69 | + @Override | ||
70 | + public ClusterMessageStream acceptStream(SocketChannel channel) { | ||
71 | + ClusterMessageStream stream = super.acceptStream(channel); | ||
72 | + try { | ||
73 | + InetSocketAddress sa = (InetSocketAddress) channel.getRemoteAddress(); | ||
74 | + log.info("Accepted connection from node {}", valueOf(sa.getAddress().getAddress())); | ||
75 | + stream.write(helloMessage); | ||
76 | + | ||
77 | + } catch (IOException e) { | ||
78 | + log.warn("Unable to accept connection from an unknown end-point", e); | ||
79 | + } | ||
80 | + return stream; | ||
81 | + } | ||
82 | + | ||
83 | + @Override | ||
84 | + protected void connect(SelectionKey key) throws IOException { | ||
85 | + try { | ||
86 | + super.connect(key); | ||
87 | + ClusterMessageStream stream = (ClusterMessageStream) key.attachment(); | ||
88 | + stream.write(helloMessage); | ||
89 | + | ||
90 | + } catch (IOException e) { | ||
91 | + if (!Objects.equals(e.getMessage(), "Connection refused")) { | ||
92 | + throw e; | ||
93 | + } | ||
94 | + } | ||
95 | + } | ||
96 | + | ||
97 | + @Override | ||
98 | + protected void removeStream(MessageStream<ClusterMessage> stream) { | ||
99 | + DefaultControllerNode node = ((ClusterMessageStream) stream).node(); | ||
100 | + if (node != null) { | ||
101 | + log.info("Closed connection to node {}", node.id()); | ||
102 | + connectionManager.removeNodeStream(node); | ||
103 | + } | ||
104 | + super.removeStream(stream); | ||
105 | + } | ||
106 | + | ||
107 | +} |
core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterNodesDelegate.java
0 → 100644
1 | +package org.onlab.onos.store.cluster.impl; | ||
2 | + | ||
3 | +import org.onlab.onos.cluster.DefaultControllerNode; | ||
4 | + | ||
5 | +/** | ||
6 | + * Simple back interface through which connection manager can interact with | ||
7 | + * the cluster store. | ||
8 | + */ | ||
9 | +public interface ClusterNodesDelegate { | ||
10 | + | ||
11 | + /** | ||
12 | + * Notifies about a new cluster node being detected. | ||
13 | + * | ||
14 | + * @param node newly detected cluster node | ||
15 | + */ | ||
16 | + void nodeDetected(DefaultControllerNode node); | ||
17 | + | ||
18 | + /** | ||
19 | + * Notifies about cluster node going offline. | ||
20 | + * | ||
21 | + * @param node cluster node that vanished | ||
22 | + */ | ||
23 | + void nodeVanished(DefaultControllerNode node); | ||
24 | + | ||
25 | +} |
core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/CommunicationsDelegate.java
0 → 100644
1 | +package org.onlab.onos.store.cluster.impl; | ||
2 | + | ||
3 | +import org.onlab.onos.store.cluster.messaging.ClusterMessage; | ||
4 | + | ||
5 | +/** | ||
6 | + * Simple back interface for interacting with the communications service. | ||
7 | + */ | ||
8 | +public interface CommunicationsDelegate { | ||
9 | + | ||
10 | + /** | ||
11 | + * Dispatches the specified message to all registered subscribers. | ||
12 | + * | ||
13 | + * @param message message to be dispatched | ||
14 | + */ | ||
15 | + void dispatch(ClusterMessage message); | ||
16 | + | ||
17 | + /** | ||
18 | + * Sets the sender. | ||
19 | + * | ||
20 | + * @param messageSender message sender | ||
21 | + */ | ||
22 | + void setSender(MessageSender messageSender); | ||
23 | + | ||
24 | +} |
1 | +package org.onlab.onos.store.cluster.impl; | ||
2 | + | ||
3 | +import org.onlab.onos.cluster.DefaultControllerNode; | ||
4 | +import org.onlab.onos.cluster.NodeId; | ||
5 | +import org.onlab.onos.store.cluster.messaging.ClusterMessage; | ||
6 | +import org.onlab.onos.store.cluster.messaging.ClusterMessageStream; | ||
7 | +import org.onlab.onos.store.cluster.messaging.HelloMessage; | ||
8 | +import org.onlab.onos.store.cluster.messaging.SerializationService; | ||
9 | +import org.slf4j.Logger; | ||
10 | +import org.slf4j.LoggerFactory; | ||
11 | + | ||
12 | +import java.io.IOException; | ||
13 | +import java.net.InetSocketAddress; | ||
14 | +import java.net.SocketAddress; | ||
15 | +import java.nio.channels.SocketChannel; | ||
16 | +import java.util.ArrayList; | ||
17 | +import java.util.HashSet; | ||
18 | +import java.util.List; | ||
19 | +import java.util.Map; | ||
20 | +import java.util.Set; | ||
21 | +import java.util.Timer; | ||
22 | +import java.util.TimerTask; | ||
23 | +import java.util.concurrent.ConcurrentHashMap; | ||
24 | +import java.util.concurrent.ExecutorService; | ||
25 | +import java.util.concurrent.Executors; | ||
26 | + | ||
27 | +import static java.net.InetAddress.getByAddress; | ||
28 | +import static org.onlab.util.Tools.namedThreads; | ||
29 | + | ||
30 | +/** | ||
31 | + * Manages connections to other controller cluster nodes. | ||
32 | + */ | ||
33 | +public class ConnectionManager implements MessageSender { | ||
34 | + | ||
35 | + private final Logger log = LoggerFactory.getLogger(getClass()); | ||
36 | + | ||
37 | + private static final long CONNECTION_CUSTODIAN_DELAY = 1000L; | ||
38 | + private static final long CONNECTION_CUSTODIAN_FREQUENCY = 5000; | ||
39 | + | ||
40 | + private static final long START_TIMEOUT = 1000; | ||
41 | + private static final int WORKERS = 3; | ||
42 | + | ||
43 | + private ClusterConnectionListener connectionListener; | ||
44 | + private List<ClusterIOWorker> workers = new ArrayList<>(WORKERS); | ||
45 | + | ||
46 | + private final DefaultControllerNode localNode; | ||
47 | + private final ClusterNodesDelegate nodesDelegate; | ||
48 | + private final CommunicationsDelegate commsDelegate; | ||
49 | + private final SerializationService serializationService; | ||
50 | + | ||
51 | + // Nodes to be monitored to make sure they have a connection. | ||
52 | + private final Set<DefaultControllerNode> nodes = new HashSet<>(); | ||
53 | + | ||
54 | + // Means to track message streams to other nodes. | ||
55 | + private final Map<NodeId, ClusterMessageStream> streams = new ConcurrentHashMap<>(); | ||
56 | + | ||
57 | + // Executor pools for listening and managing connections to other nodes. | ||
58 | + private final ExecutorService listenExecutor = | ||
59 | + Executors.newSingleThreadExecutor(namedThreads("onos-comm-listen")); | ||
60 | + private final ExecutorService commExecutors = | ||
61 | + Executors.newFixedThreadPool(WORKERS, namedThreads("onos-comm-cluster")); | ||
62 | + private final ExecutorService heartbeatExecutor = | ||
63 | + Executors.newSingleThreadExecutor(namedThreads("onos-comm-heartbeat")); | ||
64 | + | ||
65 | + private final Timer timer = new Timer("onos-comm-initiator"); | ||
66 | + private final TimerTask connectionCustodian = new ConnectionCustodian(); | ||
67 | + | ||
68 | + private final WorkerFinder workerFinder = new LeastUtilitiedWorkerFinder(); | ||
69 | + | ||
70 | + | ||
71 | + /** | ||
72 | + * Creates a new connection manager. | ||
73 | + */ | ||
74 | + ConnectionManager(DefaultControllerNode localNode, | ||
75 | + ClusterNodesDelegate nodesDelegate, | ||
76 | + CommunicationsDelegate commsDelegate, | ||
77 | + SerializationService serializationService) { | ||
78 | + this.localNode = localNode; | ||
79 | + this.nodesDelegate = nodesDelegate; | ||
80 | + this.commsDelegate = commsDelegate; | ||
81 | + this.serializationService = serializationService; | ||
82 | + | ||
83 | + commsDelegate.setSender(this); | ||
84 | + startCommunications(); | ||
85 | + startListening(); | ||
86 | + startInitiating(); | ||
87 | + log.info("Started"); | ||
88 | + } | ||
89 | + | ||
90 | + /** | ||
91 | + * Shuts down the connection manager. | ||
92 | + */ | ||
93 | + void shutdown() { | ||
94 | + connectionListener.shutdown(); | ||
95 | + for (ClusterIOWorker worker : workers) { | ||
96 | + worker.shutdown(); | ||
97 | + } | ||
98 | + log.info("Stopped"); | ||
99 | + } | ||
100 | + | ||
101 | + /** | ||
102 | + * Adds the node to the list of monitored nodes. | ||
103 | + * | ||
104 | + * @param node node to be added | ||
105 | + */ | ||
106 | + void addNode(DefaultControllerNode node) { | ||
107 | + nodes.add(node); | ||
108 | + } | ||
109 | + | ||
110 | + /** | ||
111 | + * Removes the node from the list of monitored nodes. | ||
112 | + * | ||
113 | + * @param node node to be removed | ||
114 | + */ | ||
115 | + void removeNode(DefaultControllerNode node) { | ||
116 | + nodes.remove(node); | ||
117 | + ClusterMessageStream stream = streams.remove(node.id()); | ||
118 | + if (stream != null) { | ||
119 | + stream.close(); | ||
120 | + } | ||
121 | + } | ||
122 | + | ||
123 | + /** | ||
124 | + * Removes the stream associated with the specified node. | ||
125 | + * | ||
126 | + * @param node node whose stream to remove | ||
127 | + */ | ||
128 | + void removeNodeStream(DefaultControllerNode node) { | ||
129 | + nodesDelegate.nodeVanished(node); | ||
130 | + streams.remove(node.id()); | ||
131 | + } | ||
132 | + | ||
133 | + @Override | ||
134 | + public boolean send(NodeId nodeId, ClusterMessage message) { | ||
135 | + ClusterMessageStream stream = streams.get(nodeId); | ||
136 | + if (stream != null) { | ||
137 | + try { | ||
138 | + stream.write(message); | ||
139 | + return true; | ||
140 | + } catch (IOException e) { | ||
141 | + log.warn("Unable to send a message about {} to node {}", | ||
142 | + message.subject(), nodeId); | ||
143 | + } | ||
144 | + } | ||
145 | + return false; | ||
146 | + } | ||
147 | + | ||
148 | + /** | ||
149 | + * Kicks off the IO loops and waits for them to startup. | ||
150 | + */ | ||
151 | + private void startCommunications() { | ||
152 | + HelloMessage hello = new HelloMessage(localNode.id(), localNode.ip(), | ||
153 | + localNode.tcpPort()); | ||
154 | + for (int i = 0; i < WORKERS; i++) { | ||
155 | + try { | ||
156 | + ClusterIOWorker worker = | ||
157 | + new ClusterIOWorker(this, commsDelegate, | ||
158 | + serializationService, hello); | ||
159 | + workers.add(worker); | ||
160 | + commExecutors.execute(worker); | ||
161 | + } catch (IOException e) { | ||
162 | + log.warn("Unable to start communication worker", e); | ||
163 | + } | ||
164 | + } | ||
165 | + | ||
166 | + // Wait for the IO loops to start | ||
167 | + for (ClusterIOWorker loop : workers) { | ||
168 | + if (!loop.awaitStart(START_TIMEOUT)) { | ||
169 | + log.warn("Comm loop did not start on-time; moving on..."); | ||
170 | + } | ||
171 | + } | ||
172 | + } | ||
173 | + | ||
174 | + /** | ||
175 | + * Starts listening for connections from peer cluster members. | ||
176 | + */ | ||
177 | + private void startListening() { | ||
178 | + try { | ||
179 | + connectionListener = | ||
180 | + new ClusterConnectionListener(localNode.ip(), localNode.tcpPort(), | ||
181 | + workerFinder); | ||
182 | + listenExecutor.execute(connectionListener); | ||
183 | + if (!connectionListener.awaitStart(START_TIMEOUT)) { | ||
184 | + log.warn("Listener did not start on-time; moving on..."); | ||
185 | + } | ||
186 | + } catch (IOException e) { | ||
187 | + log.error("Unable to listen for cluster connections", e); | ||
188 | + } | ||
189 | + } | ||
190 | + | ||
191 | + /** | ||
192 | + * Initiates open connection request and registers the pending socket | ||
193 | + * channel with the given IO loop. | ||
194 | + * | ||
195 | + * @param loop loop with which the channel should be registered | ||
196 | + * @throws java.io.IOException if the socket could not be open or connected | ||
197 | + */ | ||
198 | + private void initiateConnection(DefaultControllerNode node, | ||
199 | + ClusterIOWorker loop) throws IOException { | ||
200 | + SocketAddress sa = new InetSocketAddress(getByAddress(node.ip().toOctets()), node.tcpPort()); | ||
201 | + SocketChannel ch = SocketChannel.open(); | ||
202 | + ch.configureBlocking(false); | ||
203 | + ch.connect(sa); | ||
204 | + loop.connectStream(ch); | ||
205 | + } | ||
206 | + | ||
207 | + | ||
208 | + /** | ||
209 | + * Attempts to connect to any nodes that do not have an associated connection. | ||
210 | + */ | ||
211 | + private void startInitiating() { | ||
212 | + timer.schedule(connectionCustodian, CONNECTION_CUSTODIAN_DELAY, | ||
213 | + CONNECTION_CUSTODIAN_FREQUENCY); | ||
214 | + } | ||
215 | + | ||
216 | + // Sweeps through all controller nodes and attempts to open connection to | ||
217 | + // those that presently do not have one. | ||
218 | + private class ConnectionCustodian extends TimerTask { | ||
219 | + @Override | ||
220 | + public void run() { | ||
221 | + for (DefaultControllerNode node : nodes) { | ||
222 | + if (node != localNode && !streams.containsKey(node.id())) { | ||
223 | + try { | ||
224 | + initiateConnection(node, workerFinder.findWorker()); | ||
225 | + } catch (IOException e) { | ||
226 | + log.debug("Unable to connect", e); | ||
227 | + } | ||
228 | + } | ||
229 | + } | ||
230 | + } | ||
231 | + } | ||
232 | + | ||
233 | + // Finds the least utilitied IO loop. | ||
234 | + private class LeastUtilitiedWorkerFinder implements WorkerFinder { | ||
235 | + | ||
236 | + @Override | ||
237 | + public ClusterIOWorker findWorker() { | ||
238 | + ClusterIOWorker leastUtilized = null; | ||
239 | + int minCount = Integer.MAX_VALUE; | ||
240 | + for (ClusterIOWorker worker : workers) { | ||
241 | + int count = worker.streamCount(); | ||
242 | + if (count == 0) { | ||
243 | + return worker; | ||
244 | + } | ||
245 | + | ||
246 | + if (count < minCount) { | ||
247 | + leastUtilized = worker; | ||
248 | + minCount = count; | ||
249 | + } | ||
250 | + } | ||
251 | + return leastUtilized; | ||
252 | + } | ||
253 | + } | ||
254 | + | ||
255 | +} |
... | @@ -4,10 +4,9 @@ import com.google.common.collect.ImmutableSet; | ... | @@ -4,10 +4,9 @@ import com.google.common.collect.ImmutableSet; |
4 | import org.apache.felix.scr.annotations.Activate; | 4 | import org.apache.felix.scr.annotations.Activate; |
5 | import org.apache.felix.scr.annotations.Component; | 5 | import org.apache.felix.scr.annotations.Component; |
6 | import org.apache.felix.scr.annotations.Deactivate; | 6 | import org.apache.felix.scr.annotations.Deactivate; |
7 | +import org.apache.felix.scr.annotations.Reference; | ||
8 | +import org.apache.felix.scr.annotations.ReferenceCardinality; | ||
7 | import org.apache.felix.scr.annotations.Service; | 9 | import org.apache.felix.scr.annotations.Service; |
8 | -import org.onlab.nio.AcceptorLoop; | ||
9 | -import org.onlab.nio.IOLoop; | ||
10 | -import org.onlab.nio.MessageStream; | ||
11 | import org.onlab.onos.cluster.ClusterEvent; | 10 | import org.onlab.onos.cluster.ClusterEvent; |
12 | import org.onlab.onos.cluster.ClusterStore; | 11 | import org.onlab.onos.cluster.ClusterStore; |
13 | import org.onlab.onos.cluster.ClusterStoreDelegate; | 12 | import org.onlab.onos.cluster.ClusterStoreDelegate; |
... | @@ -15,33 +14,18 @@ import org.onlab.onos.cluster.ControllerNode; | ... | @@ -15,33 +14,18 @@ import org.onlab.onos.cluster.ControllerNode; |
15 | import org.onlab.onos.cluster.DefaultControllerNode; | 14 | import org.onlab.onos.cluster.DefaultControllerNode; |
16 | import org.onlab.onos.cluster.NodeId; | 15 | import org.onlab.onos.cluster.NodeId; |
17 | import org.onlab.onos.store.AbstractStore; | 16 | import org.onlab.onos.store.AbstractStore; |
17 | +import org.onlab.onos.store.cluster.messaging.SerializationService; | ||
18 | import org.onlab.packet.IpPrefix; | 18 | import org.onlab.packet.IpPrefix; |
19 | import org.slf4j.Logger; | 19 | import org.slf4j.Logger; |
20 | import org.slf4j.LoggerFactory; | 20 | import org.slf4j.LoggerFactory; |
21 | 21 | ||
22 | import java.io.IOException; | 22 | import java.io.IOException; |
23 | -import java.net.InetSocketAddress; | ||
24 | -import java.net.Socket; | ||
25 | -import java.net.SocketAddress; | ||
26 | -import java.nio.channels.ByteChannel; | ||
27 | -import java.nio.channels.SelectionKey; | ||
28 | -import java.nio.channels.ServerSocketChannel; | ||
29 | -import java.nio.channels.SocketChannel; | ||
30 | -import java.util.ArrayList; | ||
31 | -import java.util.List; | ||
32 | import java.util.Map; | 23 | import java.util.Map; |
33 | -import java.util.Objects; | ||
34 | import java.util.Set; | 24 | import java.util.Set; |
35 | -import java.util.Timer; | ||
36 | -import java.util.TimerTask; | ||
37 | import java.util.concurrent.ConcurrentHashMap; | 25 | import java.util.concurrent.ConcurrentHashMap; |
38 | -import java.util.concurrent.ExecutorService; | ||
39 | -import java.util.concurrent.Executors; | ||
40 | 26 | ||
41 | -import static java.net.InetAddress.getByAddress; | ||
42 | import static org.onlab.onos.cluster.ControllerNode.State; | 27 | import static org.onlab.onos.cluster.ControllerNode.State; |
43 | import static org.onlab.packet.IpPrefix.valueOf; | 28 | import static org.onlab.packet.IpPrefix.valueOf; |
44 | -import static org.onlab.util.Tools.namedThreads; | ||
45 | 29 | ||
46 | /** | 30 | /** |
47 | * Distributed implementation of the cluster nodes store. | 31 | * Distributed implementation of the cluster nodes store. |
... | @@ -52,146 +36,69 @@ public class DistributedClusterStore | ... | @@ -52,146 +36,69 @@ public class DistributedClusterStore |
52 | extends AbstractStore<ClusterEvent, ClusterStoreDelegate> | 36 | extends AbstractStore<ClusterEvent, ClusterStoreDelegate> |
53 | implements ClusterStore { | 37 | implements ClusterStore { |
54 | 38 | ||
55 | - private static final int HELLO_MSG = 1; | ||
56 | - private static final int ECHO_MSG = 2; | ||
57 | - | ||
58 | private final Logger log = LoggerFactory.getLogger(getClass()); | 39 | private final Logger log = LoggerFactory.getLogger(getClass()); |
59 | 40 | ||
60 | - private static final long CONNECTION_CUSTODIAN_DELAY = 1000L; | 41 | + private DefaultControllerNode localNode; |
61 | - private static final long CONNECTION_CUSTODIAN_FREQUENCY = 5000; | ||
62 | - | ||
63 | - private static final long START_TIMEOUT = 1000; | ||
64 | - private static final long SELECT_TIMEOUT = 50; | ||
65 | - private static final int WORKERS = 3; | ||
66 | - private static final int COMM_BUFFER_SIZE = 32 * 1024; | ||
67 | - private static final int COMM_IDLE_TIME = 500; | ||
68 | - | ||
69 | - private static final boolean SO_NO_DELAY = false; | ||
70 | - private static final int SO_SEND_BUFFER_SIZE = COMM_BUFFER_SIZE; | ||
71 | - private static final int SO_RCV_BUFFER_SIZE = COMM_BUFFER_SIZE; | ||
72 | - | ||
73 | - private DefaultControllerNode self; | ||
74 | private final Map<NodeId, DefaultControllerNode> nodes = new ConcurrentHashMap<>(); | 42 | private final Map<NodeId, DefaultControllerNode> nodes = new ConcurrentHashMap<>(); |
75 | private final Map<NodeId, State> states = new ConcurrentHashMap<>(); | 43 | private final Map<NodeId, State> states = new ConcurrentHashMap<>(); |
76 | 44 | ||
77 | - // Means to track message streams to other nodes. | 45 | + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) |
78 | - private final Map<NodeId, TLVMessageStream> streams = new ConcurrentHashMap<>(); | 46 | + private CommunicationsDelegate commsDelegate; |
79 | - private final Map<SocketChannel, DefaultControllerNode> nodesByChannel = new ConcurrentHashMap<>(); | ||
80 | - | ||
81 | - // Executor pools for listening and managing connections to other nodes. | ||
82 | - private final ExecutorService listenExecutor = | ||
83 | - Executors.newSingleThreadExecutor(namedThreads("onos-comm-listen")); | ||
84 | - private final ExecutorService commExecutors = | ||
85 | - Executors.newFixedThreadPool(WORKERS, namedThreads("onos-comm-cluster")); | ||
86 | - private final ExecutorService heartbeatExecutor = | ||
87 | - Executors.newSingleThreadExecutor(namedThreads("onos-comm-heartbeat")); | ||
88 | 47 | ||
89 | - private final Timer timer = new Timer("onos-comm-initiator"); | 48 | + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) |
90 | - private final TimerTask connectionCustodian = new ConnectionCustodian(); | 49 | + private SerializationService serializationService; |
91 | 50 | ||
92 | - private ListenLoop listenLoop; | 51 | + private final ClusterNodesDelegate nodesDelegate = new InnerNodesDelegate(); |
93 | - private List<CommLoop> commLoops = new ArrayList<>(WORKERS); | 52 | + private ConnectionManager connectionManager; |
94 | 53 | ||
95 | @Activate | 54 | @Activate |
96 | public void activate() { | 55 | public void activate() { |
97 | loadClusterDefinition(); | 56 | loadClusterDefinition(); |
98 | - startCommunications(); | 57 | + establishSelfIdentity(); |
99 | - startListening(); | 58 | + connectionManager = new ConnectionManager(localNode, nodesDelegate, |
100 | - startInitiating(); | 59 | + commsDelegate, serializationService); |
101 | log.info("Started"); | 60 | log.info("Started"); |
102 | } | 61 | } |
103 | 62 | ||
104 | @Deactivate | 63 | @Deactivate |
105 | public void deactivate() { | 64 | public void deactivate() { |
106 | - listenLoop.shutdown(); | ||
107 | - for (CommLoop loop : commLoops) { | ||
108 | - loop.shutdown(); | ||
109 | - } | ||
110 | log.info("Stopped"); | 65 | log.info("Stopped"); |
111 | } | 66 | } |
112 | 67 | ||
113 | - // Loads the cluster definition file | 68 | + /** |
69 | + * Loads the cluster definition file. | ||
70 | + */ | ||
114 | private void loadClusterDefinition() { | 71 | private void loadClusterDefinition() { |
115 | -// ClusterDefinitionStore cds = new ClusterDefinitionStore("../config/cluster.json"); | 72 | + ClusterDefinitionStore cds = new ClusterDefinitionStore("../config/cluster.json"); |
116 | -// try { | ||
117 | -// Set<DefaultControllerNode> storedNodes = cds.read(); | ||
118 | -// for (DefaultControllerNode node : storedNodes) { | ||
119 | -// nodes.put(node.id(), node); | ||
120 | -// } | ||
121 | -// } catch (IOException e) { | ||
122 | -// log.error("Unable to read cluster definitions", e); | ||
123 | -// } | ||
124 | - | ||
125 | - // Establishes the controller's own identity. | ||
126 | - IpPrefix ip = valueOf(System.getProperty("onos.ip", "127.0.1.1")); | ||
127 | - self = nodes.get(new NodeId(ip.toString())); | ||
128 | - | ||
129 | - // As a fall-back, let's make sure we at least know who we are. | ||
130 | - if (self == null) { | ||
131 | - self = new DefaultControllerNode(new NodeId(ip.toString()), ip); | ||
132 | - nodes.put(self.id(), self); | ||
133 | - states.put(self.id(), State.ACTIVE); | ||
134 | - } | ||
135 | - } | ||
136 | - | ||
137 | - // Kicks off the IO loops. | ||
138 | - private void startCommunications() { | ||
139 | - for (int i = 0; i < WORKERS; i++) { | ||
140 | - try { | ||
141 | - CommLoop loop = new CommLoop(); | ||
142 | - commLoops.add(loop); | ||
143 | - commExecutors.execute(loop); | ||
144 | - } catch (IOException e) { | ||
145 | - log.warn("Unable to start comm IO loop", e); | ||
146 | - } | ||
147 | - } | ||
148 | - | ||
149 | - // Wait for the IO loops to start | ||
150 | - for (CommLoop loop : commLoops) { | ||
151 | - if (!loop.awaitStart(START_TIMEOUT)) { | ||
152 | - log.warn("Comm loop did not start on-time; moving on..."); | ||
153 | - } | ||
154 | - } | ||
155 | - } | ||
156 | - | ||
157 | - // Starts listening for connections from peer cluster members. | ||
158 | - private void startListening() { | ||
159 | try { | 73 | try { |
160 | - listenLoop = new ListenLoop(self.ip(), self.tcpPort()); | 74 | + Set<DefaultControllerNode> storedNodes = cds.read(); |
161 | - listenExecutor.execute(listenLoop); | 75 | + for (DefaultControllerNode node : storedNodes) { |
162 | - if (!listenLoop.awaitStart(START_TIMEOUT)) { | 76 | + nodes.put(node.id(), node); |
163 | - log.warn("Listen loop did not start on-time; moving on..."); | ||
164 | } | 77 | } |
165 | } catch (IOException e) { | 78 | } catch (IOException e) { |
166 | - log.error("Unable to listen for cluster connections", e); | 79 | + log.error("Unable to read cluster definitions", e); |
167 | } | 80 | } |
168 | } | 81 | } |
169 | 82 | ||
170 | /** | 83 | /** |
171 | - * Initiates open connection request and registers the pending socket | 84 | + * Determines who the local controller node is. |
172 | - * channel with the given IO loop. | ||
173 | - * | ||
174 | - * @param loop loop with which the channel should be registered | ||
175 | - * @throws java.io.IOException if the socket could not be open or connected | ||
176 | */ | 85 | */ |
177 | - private void openConnection(DefaultControllerNode node, CommLoop loop) throws IOException { | 86 | + private void establishSelfIdentity() { |
178 | - SocketAddress sa = new InetSocketAddress(getByAddress(node.ip().toOctets()), node.tcpPort()); | 87 | + // Establishes the controller's own identity. |
179 | - SocketChannel ch = SocketChannel.open(); | 88 | + IpPrefix ip = valueOf(System.getProperty("onos.ip", "127.0.1.1")); |
180 | - nodesByChannel.put(ch, node); | 89 | + localNode = nodes.get(new NodeId(ip.toString())); |
181 | - ch.configureBlocking(false); | ||
182 | - ch.connect(sa); | ||
183 | - loop.connectStream(ch); | ||
184 | - } | ||
185 | - | ||
186 | 90 | ||
187 | - // Attempts to connect to any nodes that do not have an associated connection. | 91 | + // As a fall-back, let's make sure we at least know who we are. |
188 | - private void startInitiating() { | 92 | + if (localNode == null) { |
189 | - timer.schedule(connectionCustodian, CONNECTION_CUSTODIAN_DELAY, CONNECTION_CUSTODIAN_FREQUENCY); | 93 | + localNode = new DefaultControllerNode(new NodeId(ip.toString()), ip); |
94 | + nodes.put(localNode.id(), localNode); | ||
95 | + states.put(localNode.id(), State.ACTIVE); | ||
96 | + } | ||
190 | } | 97 | } |
191 | 98 | ||
192 | @Override | 99 | @Override |
193 | public ControllerNode getLocalNode() { | 100 | public ControllerNode getLocalNode() { |
194 | - return self; | 101 | + return localNode; |
195 | } | 102 | } |
196 | 103 | ||
197 | @Override | 104 | @Override |
... | @@ -215,179 +122,29 @@ public class DistributedClusterStore | ... | @@ -215,179 +122,29 @@ public class DistributedClusterStore |
215 | public ControllerNode addNode(NodeId nodeId, IpPrefix ip, int tcpPort) { | 122 | public ControllerNode addNode(NodeId nodeId, IpPrefix ip, int tcpPort) { |
216 | DefaultControllerNode node = new DefaultControllerNode(nodeId, ip, tcpPort); | 123 | DefaultControllerNode node = new DefaultControllerNode(nodeId, ip, tcpPort); |
217 | nodes.put(nodeId, node); | 124 | nodes.put(nodeId, node); |
125 | + connectionManager.addNode(node); | ||
218 | return node; | 126 | return node; |
219 | } | 127 | } |
220 | 128 | ||
221 | @Override | 129 | @Override |
222 | public void removeNode(NodeId nodeId) { | 130 | public void removeNode(NodeId nodeId) { |
223 | - nodes.remove(nodeId); | 131 | + DefaultControllerNode node = nodes.remove(nodeId); |
224 | - TLVMessageStream stream = streams.remove(nodeId); | ||
225 | - if (stream != null) { | ||
226 | - stream.close(); | ||
227 | - } | ||
228 | - } | ||
229 | - | ||
230 | - // Listens and accepts inbound connections from other cluster nodes. | ||
231 | - private class ListenLoop extends AcceptorLoop { | ||
232 | - ListenLoop(IpPrefix ip, int tcpPort) throws IOException { | ||
233 | - super(SELECT_TIMEOUT, new InetSocketAddress(getByAddress(ip.toOctets()), tcpPort)); | ||
234 | - } | ||
235 | - | ||
236 | - @Override | ||
237 | - protected void acceptConnection(ServerSocketChannel channel) throws IOException { | ||
238 | - SocketChannel sc = channel.accept(); | ||
239 | - sc.configureBlocking(false); | ||
240 | - | ||
241 | - Socket so = sc.socket(); | ||
242 | - so.setTcpNoDelay(SO_NO_DELAY); | ||
243 | - so.setReceiveBufferSize(SO_RCV_BUFFER_SIZE); | ||
244 | - so.setSendBufferSize(SO_SEND_BUFFER_SIZE); | ||
245 | - | ||
246 | - findLeastUtilizedLoop().acceptStream(sc); | ||
247 | - } | ||
248 | - } | ||
249 | - | ||
250 | - private class CommLoop extends IOLoop<TLVMessage, TLVMessageStream> { | ||
251 | - CommLoop() throws IOException { | ||
252 | - super(SELECT_TIMEOUT); | ||
253 | - } | ||
254 | - | ||
255 | - @Override | ||
256 | - protected TLVMessageStream createStream(ByteChannel byteChannel) { | ||
257 | - return new TLVMessageStream(this, byteChannel, COMM_BUFFER_SIZE, COMM_IDLE_TIME); | ||
258 | - } | ||
259 | - | ||
260 | - @Override | ||
261 | - protected void processMessages(List<TLVMessage> messages, MessageStream<TLVMessage> stream) { | ||
262 | - TLVMessageStream tlvStream = (TLVMessageStream) stream; | ||
263 | - for (TLVMessage message : messages) { | ||
264 | - // TODO: add type-based dispatching here... this is just a hack to get going | ||
265 | - if (message.type() == HELLO_MSG) { | ||
266 | - processHello(message, tlvStream); | ||
267 | - } else if (message.type() == ECHO_MSG) { | ||
268 | - processEcho(message, tlvStream); | ||
269 | - } else { | ||
270 | - log.info("Deal with other messages"); | ||
271 | - } | ||
272 | - } | ||
273 | - } | ||
274 | - | ||
275 | - @Override | ||
276 | - public TLVMessageStream acceptStream(SocketChannel channel) { | ||
277 | - TLVMessageStream stream = super.acceptStream(channel); | ||
278 | - try { | ||
279 | - InetSocketAddress sa = (InetSocketAddress) channel.getRemoteAddress(); | ||
280 | - log.info("Accepted connection from node {}", valueOf(sa.getAddress().getAddress())); | ||
281 | - stream.write(createHello(self)); | ||
282 | - | ||
283 | - } catch (IOException e) { | ||
284 | - log.warn("Unable to accept connection from an unknown end-point", e); | ||
285 | - } | ||
286 | - return stream; | ||
287 | - } | ||
288 | - | ||
289 | - @Override | ||
290 | - public TLVMessageStream connectStream(SocketChannel channel) { | ||
291 | - TLVMessageStream stream = super.connectStream(channel); | ||
292 | - DefaultControllerNode node = nodesByChannel.get(channel); | ||
293 | if (node != null) { | 132 | if (node != null) { |
294 | - log.debug("Opened connection to node {}", node.id()); | 133 | + connectionManager.removeNode(node); |
295 | - nodesByChannel.remove(channel); | ||
296 | - } | ||
297 | - return stream; | ||
298 | - } | ||
299 | - | ||
300 | - @Override | ||
301 | - protected void connect(SelectionKey key) throws IOException { | ||
302 | - try { | ||
303 | - super.connect(key); | ||
304 | - TLVMessageStream stream = (TLVMessageStream) key.attachment(); | ||
305 | - send(stream, createHello(self)); | ||
306 | - } catch (IOException e) { | ||
307 | - if (!Objects.equals(e.getMessage(), "Connection refused")) { | ||
308 | - throw e; | ||
309 | - } | ||
310 | } | 134 | } |
311 | } | 135 | } |
312 | 136 | ||
137 | + // Entity to handle back calls from the connection manager. | ||
138 | + private class InnerNodesDelegate implements ClusterNodesDelegate { | ||
313 | @Override | 139 | @Override |
314 | - protected void removeStream(MessageStream<TLVMessage> stream) { | 140 | + public void nodeDetected(DefaultControllerNode node) { |
315 | - DefaultControllerNode node = ((TLVMessageStream) stream).node(); | ||
316 | - if (node != null) { | ||
317 | - log.info("Closed connection to node {}", node.id()); | ||
318 | - states.put(node.id(), State.INACTIVE); | ||
319 | - streams.remove(node.id()); | ||
320 | - } | ||
321 | - super.removeStream(stream); | ||
322 | - } | ||
323 | - } | ||
324 | - | ||
325 | - // Processes a HELLO message from a peer controller node. | ||
326 | - private void processHello(TLVMessage message, TLVMessageStream stream) { | ||
327 | - // FIXME: pure hack for now | ||
328 | - String data = new String(message.data()); | ||
329 | - String[] fields = data.split(":"); | ||
330 | - DefaultControllerNode node = new DefaultControllerNode(new NodeId(fields[0]), | ||
331 | - valueOf(fields[1]), | ||
332 | - Integer.parseInt(fields[2])); | ||
333 | - stream.setNode(node); | ||
334 | nodes.put(node.id(), node); | 141 | nodes.put(node.id(), node); |
335 | - streams.put(node.id(), stream); | ||
336 | states.put(node.id(), State.ACTIVE); | 142 | states.put(node.id(), State.ACTIVE); |
337 | } | 143 | } |
338 | 144 | ||
339 | - // Processes an ECHO message from a peer controller node. | ||
340 | - private void processEcho(TLVMessage message, TLVMessageStream tlvStream) { | ||
341 | - // TODO: implement heart-beat refresh | ||
342 | - log.info("Dealing with echoes..."); | ||
343 | - } | ||
344 | - | ||
345 | - // Sends message to the specified stream. | ||
346 | - private void send(TLVMessageStream stream, TLVMessage message) { | ||
347 | - try { | ||
348 | - stream.write(message); | ||
349 | - } catch (IOException e) { | ||
350 | - log.warn("Unable to send message to {}", stream.node().id()); | ||
351 | - } | ||
352 | - } | ||
353 | - | ||
354 | - // Creates a hello message to be sent to a peer controller node. | ||
355 | - private TLVMessage createHello(DefaultControllerNode self) { | ||
356 | - return new TLVMessage(HELLO_MSG, (self.id() + ":" + self.ip() + ":" + self.tcpPort()).getBytes()); | ||
357 | - } | ||
358 | - | ||
359 | - // Sweeps through all controller nodes and attempts to open connection to | ||
360 | - // those that presently do not have one. | ||
361 | - private class ConnectionCustodian extends TimerTask { | ||
362 | @Override | 145 | @Override |
363 | - public void run() { | 146 | + public void nodeVanished(DefaultControllerNode node) { |
364 | - for (DefaultControllerNode node : nodes.values()) { | 147 | + states.put(node.id(), State.INACTIVE); |
365 | - if (node != self && !streams.containsKey(node.id())) { | ||
366 | - try { | ||
367 | - openConnection(node, findLeastUtilizedLoop()); | ||
368 | - } catch (IOException e) { | ||
369 | - log.debug("Unable to connect", e); | ||
370 | - } | ||
371 | - } | ||
372 | - } | ||
373 | - } | ||
374 | - } | ||
375 | - | ||
376 | - // Finds the least utilities IO loop. | ||
377 | - private CommLoop findLeastUtilizedLoop() { | ||
378 | - CommLoop leastUtilized = null; | ||
379 | - int minCount = Integer.MAX_VALUE; | ||
380 | - for (CommLoop loop : commLoops) { | ||
381 | - int count = loop.streamCount(); | ||
382 | - if (count == 0) { | ||
383 | - return loop; | ||
384 | - } | ||
385 | - | ||
386 | - if (count < minCount) { | ||
387 | - leastUtilized = loop; | ||
388 | - minCount = count; | ||
389 | - } | ||
390 | } | 148 | } |
391 | - return leastUtilized; | ||
392 | } | 149 | } |
393 | } | 150 | } | ... | ... |
1 | +package org.onlab.onos.store.cluster.impl; | ||
2 | + | ||
3 | +import org.onlab.onos.cluster.NodeId; | ||
4 | +import org.onlab.onos.store.cluster.messaging.ClusterMessage; | ||
5 | + | ||
6 | +/** | ||
7 | + * Created by tom on 9/29/14. | ||
8 | + */ | ||
9 | +public interface MessageSender { | ||
10 | + | ||
11 | + /** | ||
12 | + * Sends the specified message to the given cluster node. | ||
13 | + * | ||
14 | + * @param nodeId node identifier | ||
15 | + * @param message mesage to send | ||
16 | + * @return true if the message was sent sucessfully; false if there is | ||
17 | + * no stream or if there was an error | ||
18 | + */ | ||
19 | + boolean send(NodeId nodeId, ClusterMessage message); | ||
20 | + | ||
21 | +} |
1 | -package org.onlab.onos.store.cluster.impl; | ||
2 | - | ||
3 | -import org.onlab.nio.AbstractMessage; | ||
4 | - | ||
5 | -import java.util.Objects; | ||
6 | - | ||
7 | -import static com.google.common.base.MoreObjects.toStringHelper; | ||
8 | - | ||
9 | -/** | ||
10 | - * Base message for cluster-wide communications using TLVs. | ||
11 | - */ | ||
12 | -public class TLVMessage extends AbstractMessage { | ||
13 | - | ||
14 | - private final int type; | ||
15 | - private final byte[] data; | ||
16 | - | ||
17 | - /** | ||
18 | - * Creates an immutable TLV message. | ||
19 | - * | ||
20 | - * @param type message type | ||
21 | - * @param data message data bytes | ||
22 | - */ | ||
23 | - public TLVMessage(int type, byte[] data) { | ||
24 | - this.length = data.length + TLVMessageStream.METADATA_LENGTH; | ||
25 | - this.type = type; | ||
26 | - this.data = data; | ||
27 | - } | ||
28 | - | ||
29 | - /** | ||
30 | - * Returns the message type indicator. | ||
31 | - * | ||
32 | - * @return message type | ||
33 | - */ | ||
34 | - public int type() { | ||
35 | - return type; | ||
36 | - } | ||
37 | - | ||
38 | - /** | ||
39 | - * Returns the data bytes. | ||
40 | - * | ||
41 | - * @return message data | ||
42 | - */ | ||
43 | - public byte[] data() { | ||
44 | - return data; | ||
45 | - } | ||
46 | - | ||
47 | - @Override | ||
48 | - public int hashCode() { | ||
49 | - return Objects.hash(type, data); | ||
50 | - } | ||
51 | - | ||
52 | - @Override | ||
53 | - public boolean equals(Object obj) { | ||
54 | - if (this == obj) { | ||
55 | - return true; | ||
56 | - } | ||
57 | - if (obj == null || getClass() != obj.getClass()) { | ||
58 | - return false; | ||
59 | - } | ||
60 | - final TLVMessage other = (TLVMessage) obj; | ||
61 | - return Objects.equals(this.type, other.type) && | ||
62 | - Objects.equals(this.data, other.data); | ||
63 | - } | ||
64 | - | ||
65 | - @Override | ||
66 | - public String toString() { | ||
67 | - return toStringHelper(this).add("type", type).add("length", length).toString(); | ||
68 | - } | ||
69 | - | ||
70 | -} |
1 | +package org.onlab.onos.store.cluster.messaging; | ||
2 | + | ||
3 | +import org.onlab.onos.cluster.NodeId; | ||
4 | + | ||
5 | +import java.util.Set; | ||
6 | + | ||
7 | +/** | ||
8 | + * Service for assisting communications between controller cluster nodes. | ||
9 | + */ | ||
10 | +public interface ClusterCommunicationService { | ||
11 | + | ||
12 | + /** | ||
13 | + * Sends a message to the specified controller node. | ||
14 | + * | ||
15 | + * @param message message to send | ||
16 | + * @param toNodeId node identifier | ||
17 | + * @return true if the message was sent sucessfully; false if there is | ||
18 | + * no stream or if there was an error | ||
19 | + */ | ||
20 | + boolean send(ClusterMessage message, NodeId toNodeId); | ||
21 | + | ||
22 | + /** | ||
23 | + * Adds a new subscriber for the specified message subject. | ||
24 | + * | ||
25 | + * @param subject message subject | ||
26 | + * @param subscriber message subscriber | ||
27 | + */ | ||
28 | + void addSubscriber(MessageSubject subject, MessageSubscriber subscriber); | ||
29 | + | ||
30 | + /** | ||
31 | + * Removes the specified subscriber from the given message subject. | ||
32 | + * | ||
33 | + * @param subject message subject | ||
34 | + * @param subscriber message subscriber | ||
35 | + */ | ||
36 | + void removeSubscriber(MessageSubject subject, MessageSubscriber subscriber); | ||
37 | + | ||
38 | + /** | ||
39 | + * Returns the set of subscribers for the specified message subject. | ||
40 | + * | ||
41 | + * @param subject message subject | ||
42 | + * @return set of message subscribers | ||
43 | + */ | ||
44 | + Set<MessageSubscriber> getSubscribers(MessageSubject subject); | ||
45 | + | ||
46 | +} |
1 | +package org.onlab.onos.store.cluster.messaging; | ||
2 | + | ||
3 | +import org.onlab.nio.AbstractMessage; | ||
4 | + | ||
5 | +import static com.google.common.base.MoreObjects.toStringHelper; | ||
6 | + | ||
7 | +/** | ||
8 | + * Base message for cluster-wide communications. | ||
9 | + */ | ||
10 | +public abstract class ClusterMessage extends AbstractMessage { | ||
11 | + | ||
12 | + private final MessageSubject subject; | ||
13 | + | ||
14 | + /** | ||
15 | + * Creates a cluster message. | ||
16 | + * | ||
17 | + * @param subject message subject | ||
18 | + */ | ||
19 | + protected ClusterMessage(MessageSubject subject) { | ||
20 | + this.subject = subject; | ||
21 | + } | ||
22 | + | ||
23 | + /** | ||
24 | + * Returns the message subject indicator. | ||
25 | + * | ||
26 | + * @return message subject | ||
27 | + */ | ||
28 | + public MessageSubject subject() { | ||
29 | + return subject; | ||
30 | + } | ||
31 | + | ||
32 | + @Override | ||
33 | + public String toString() { | ||
34 | + return toStringHelper(this).add("subject", subject).add("length", length).toString(); | ||
35 | + } | ||
36 | + | ||
37 | +} |
1 | -package org.onlab.onos.store.cluster.impl; | 1 | +package org.onlab.onos.store.cluster.messaging; |
2 | 2 | ||
3 | import org.onlab.nio.IOLoop; | 3 | import org.onlab.nio.IOLoop; |
4 | import org.onlab.nio.MessageStream; | 4 | import org.onlab.nio.MessageStream; |
... | @@ -10,29 +10,29 @@ import java.nio.channels.ByteChannel; | ... | @@ -10,29 +10,29 @@ import java.nio.channels.ByteChannel; |
10 | import static com.google.common.base.Preconditions.checkState; | 10 | import static com.google.common.base.Preconditions.checkState; |
11 | 11 | ||
12 | /** | 12 | /** |
13 | - * Stream for transferring TLV messages between cluster members. | 13 | + * Stream for transferring messages between two cluster members. |
14 | */ | 14 | */ |
15 | -public class TLVMessageStream extends MessageStream<TLVMessage> { | 15 | +public class ClusterMessageStream extends MessageStream<ClusterMessage> { |
16 | 16 | ||
17 | - public static final int METADATA_LENGTH = 16; // 8 + 4 + 4 | 17 | + private static final int COMM_BUFFER_SIZE = 32 * 1024; |
18 | - | 18 | + private static final int COMM_IDLE_TIME = 500; |
19 | - private static final int LENGTH_OFFSET = 12; | ||
20 | - private static final long MARKER = 0xfeedcafecafefeedL; | ||
21 | 19 | ||
22 | private DefaultControllerNode node; | 20 | private DefaultControllerNode node; |
21 | + private SerializationService serializationService; | ||
23 | 22 | ||
24 | /** | 23 | /** |
25 | * Creates a message stream associated with the specified IO loop and | 24 | * Creates a message stream associated with the specified IO loop and |
26 | * backed by the given byte channel. | 25 | * backed by the given byte channel. |
27 | * | 26 | * |
27 | + * @param serializationService service for encoding/decoding messages | ||
28 | * @param loop IO loop | 28 | * @param loop IO loop |
29 | * @param byteChannel backing byte channel | 29 | * @param byteChannel backing byte channel |
30 | - * @param bufferSize size of the backing byte buffers | ||
31 | - * @param maxIdleMillis maximum number of millis the stream can be idle | ||
32 | */ | 30 | */ |
33 | - protected TLVMessageStream(IOLoop<TLVMessage, ?> loop, ByteChannel byteChannel, | 31 | + public ClusterMessageStream(SerializationService serializationService, |
34 | - int bufferSize, int maxIdleMillis) { | 32 | + IOLoop<ClusterMessage, ?> loop, |
35 | - super(loop, byteChannel, bufferSize, maxIdleMillis); | 33 | + ByteChannel byteChannel) { |
34 | + super(loop, byteChannel, COMM_BUFFER_SIZE, COMM_IDLE_TIME); | ||
35 | + this.serializationService = serializationService; | ||
36 | } | 36 | } |
37 | 37 | ||
38 | /** | 38 | /** |
... | @@ -40,7 +40,7 @@ public class TLVMessageStream extends MessageStream<TLVMessage> { | ... | @@ -40,7 +40,7 @@ public class TLVMessageStream extends MessageStream<TLVMessage> { |
40 | * | 40 | * |
41 | * @return controller node | 41 | * @return controller node |
42 | */ | 42 | */ |
43 | - DefaultControllerNode node() { | 43 | + public DefaultControllerNode node() { |
44 | return node; | 44 | return node; |
45 | } | 45 | } |
46 | 46 | ||
... | @@ -49,47 +49,19 @@ public class TLVMessageStream extends MessageStream<TLVMessage> { | ... | @@ -49,47 +49,19 @@ public class TLVMessageStream extends MessageStream<TLVMessage> { |
49 | * | 49 | * |
50 | * @param node controller node | 50 | * @param node controller node |
51 | */ | 51 | */ |
52 | - void setNode(DefaultControllerNode node) { | 52 | + public void setNode(DefaultControllerNode node) { |
53 | checkState(this.node == null, "Stream is already bound to a node"); | 53 | checkState(this.node == null, "Stream is already bound to a node"); |
54 | this.node = node; | 54 | this.node = node; |
55 | } | 55 | } |
56 | 56 | ||
57 | @Override | 57 | @Override |
58 | - protected TLVMessage read(ByteBuffer buffer) { | 58 | + protected ClusterMessage read(ByteBuffer buffer) { |
59 | - // Do we have enough bytes to read the header? If not, bail. | 59 | + return serializationService.decode(buffer); |
60 | - if (buffer.remaining() < METADATA_LENGTH) { | ||
61 | - return null; | ||
62 | - } | ||
63 | - | ||
64 | - // Peek at the length and if we have enough to read the entire message | ||
65 | - // go ahead, otherwise bail. | ||
66 | - int length = buffer.getInt(buffer.position() + LENGTH_OFFSET); | ||
67 | - if (buffer.remaining() < length) { | ||
68 | - return null; | ||
69 | - } | ||
70 | - | ||
71 | - // At this point, we have enough data to read a complete message. | ||
72 | - long marker = buffer.getLong(); | ||
73 | - checkState(marker == MARKER, "Incorrect message marker"); | ||
74 | - | ||
75 | - int type = buffer.getInt(); | ||
76 | - length = buffer.getInt(); | ||
77 | - | ||
78 | - // TODO: add deserialization hook here | ||
79 | - byte[] data = new byte[length - METADATA_LENGTH]; | ||
80 | - buffer.get(data); | ||
81 | - | ||
82 | - return new TLVMessage(type, data); | ||
83 | } | 60 | } |
84 | 61 | ||
85 | @Override | 62 | @Override |
86 | - protected void write(TLVMessage message, ByteBuffer buffer) { | 63 | + protected void write(ClusterMessage message, ByteBuffer buffer) { |
87 | - buffer.putLong(MARKER); | 64 | + serializationService.encode(message, buffer); |
88 | - buffer.putInt(message.type()); | ||
89 | - buffer.putInt(message.length()); | ||
90 | - | ||
91 | - // TODO: add serialization hook here | ||
92 | - buffer.put(message.data()); | ||
93 | } | 65 | } |
94 | 66 | ||
95 | } | 67 | } | ... | ... |
1 | +package org.onlab.onos.store.cluster.messaging; | ||
2 | + | ||
3 | +import org.onlab.onos.cluster.NodeId; | ||
4 | + | ||
5 | +/**l | ||
6 | + * Echo heart-beat message that nodes send to each other. | ||
7 | + */ | ||
8 | +public class EchoMessage extends ClusterMessage { | ||
9 | + | ||
10 | + private NodeId nodeId; | ||
11 | + | ||
12 | + // For serialization | ||
13 | + private EchoMessage() { | ||
14 | + super(MessageSubject.HELLO); | ||
15 | + nodeId = null; | ||
16 | + } | ||
17 | + | ||
18 | + /** | ||
19 | + * Creates a new heart-beat echo message. | ||
20 | + * | ||
21 | + * @param nodeId sending node identification | ||
22 | + */ | ||
23 | + public EchoMessage(NodeId nodeId) { | ||
24 | + super(MessageSubject.HELLO); | ||
25 | + nodeId = nodeId; | ||
26 | + } | ||
27 | + | ||
28 | + /** | ||
29 | + * Returns the sending node identifer. | ||
30 | + * | ||
31 | + * @return node identifier | ||
32 | + */ | ||
33 | + public NodeId nodeId() { | ||
34 | + return nodeId; | ||
35 | + } | ||
36 | + | ||
37 | +} |
1 | +package org.onlab.onos.store.cluster.messaging; | ||
2 | + | ||
3 | +import org.onlab.onos.cluster.NodeId; | ||
4 | +import org.onlab.packet.IpPrefix; | ||
5 | + | ||
6 | +/** | ||
7 | + * Hello message that nodes use to greet each other. | ||
8 | + */ | ||
9 | +public class HelloMessage extends ClusterMessage { | ||
10 | + | ||
11 | + private NodeId nodeId; | ||
12 | + private IpPrefix ipAddress; | ||
13 | + private int tcpPort; | ||
14 | + | ||
15 | + // For serialization | ||
16 | + private HelloMessage() { | ||
17 | + super(MessageSubject.HELLO); | ||
18 | + nodeId = null; | ||
19 | + ipAddress = null; | ||
20 | + tcpPort = 0; | ||
21 | + } | ||
22 | + | ||
23 | + /** | ||
24 | + * Creates a new hello message for the specified end-point data. | ||
25 | + * | ||
26 | + * @param nodeId sending node identification | ||
27 | + * @param ipAddress sending node IP address | ||
28 | + * @param tcpPort sending node TCP port | ||
29 | + */ | ||
30 | + public HelloMessage(NodeId nodeId, IpPrefix ipAddress, int tcpPort) { | ||
31 | + super(MessageSubject.HELLO); | ||
32 | + nodeId = nodeId; | ||
33 | + ipAddress = ipAddress; | ||
34 | + tcpPort = tcpPort; | ||
35 | + } | ||
36 | + | ||
37 | + /** | ||
38 | + * Returns the sending node identifer. | ||
39 | + * | ||
40 | + * @return node identifier | ||
41 | + */ | ||
42 | + public NodeId nodeId() { | ||
43 | + return nodeId; | ||
44 | + } | ||
45 | + | ||
46 | + /** | ||
47 | + * Returns the sending node IP address. | ||
48 | + * | ||
49 | + * @return node IP address | ||
50 | + */ | ||
51 | + public IpPrefix ipAddress() { | ||
52 | + return ipAddress; | ||
53 | + } | ||
54 | + | ||
55 | + /** | ||
56 | + * Returns the sending node TCP listen port. | ||
57 | + * | ||
58 | + * @return TCP listen port | ||
59 | + */ | ||
60 | + public int tcpPort() { | ||
61 | + return tcpPort; | ||
62 | + } | ||
63 | +} |
core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/MessageSubscriber.java
0 → 100644
1 | +package org.onlab.onos.store.cluster.messaging; | ||
2 | + | ||
3 | +/** | ||
4 | + * Represents a message consumer. | ||
5 | + */ | ||
6 | +public interface MessageSubscriber { | ||
7 | + | ||
8 | + /** | ||
9 | + * Receives the specified cluster message. | ||
10 | + * | ||
11 | + * @param message message to be received | ||
12 | + */ | ||
13 | + void receive(ClusterMessage message); | ||
14 | + | ||
15 | +} |
core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/SerializationService.java
0 → 100644
1 | +package org.onlab.onos.store.cluster.messaging; | ||
2 | + | ||
3 | +import java.nio.ByteBuffer; | ||
4 | + | ||
5 | +/** | ||
6 | + * Service for serializing/deserializing intra-cluster messages. | ||
7 | + */ | ||
8 | +public interface SerializationService { | ||
9 | + | ||
10 | + /** | ||
11 | + * Decodes the specified byte buffer to obtain a message within. | ||
12 | + * | ||
13 | + * @param buffer byte buffer with message(s) | ||
14 | + * @return parsed message | ||
15 | + */ | ||
16 | + ClusterMessage decode(ByteBuffer buffer); | ||
17 | + | ||
18 | + /** | ||
19 | + * Encodes the specified message into the given byte buffer. | ||
20 | + * | ||
21 | + * @param message message to be encoded | ||
22 | + * @param buffer byte buffer to receive the message data | ||
23 | + */ | ||
24 | + void encode(ClusterMessage message, ByteBuffer buffer); | ||
25 | + | ||
26 | +} |
1 | +package org.onlab.onos.store.cluster.messaging.impl; | ||
2 | + | ||
3 | +import com.google.common.collect.HashMultimap; | ||
4 | +import com.google.common.collect.ImmutableSet; | ||
5 | +import com.google.common.collect.Multimap; | ||
6 | +import org.apache.felix.scr.annotations.Component; | ||
7 | +import org.apache.felix.scr.annotations.Service; | ||
8 | +import org.onlab.onos.cluster.NodeId; | ||
9 | +import org.onlab.onos.store.cluster.impl.CommunicationsDelegate; | ||
10 | +import org.onlab.onos.store.cluster.impl.MessageSender; | ||
11 | +import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService; | ||
12 | +import org.onlab.onos.store.cluster.messaging.ClusterMessage; | ||
13 | +import org.onlab.onos.store.cluster.messaging.MessageSubject; | ||
14 | +import org.onlab.onos.store.cluster.messaging.MessageSubscriber; | ||
15 | + | ||
16 | +import java.util.Set; | ||
17 | + | ||
18 | +/** | ||
19 | + * Implements the cluster communication services to use by other stores. | ||
20 | + */ | ||
21 | +@Component(immediate = true) | ||
22 | +@Service | ||
23 | +public class ClusterCommunicationManager | ||
24 | + implements ClusterCommunicationService, CommunicationsDelegate { | ||
25 | + | ||
26 | + // TODO: use something different that won't require synchronization | ||
27 | + private Multimap<MessageSubject, MessageSubscriber> subscribers = HashMultimap.create(); | ||
28 | + private MessageSender messageSender; | ||
29 | + | ||
30 | + @Override | ||
31 | + public boolean send(ClusterMessage message, NodeId toNodeId) { | ||
32 | + return messageSender.send(toNodeId, message); | ||
33 | + } | ||
34 | + | ||
35 | + @Override | ||
36 | + public synchronized void addSubscriber(MessageSubject subject, MessageSubscriber subscriber) { | ||
37 | + subscribers.put(subject, subscriber); | ||
38 | + } | ||
39 | + | ||
40 | + @Override | ||
41 | + public synchronized void removeSubscriber(MessageSubject subject, MessageSubscriber subscriber) { | ||
42 | + subscribers.remove(subject, subscriber); | ||
43 | + } | ||
44 | + | ||
45 | + @Override | ||
46 | + public Set<MessageSubscriber> getSubscribers(MessageSubject subject) { | ||
47 | + return ImmutableSet.copyOf(subscribers.get(subject)); | ||
48 | + } | ||
49 | + | ||
50 | + @Override | ||
51 | + public void dispatch(ClusterMessage message) { | ||
52 | + Set<MessageSubscriber> set = getSubscribers(message.subject()); | ||
53 | + if (set != null) { | ||
54 | + for (MessageSubscriber subscriber : set) { | ||
55 | + subscriber.receive(message); | ||
56 | + } | ||
57 | + } | ||
58 | + } | ||
59 | + | ||
60 | + @Override | ||
61 | + public void setSender(MessageSender messageSender) { | ||
62 | + this.messageSender = messageSender; | ||
63 | + } | ||
64 | +} |
core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/MessageSerializer.java
0 → 100644
1 | +package org.onlab.onos.store.cluster.messaging.impl; | ||
2 | + | ||
3 | +import org.onlab.onos.store.cluster.messaging.ClusterMessage; | ||
4 | +import org.onlab.onos.store.cluster.messaging.MessageSubject; | ||
5 | +import org.onlab.onos.store.cluster.messaging.SerializationService; | ||
6 | + | ||
7 | +import java.nio.ByteBuffer; | ||
8 | + | ||
9 | +import static com.google.common.base.Preconditions.checkState; | ||
10 | + | ||
11 | +/** | ||
12 | + * Factory for parsing messages sent between cluster members. | ||
13 | + */ | ||
14 | +public class MessageSerializer implements SerializationService { | ||
15 | + | ||
16 | + private static final int METADATA_LENGTH = 16; // 8 + 4 + 4 | ||
17 | + private static final int LENGTH_OFFSET = 12; | ||
18 | + | ||
19 | + private static final long MARKER = 0xfeedcafebeaddeadL; | ||
20 | + | ||
21 | + @Override | ||
22 | + public ClusterMessage decode(ByteBuffer buffer) { | ||
23 | + try { | ||
24 | + // Do we have enough bytes to read the header? If not, bail. | ||
25 | + if (buffer.remaining() < METADATA_LENGTH) { | ||
26 | + return null; | ||
27 | + } | ||
28 | + | ||
29 | + // Peek at the length and if we have enough to read the entire message | ||
30 | + // go ahead, otherwise bail. | ||
31 | + int length = buffer.getInt(buffer.position() + LENGTH_OFFSET); | ||
32 | + if (buffer.remaining() < length) { | ||
33 | + return null; | ||
34 | + } | ||
35 | + | ||
36 | + // At this point, we have enough data to read a complete message. | ||
37 | + long marker = buffer.getLong(); | ||
38 | + checkState(marker == MARKER, "Incorrect message marker"); | ||
39 | + | ||
40 | + int subjectOrdinal = buffer.getInt(); | ||
41 | + MessageSubject subject = MessageSubject.values()[subjectOrdinal]; | ||
42 | + length = buffer.getInt(); | ||
43 | + | ||
44 | + // TODO: sanity checking for length | ||
45 | + byte[] data = new byte[length - METADATA_LENGTH]; | ||
46 | + buffer.get(data); | ||
47 | + | ||
48 | + // TODO: add deserialization hook here; for now this hack | ||
49 | + return null; // actually deserialize | ||
50 | + | ||
51 | + } catch (Exception e) { | ||
52 | + // TODO: recover from exceptions by forwarding stream to next marker | ||
53 | + e.printStackTrace(); | ||
54 | + } | ||
55 | + return null; | ||
56 | + } | ||
57 | + | ||
58 | + @Override | ||
59 | + public void encode(ClusterMessage message, ByteBuffer buffer) { | ||
60 | + try { | ||
61 | + int i = 0; | ||
62 | + // Type based lookup for proper encoder | ||
63 | + } catch (Exception e) { | ||
64 | + // TODO: recover from exceptions by forwarding stream to next marker | ||
65 | + e.printStackTrace(); | ||
66 | + } | ||
67 | + } | ||
68 | + | ||
69 | +} |
... | @@ -19,6 +19,9 @@ | ... | @@ -19,6 +19,9 @@ |
19 | <bundle>mvn:de.javakaffee/kryo-serializers/0.27</bundle> | 19 | <bundle>mvn:de.javakaffee/kryo-serializers/0.27</bundle> |
20 | 20 | ||
21 | <bundle>mvn:org.onlab.onos/onlab-nio/1.0.0-SNAPSHOT</bundle> | 21 | <bundle>mvn:org.onlab.onos/onlab-nio/1.0.0-SNAPSHOT</bundle> |
22 | + | ||
23 | + <bundle>mvn:org.codehaus.jackson/jackson-core-asl/1.9.13</bundle> | ||
24 | + <bundle>mvn:org.codehaus.jackson/jackson-mapper-asl/1.9.13</bundle> | ||
22 | </feature> | 25 | </feature> |
23 | 26 | ||
24 | <feature name="onos-thirdparty-web" version="1.0.0" | 27 | <feature name="onos-thirdparty-web" version="1.0.0" |
... | @@ -130,4 +133,10 @@ | ... | @@ -130,4 +133,10 @@ |
130 | <bundle>mvn:org.onlab.onos/onos-app-foo/1.0.0-SNAPSHOT</bundle> | 133 | <bundle>mvn:org.onlab.onos/onos-app-foo/1.0.0-SNAPSHOT</bundle> |
131 | </feature> | 134 | </feature> |
132 | 135 | ||
136 | + <feature name="onos-app-config" version="1.0.0" | ||
137 | + description="ONOS network config reader"> | ||
138 | + <feature>onos-api</feature> | ||
139 | + <bundle>mvn:org.onlab.onos/onos-app-config/1.0.0-SNAPSHOT</bundle> | ||
140 | + </feature> | ||
141 | + | ||
133 | </features> | 142 | </features> | ... | ... |
... | @@ -92,6 +92,17 @@ | ... | @@ -92,6 +92,17 @@ |
92 | <version>3.3.2</version> | 92 | <version>3.3.2</version> |
93 | </dependency> | 93 | </dependency> |
94 | 94 | ||
95 | + <dependency> | ||
96 | + <groupId>org.codehaus.jackson</groupId> | ||
97 | + <artifactId>jackson-core-asl</artifactId> | ||
98 | + <version>1.9.13</version> | ||
99 | + </dependency> | ||
100 | + <dependency> | ||
101 | + <groupId>org.codehaus.jackson</groupId> | ||
102 | + <artifactId>jackson-mapper-asl</artifactId> | ||
103 | + <version>1.9.13</version> | ||
104 | + </dependency> | ||
105 | + | ||
95 | 106 | ||
96 | <!-- Web related --> | 107 | <!-- Web related --> |
97 | <dependency> | 108 | <dependency> | ... | ... |
-
Please register or login to post a comment