tom

Merge remote-tracking branch 'origin/master'

Conflicts:
	apps/foo/pom.xml
	apps/foo/src/main/java/org/onlab/onos/ccc/DistributedClusterStore.java
	cli/src/main/java/org/onlab/onos/cli/NodeAddCommand.java
	cli/src/main/resources/OSGI-INF/blueprint/shell-config.xml
	tools/test/bin/onos-config
	utils/nio/src/main/java/org/onlab/nio/IOLoop.java
Showing 78 changed files with 2180 additions and 750 deletions
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.onlab.onos</groupId>
<artifactId>onos-apps</artifactId>
<version>1.0.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<artifactId>onos-app-config</artifactId>
<packaging>bundle</packaging>
<description>ONOS simple network configuration reader</description>
<dependencies>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>2.4.2</version>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
package org.onlab.onos.config;
import java.util.Collections;
import java.util.List;
import org.codehaus.jackson.annotate.JsonProperty;
/**
* Object to store address configuration read from a JSON file.
*/
public class AddressConfiguration {
private List<AddressEntry> addresses;
/**
* Gets a list of addresses in the system.
*
* @return the list of addresses
*/
public List<AddressEntry> getAddresses() {
return Collections.unmodifiableList(addresses);
}
/**
* Sets a list of addresses in the system.
*
* @param addresses the list of addresses
*/
@JsonProperty("addresses")
public void setAddresses(List<AddressEntry> addresses) {
this.addresses = addresses;
}
}
package org.onlab.onos.config;
import java.util.List;
import org.codehaus.jackson.annotate.JsonProperty;
import org.onlab.packet.IpPrefix;
import org.onlab.packet.MacAddress;
/**
* Represents a set of addresses bound to a port.
*/
public class AddressEntry {
private String dpid;
private short portNumber;
private List<IpPrefix> ipAddresses;
private MacAddress macAddress;
public String getDpid() {
return dpid;
}
@JsonProperty("dpid")
public void setDpid(String strDpid) {
this.dpid = strDpid;
}
public short getPortNumber() {
return portNumber;
}
@JsonProperty("port")
public void setPortNumber(short portNumber) {
this.portNumber = portNumber;
}
public List<IpPrefix> getIpAddresses() {
return ipAddresses;
}
@JsonProperty("ips")
public void setIpAddresses(List<IpPrefix> ipAddresses) {
this.ipAddresses = ipAddresses;
}
public MacAddress getMacAddress() {
return macAddress;
}
@JsonProperty("mac")
public void setMacAddress(MacAddress macAddress) {
this.macAddress = macAddress;
}
}
package org.onlab.onos.config;
import static org.slf4j.LoggerFactory.getLogger;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.codehaus.jackson.map.ObjectMapper;
import org.onlab.onos.net.ConnectPoint;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.PortNumber;
import org.onlab.onos.net.host.HostAdminService;
import org.onlab.onos.net.host.PortAddresses;
import org.slf4j.Logger;
import com.google.common.collect.Sets;
/**
* Simple configuration module to read in supplementary network configuration
* from a file.
*/
@Component(immediate = true)
public class NetworkConfigReader {
private final Logger log = getLogger(getClass());
private static final String DEFAULT_CONFIG_FILE = "config/addresses.json";
private String configFileName = DEFAULT_CONFIG_FILE;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected HostAdminService hostAdminService;
@Activate
protected void activate() {
log.info("Started network config reader");
log.info("Config file set to {}", configFileName);
AddressConfiguration config = readNetworkConfig();
if (config != null) {
for (AddressEntry entry : config.getAddresses()) {
ConnectPoint cp = new ConnectPoint(
DeviceId.deviceId(dpidToUri(entry.getDpid())),
PortNumber.portNumber(entry.getPortNumber()));
PortAddresses addresses = new PortAddresses(cp,
Sets.newHashSet(entry.getIpAddresses()),
entry.getMacAddress());
hostAdminService.bindAddressesToPort(addresses);
}
}
}
@Deactivate
protected void deactivate() {
log.info("Stopped");
}
private AddressConfiguration readNetworkConfig() {
File configFile = new File(configFileName);
ObjectMapper mapper = new ObjectMapper();
try {
AddressConfiguration config =
mapper.readValue(configFile, AddressConfiguration.class);
return config;
} catch (FileNotFoundException e) {
log.warn("Configuration file not found: {}", configFileName);
} catch (IOException e) {
log.error("Unable to read config from file:", e);
}
return null;
}
private static String dpidToUri(String dpid) {
return "of:" + dpid.replace(":", "");
}
}
/**
* Simple configuration module to read in supplementary network configuration
* from a file.
*/
package org.onlab.onos.config;
{
"interfaces" : [
{
"dpid" : "00:00:00:00:00:00:01",
"port" : "1",
"ips" : ["192.168.10.101/24"],
"mac" : "00:00:00:11:22:33"
},
{
"dpid" : "00:00:00:00:00:00:02",
"port" : "1",
"ips" : ["192.168.20.101/24", "192.168.30.101/24"]
},
{
"dpid" : "00:00:00:00:00:00:03",
"port" : "1",
"ips" : ["10.1.0.1/16"],
"mac" : "00:00:00:00:00:01"
}
]
}
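A minimal stand-alone sketch (hypothetical class, not part of this changeset; the file path is illustrative) showing how the sample file above is bound using the same codehaus Jackson ObjectMapper call that NetworkConfigReader makes:
package org.onlab.onos.config;
import org.codehaus.jackson.map.ObjectMapper;
import java.io.File;
import java.io.IOException;
// Hypothetical example class; mirrors NetworkConfigReader.readNetworkConfig().
public class AddressConfigExample {
    public static void main(String[] args) throws IOException {
        ObjectMapper mapper = new ObjectMapper();
        AddressConfiguration config =
                mapper.readValue(new File("config/addresses.json"), AddressConfiguration.class);
        for (AddressEntry entry : config.getAddresses()) {
            System.out.println(entry.getDpid() + "/" + entry.getPortNumber()
                    + " -> " + entry.getIpAddresses());
        }
    }
}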
......@@ -28,20 +28,6 @@
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.livetribe.slp</groupId>
<artifactId>livetribe-slp</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</dependency>
<dependency>
<groupId>org.apache.karaf.shell</groupId>
<artifactId>org.apache.karaf.shell.console</artifactId>
</dependency>
......
package org.onlab.onos.ccc;
import com.google.common.collect.ImmutableSet;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Service;
import org.onlab.nio.AcceptorLoop;
import org.onlab.nio.IOLoop;
import org.onlab.nio.MessageStream;
import org.onlab.onos.cluster.ClusterEvent;
import org.onlab.onos.cluster.ClusterStore;
import org.onlab.onos.cluster.ClusterStoreDelegate;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.AbstractStore;
import org.onlab.packet.IpPrefix;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.nio.channels.ByteChannel;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static java.net.InetAddress.getByAddress;
import static org.onlab.onos.cluster.ControllerNode.State;
import static org.onlab.packet.IpPrefix.valueOf;
import static org.onlab.util.Tools.namedThreads;
/**
* Distributed implementation of the cluster nodes store.
*/
@Component(immediate = true)
@Service
public class DistributedClusterStore
extends AbstractStore<ClusterEvent, ClusterStoreDelegate>
implements ClusterStore {
private final Logger log = LoggerFactory.getLogger(getClass());
private static final long CONNECTION_CUSTODIAN_DELAY = 100L;
private static final long CONNECTION_CUSTODIAN_FREQUENCY = 5000;
private static final long SELECT_TIMEOUT = 50;
private static final int WORKERS = 3;
private static final int INITIATORS = 2;
private static final int COMM_BUFFER_SIZE = 16 * 1024;
private static final int COMM_IDLE_TIME = 500;
private static final boolean SO_NO_DELAY = false;
private static final int SO_SEND_BUFFER_SIZE = 128 * 1024;
private static final int SO_RCV_BUFFER_SIZE = 128 * 1024;
private DefaultControllerNode self;
private final Map<NodeId, DefaultControllerNode> nodes = new ConcurrentHashMap<>();
private final Map<NodeId, State> states = new ConcurrentHashMap<>();
private final Map<NodeId, TLVMessageStream> streams = new ConcurrentHashMap<>();
private final Map<SocketChannel, DefaultControllerNode> nodesByChannel = new ConcurrentHashMap<>();
private final ExecutorService listenExecutor =
Executors.newSingleThreadExecutor(namedThreads("onos-comm-listen"));
private final ExecutorService commExecutors =
Executors.newFixedThreadPool(WORKERS, namedThreads("onos-comm-cluster"));
private final ExecutorService heartbeatExecutor =
Executors.newSingleThreadExecutor(namedThreads("onos-comm-heartbeat"));
private final ExecutorService initiatorExecutors =
Executors.newFixedThreadPool(INITIATORS, namedThreads("onos-comm-initiator"));
private final Timer timer = new Timer();
private final TimerTask connectionCustodian = new ConnectionCustodian();
private ListenLoop listenLoop;
private List<CommLoop> commLoops = new ArrayList<>(WORKERS);
@Activate
public void activate() {
establishIdentity();
startCommunications();
startListening();
startInitiating();
log.info("Started");
}
@Deactivate
public void deactivate() {
listenLoop.shutdown();
for (CommLoop loop : commLoops) {
loop.shutdown();
}
log.info("Stopped");
}
// Establishes the controller's own identity.
private void establishIdentity() {
IpPrefix ip = valueOf(System.getProperty("onos.ip", "127.0.1.1"));
self = new DefaultControllerNode(new NodeId(ip.toString()), ip);
nodes.put(self.id(), self);
}
// Kicks off the IO loops.
private void startCommunications() {
for (int i = 0; i < WORKERS; i++) {
try {
CommLoop loop = new CommLoop();
commLoops.add(loop);
commExecutors.execute(loop);
} catch (IOException e) {
log.warn("Unable to start comm IO loop", e);
}
}
}
// Starts listening for connections from peer cluster members.
private void startListening() {
try {
listenLoop = new ListenLoop(self.ip(), self.tcpPort());
listenExecutor.execute(listenLoop);
} catch (IOException e) {
log.error("Unable to listen for cluster connections", e);
}
}
/**
* Initiates an open connection request and registers the pending socket
* channel with the given IO loop.
*
* @param node controller node to connect to
* @param loop loop with which the channel should be registered
* @throws java.io.IOException if the socket could not be opened or connected
*/
private void openConnection(DefaultControllerNode node, CommLoop loop) throws IOException {
SocketAddress sa = new InetSocketAddress(getByAddress(node.ip().toOctets()), node.tcpPort());
SocketChannel ch = SocketChannel.open();
nodesByChannel.put(ch, node);
ch.configureBlocking(false);
loop.connectStream(ch);
ch.connect(sa);
}
// Attempts to connect to any nodes that do not have an associated connection.
private void startInitiating() {
timer.schedule(connectionCustodian, CONNECTION_CUSTODIAN_DELAY, CONNECTION_CUSTODIAN_FREQUENCY);
}
@Override
public ControllerNode getLocalNode() {
return self;
}
@Override
public Set<ControllerNode> getNodes() {
ImmutableSet.Builder<ControllerNode> builder = ImmutableSet.builder();
return builder.addAll(nodes.values()).build();
}
@Override
public ControllerNode getNode(NodeId nodeId) {
return nodes.get(nodeId);
}
@Override
public State getState(NodeId nodeId) {
State state = states.get(nodeId);
return state == null ? State.INACTIVE : state;
}
@Override
public ControllerNode addNode(NodeId nodeId, IpPrefix ip, int tcpPort) {
DefaultControllerNode node = new DefaultControllerNode(nodeId, ip, tcpPort);
nodes.put(nodeId, node);
return node;
}
@Override
public void removeNode(NodeId nodeId) {
nodes.remove(nodeId);
}
// Listens and accepts inbound connections from other cluster nodes.
private class ListenLoop extends AcceptorLoop {
ListenLoop(IpPrefix ip, int tcpPort) throws IOException {
super(SELECT_TIMEOUT, new InetSocketAddress(getByAddress(ip.toOctets()), tcpPort));
}
@Override
protected void acceptConnection(ServerSocketChannel channel) throws IOException {
SocketChannel sc = channel.accept();
sc.configureBlocking(false);
Socket so = sc.socket();
so.setTcpNoDelay(SO_NO_DELAY);
so.setReceiveBufferSize(SO_RCV_BUFFER_SIZE);
so.setSendBufferSize(SO_SEND_BUFFER_SIZE);
findLeastUtilizedLoop().acceptStream(sc);
log.info("Connected client");
}
}
private class CommLoop extends IOLoop<TLVMessage, TLVMessageStream> {
CommLoop() throws IOException {
super(SELECT_TIMEOUT);
}
@Override
protected TLVMessageStream createStream(ByteChannel byteChannel) {
return new TLVMessageStream(this, byteChannel, COMM_BUFFER_SIZE, COMM_IDLE_TIME);
}
@Override
protected void processMessages(List<TLVMessage> messages, MessageStream<TLVMessage> stream) {
}
@Override
public TLVMessageStream acceptStream(SocketChannel channel) {
TLVMessageStream stream = super.acceptStream(channel);
try {
InetSocketAddress sa = (InetSocketAddress) channel.getRemoteAddress();
log.info("Accepted a new connection from {}", IpPrefix.valueOf(sa.getAddress().getAddress()));
} catch (IOException e) {
log.warn("Unable to accept connection from an unknown end-point", e);
}
return stream;
}
@Override
public TLVMessageStream connectStream(SocketChannel channel) {
TLVMessageStream stream = super.connectStream(channel);
DefaultControllerNode node = nodesByChannel.get(channel);
if (node != null) {
log.info("Opened connection to {}", node.id());
streams.put(node.id(), stream);
}
return stream;
}
}
// Sweeps through all controller nodes and attempts to open connection to
// those that presently do not have one.
private class ConnectionCustodian extends TimerTask {
@Override
public void run() {
for (DefaultControllerNode node : nodes.values()) {
if (node != self && !streams.containsKey(node.id())) {
try {
openConnection(node, findLeastUtilizedLoop());
} catch (IOException e) {
log.warn("Unable to connect", e);
}
}
}
}
}
// Finds the least utilized IO loop.
private CommLoop findLeastUtilizedLoop() {
CommLoop leastUtilized = null;
int minCount = Integer.MAX_VALUE;
for (CommLoop loop : commLoops) {
int count = loop.streamCount();
if (count == 0) {
return loop;
}
if (count < minCount) {
leastUtilized = loop;
minCount = count;
}
}
return leastUtilized;
}
}
package org.onlab.onos.ccc;
import org.onlab.nio.AbstractMessage;
import java.util.Objects;
import static com.google.common.base.MoreObjects.toStringHelper;
/**
* Base message for cluster-wide communications using TLVs.
*/
public class TLVMessage extends AbstractMessage {
private final int type;
private final Object data;
/**
* Creates an immutable TLV message.
*
* @param type message type
* @param length message length
* @param data message data
*/
public TLVMessage(int type, int length, Object data) {
this.length = length;
this.type = type;
this.data = data;
}
/**
* Returns the message type indicator.
*
* @return message type
*/
public int type() {
return type;
}
/**
* Returns the data object.
*
* @return message data
*/
public Object data() {
return data;
}
@Override
public int hashCode() {
return Objects.hash(type, data);
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
final TLVMessage other = (TLVMessage) obj;
return Objects.equals(this.type, other.type) &&
Objects.equals(this.data, other.data);
}
@Override
public String toString() {
return toStringHelper(this).add("type", type).add("length", length).toString();
}
}
package org.onlab.onos.ccc;
import org.onlab.nio.IOLoop;
import org.onlab.nio.MessageStream;
import java.nio.ByteBuffer;
import java.nio.channels.ByteChannel;
import static com.google.common.base.Preconditions.checkState;
/**
* Stream for transferring TLV messages between cluster members.
*/
public class TLVMessageStream extends MessageStream<TLVMessage> {
private static final long MARKER = 0xfeedcafecafefeedL;
/**
* Creates a message stream associated with the specified IO loop and
* backed by the given byte channel.
*
* @param loop IO loop
* @param byteChannel backing byte channel
* @param bufferSize size of the backing byte buffers
* @param maxIdleMillis maximum number of millis the stream can be idle
*/
protected TLVMessageStream(IOLoop<TLVMessage, ?> loop, ByteChannel byteChannel,
int bufferSize, int maxIdleMillis) {
super(loop, byteChannel, bufferSize, maxIdleMillis);
}
@Override
protected TLVMessage read(ByteBuffer buffer) {
long marker = buffer.getLong();
checkState(marker == MARKER, "Incorrect message marker");
int type = buffer.getInt();
int length = buffer.getInt();
// TODO: add deserialization hook here
return new TLVMessage(type, length, null);
}
@Override
protected void write(TLVMessage message, ByteBuffer buffer) {
buffer.putLong(MARKER);
buffer.putInt(message.type());
buffer.putInt(message.length());
// TODO: add serialization hook here
}
}
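For orientation, an illustrative sketch (hypothetical class, not from this changeset) of the wire frame implied by TLVMessageStream.read() and write(): an 8-byte marker, a 4-byte type and a 4-byte length; payload serialization is still a TODO in this commit.
package org.onlab.onos.ccc;
import java.nio.ByteBuffer;
// Hypothetical example class showing the TLV frame layout used by TLVMessageStream.
public class TLVFrameExample {
    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(16);
        buffer.putLong(0xfeedcafecafefeedL); // MARKER checked by read()
        buffer.putInt(1);                    // message type (arbitrary value for this sketch)
        buffer.putInt(16);                   // total frame length in bytes
        buffer.flip();
        System.out.println("Framed " + buffer.remaining() + " bytes"); // 16
    }
}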
......@@ -233,7 +233,7 @@ public class IOLoopTestClient {
}
@Override
protected void connect(SelectionKey key) {
protected void connect(SelectionKey key) throws IOException {
super.connect(key);
TestMessageStream b = (TestMessageStream) key.attachment();
Worker w = ((CustomIOLoop) b.loop()).worker;
......
......@@ -26,7 +26,9 @@ import org.onlab.onos.net.packet.InboundPacket;
import org.onlab.onos.net.packet.PacketContext;
import org.onlab.onos.net.packet.PacketProcessor;
import org.onlab.onos.net.packet.PacketService;
import org.onlab.onos.net.proxyarp.ProxyArpService;
import org.onlab.onos.net.topology.TopologyService;
import org.onlab.packet.ARP;
import org.onlab.packet.Ethernet;
import org.slf4j.Logger;
......@@ -50,6 +52,9 @@ public class ReactiveForwarding {
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected FlowRuleService flowRuleService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ProxyArpService proxyArpService;
private ReactivePacketProcessor processor = new ReactivePacketProcessor();
private ApplicationId appId;
......@@ -85,6 +90,16 @@ public class ReactiveForwarding {
InboundPacket pkt = context.inPacket();
Ethernet ethPkt = pkt.parsed();
if (ethPkt.getEtherType() == Ethernet.TYPE_ARP) {
ARP arp = (ARP) ethPkt.getPayload();
if (arp.getOpCode() == ARP.OP_REPLY) {
proxyArpService.forward(ethPkt);
} else if (arp.getOpCode() == ARP.OP_REQUEST) {
proxyArpService.reply(ethPkt);
}
context.block();
return;
}
HostId id = HostId.hostId(ethPkt.getDestinationMAC());
// Do we know who this is for? If not, flood and bail.
......
......@@ -20,6 +20,7 @@
<module>tvue</module>
<module>fwd</module>
<module>foo</module>
<module>config</module>
</modules>
<properties>
......
......@@ -7,10 +7,10 @@ import org.onlab.onos.cluster.NodeId;
import org.onlab.packet.IpPrefix;
/**
* Lists all controller cluster nodes.
* Adds a new controller cluster node.
*/
@Command(scope = "onos", name = "add-node",
description = "Lists all controller cluster nodes")
description = "Adds a new controller cluster node")
public class NodeAddCommand extends AbstractShellCommand {
@Argument(index = 0, name = "nodeId", description = "Node ID",
......@@ -21,7 +21,7 @@ public class NodeAddCommand extends AbstractShellCommand {
required = true, multiValued = false)
String ip = null;
@Argument(index = 2, name = "tcpPort", description = "TCP port",
@Argument(index = 2, name = "tcpPort", description = "Node TCP listen port",
required = false, multiValued = false)
int tcpPort = 9876;
......
package org.onlab.onos.cli;
import org.apache.karaf.shell.commands.Argument;
import org.apache.karaf.shell.commands.Command;
import org.onlab.onos.cluster.ClusterAdminService;
import org.onlab.onos.cluster.NodeId;
/**
* Removes a controller cluster node.
*/
@Command(scope = "onos", name = "remove-node",
description = "Removes a new controller cluster node")
public class NodeRemoveCommand extends AbstractShellCommand {
@Argument(index = 0, name = "nodeId", description = "Node ID",
required = true, multiValued = false)
String nodeId = null;
@Override
protected void execute() {
ClusterAdminService service = get(ClusterAdminService.class);
service.removeNode(new NodeId(nodeId));
}
}
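Illustrative usage of the two commands from the Karaf console (node id, IP and port values are made up; the argument order follows the @Argument indexes shown above):
add-node 192.168.56.12 192.168.56.12 9876
remove-node 192.168.56.12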
......@@ -8,6 +8,9 @@
<action class="org.onlab.onos.cli.NodeAddCommand"/>
</command>
<command>
<action class="org.onlab.onos.cli.NodeRemoveCommand"/>
</command>
<command>
<action class="org.onlab.onos.cli.MastersListCommand"/>
<completers>
<ref component-id="clusterIdCompleter"/>
......
......@@ -9,6 +9,8 @@ import org.onlab.onos.net.ConnectPoint;
import org.onlab.packet.IpPrefix;
import org.onlab.packet.MacAddress;
import com.google.common.base.MoreObjects;
/**
* Represents address information bound to a port.
*/
......@@ -83,4 +85,13 @@ public class PortAddresses {
public int hashCode() {
return Objects.hash(connectPoint, ipAddresses, macAddress);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("connect-point", connectPoint)
.add("ip-addresses", ipAddresses)
.add("mac-address", macAddress)
.toString();
}
}
......
package org.onlab.onos.net.link;
import java.util.Set;
import org.onlab.onos.net.ConnectPoint;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.Link;
import java.util.Set;
/**
* Service for interacting with the inventory of infrastructure links.
*/
......
......@@ -21,9 +21,16 @@ public interface ProxyArpService {
* Sends a reply for a given request. If the host is not known then the ARP
* request will be flooded out all edge ports.
*
* @param request
* @param eth
* an arp request
*/
void reply(Ethernet request);
void reply(Ethernet eth);
/**
* Forwards an ARP request to its destination. Floods the ARP request at the
* edge if the destination is not known.
*
* @param eth an Ethernet frame containing an ARP request
*/
void forward(Ethernet eth);
}
......
/**
* Base abstractions related to the proxy arp service.
*/
package org.onlab.onos.net.proxyarp;
\ No newline at end of file
package org.onlab.onos.net.proxyarp;
......
package org.onlab.onos.net.link;
import java.util.Set;
import org.onlab.onos.net.ConnectPoint;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.Link;
import java.util.Set;
/**
* Test adapter for link service.
*/
......@@ -63,4 +63,5 @@ public class LinkServiceAdapter implements LinkService {
public void removeListener(LinkListener listener) {
}
}
......
......@@ -53,7 +53,7 @@ public class LinkManager
protected final AbstractListenerRegistry<LinkEvent, LinkListener>
listenerRegistry = new AbstractListenerRegistry<>();
private LinkStoreDelegate delegate = new InternalStoreDelegate();
private final LinkStoreDelegate delegate = new InternalStoreDelegate();
private final DeviceListener deviceListener = new InternalDeviceListener();
......
/**
* Core subsystem for responding to arp requests.
*/
package org.onlab.onos.proxyarp.impl;
\ No newline at end of file
package org.onlab.onos.net.proxyarp.impl;
......
package org.onlab.onos.proxyarp.impl;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import java.nio.ByteBuffer;
import java.util.Set;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onlab.onos.net.Host;
import org.onlab.onos.net.flow.DefaultTrafficTreatment;
import org.onlab.onos.net.flow.TrafficTreatment;
import org.onlab.onos.net.host.HostService;
import org.onlab.onos.net.packet.DefaultOutboundPacket;
import org.onlab.onos.net.packet.PacketService;
import org.onlab.onos.net.proxyarp.ProxyArpService;
import org.onlab.onos.net.topology.TopologyService;
import org.onlab.packet.ARP;
import org.onlab.packet.Ethernet;
import org.onlab.packet.IpPrefix;
import org.onlab.packet.VlanId;
public class ProxyArpManager implements ProxyArpService {
private static final String MAC_ADDR_NULL = "Mac address cannot be null.";
private static final String REQUEST_NULL = "Arp request cannot be null.";
private static final String REQUEST_NOT_ARP = "Ethernet frame does not contain ARP request.";
private static final String NOT_ARP_REQUEST = "ARP is not a request.";
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected HostService hostService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected PacketService packetService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected TopologyService topologyService;
@Override
public boolean known(IpPrefix addr) {
checkNotNull(addr, MAC_ADDR_NULL);
Set<Host> hosts = hostService.getHostsByIp(addr);
return !hosts.isEmpty();
}
@Override
public void reply(Ethernet request) {
checkNotNull(request, REQUEST_NULL);
checkArgument(request.getEtherType() == Ethernet.TYPE_ARP,
REQUEST_NOT_ARP);
ARP arp = (ARP) request.getPayload();
checkArgument(arp.getOpCode() == ARP.OP_REQUEST, NOT_ARP_REQUEST);
VlanId vlan = VlanId.vlanId(request.getVlanID());
Set<Host> hosts = hostService.getHostsByIp(IpPrefix.valueOf(arp
.getTargetProtocolAddress()));
Host h = null;
for (Host host : hosts) {
if (host.vlan().equals(vlan)) {
h = host;
break;
}
}
if (h == null) {
flood(request);
return;
}
Ethernet arpReply = buildArpReply(h, request);
// TODO: check send status with host service.
TrafficTreatment.Builder builder = new DefaultTrafficTreatment.Builder();
builder.setOutput(h.location().port());
packetService.emit(new DefaultOutboundPacket(h.location().deviceId(),
builder.build(), ByteBuffer.wrap(arpReply.serialize())));
}
private void flood(Ethernet request) {
// TODO: flood on all edge ports.
}
private Ethernet buildArpReply(Host h, Ethernet request) {
Ethernet eth = new Ethernet();
eth.setDestinationMACAddress(request.getSourceMACAddress());
eth.setSourceMACAddress(h.mac().getAddress());
eth.setEtherType(Ethernet.TYPE_ARP);
ARP arp = new ARP();
arp.setOpCode(ARP.OP_REPLY);
arp.setSenderHardwareAddress(h.mac().getAddress());
arp.setTargetHardwareAddress(request.getSourceMACAddress());
arp.setTargetProtocolAddress(((ARP) request.getPayload())
.getSenderProtocolAddress());
arp.setSenderProtocolAddress(h.ipAddresses().iterator().next().toInt());
eth.setPayload(arp);
return eth;
}
}
......@@ -33,8 +33,11 @@ import org.onlab.onos.net.device.PortDescription;
import org.onlab.onos.net.provider.AbstractProvider;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.common.StoreManager;
import org.onlab.onos.store.common.StoreService;
import org.onlab.onos.store.common.TestStoreManager;
import org.onlab.onos.store.device.impl.DistributedDeviceStore;
import org.onlab.onos.store.serializers.KryoSerializationManager;
import org.onlab.onos.store.serializers.KryoSerializationService;
import org.onlab.packet.IpPrefix;
import java.util.ArrayList;
......@@ -92,6 +95,7 @@ public class DistributedDeviceManagerTest {
private DistributedDeviceStore dstore;
private TestMastershipManager masterManager;
private EventDeliveryService eventService;
private KryoSerializationManager serializationMgr;
@Before
public void setUp() {
......@@ -107,7 +111,10 @@ public class DistributedDeviceManagerTest {
storeManager = new TestStoreManager(Hazelcast.newHazelcastInstance(config));
storeManager.activate();
dstore = new TestDistributedDeviceStore();
serializationMgr = new KryoSerializationManager();
serializationMgr.activate();
dstore = new TestDistributedDeviceStore(storeManager, serializationMgr);
dstore.activate();
mgr.store = dstore;
......@@ -133,6 +140,7 @@ public class DistributedDeviceManagerTest {
mgr.deactivate();
dstore.deactivate();
serializationMgr.deactivate();
storeManager.deactivate();
}
......@@ -163,7 +171,7 @@ public class DistributedDeviceManagerTest {
public void deviceDisconnected() {
connectDevice(DID1, SW1);
connectDevice(DID2, SW1);
validateEvents(DEVICE_ADDED, DEVICE_ADDED, DEVICE_ADDED, DEVICE_ADDED);
validateEvents(DEVICE_ADDED, DEVICE_ADDED);
assertTrue("device should be available", service.isAvailable(DID1));
// Disconnect
......@@ -182,10 +190,10 @@ public class DistributedDeviceManagerTest {
@Test
public void deviceUpdated() {
connectDevice(DID1, SW1);
validateEvents(DEVICE_ADDED, DEVICE_ADDED);
validateEvents(DEVICE_ADDED);
connectDevice(DID1, SW2);
validateEvents(DEVICE_UPDATED, DEVICE_UPDATED);
validateEvents(DEVICE_UPDATED);
}
@Test
......@@ -202,7 +210,7 @@ public class DistributedDeviceManagerTest {
pds.add(new DefaultPortDescription(P2, true));
pds.add(new DefaultPortDescription(P3, true));
providerService.updatePorts(DID1, pds);
validateEvents(DEVICE_ADDED, DEVICE_ADDED, PORT_ADDED, PORT_ADDED, PORT_ADDED);
validateEvents(DEVICE_ADDED, PORT_ADDED, PORT_ADDED, PORT_ADDED);
pds.clear();
pds.add(new DefaultPortDescription(P1, false));
......@@ -218,7 +226,7 @@ public class DistributedDeviceManagerTest {
pds.add(new DefaultPortDescription(P1, true));
pds.add(new DefaultPortDescription(P2, true));
providerService.updatePorts(DID1, pds);
validateEvents(DEVICE_ADDED, DEVICE_ADDED, PORT_ADDED, PORT_ADDED);
validateEvents(DEVICE_ADDED, PORT_ADDED, PORT_ADDED);
providerService.portStatusChanged(DID1, new DefaultPortDescription(P1, false));
validateEvents(PORT_UPDATED);
......@@ -233,7 +241,7 @@ public class DistributedDeviceManagerTest {
pds.add(new DefaultPortDescription(P1, true));
pds.add(new DefaultPortDescription(P2, true));
providerService.updatePorts(DID1, pds);
validateEvents(DEVICE_ADDED, DEVICE_ADDED, PORT_ADDED, PORT_ADDED);
validateEvents(DEVICE_ADDED, PORT_ADDED, PORT_ADDED);
assertEquals("wrong port count", 2, service.getPorts(DID1).size());
Port port = service.getPort(DID1, P1);
......@@ -247,7 +255,7 @@ public class DistributedDeviceManagerTest {
connectDevice(DID2, SW2);
assertEquals("incorrect device count", 2, service.getDeviceCount());
admin.removeDevice(DID1);
validateEvents(DEVICE_ADDED, DEVICE_ADDED, DEVICE_ADDED, DEVICE_ADDED, DEVICE_REMOVED, DEVICE_REMOVED);
validateEvents(DEVICE_ADDED, DEVICE_ADDED, DEVICE_REMOVED);
assertNull("device should not be found", service.getDevice(DID1));
assertNotNull("device should be found", service.getDevice(DID2));
assertEquals("incorrect device count", 1, service.getDeviceCount());
......@@ -298,8 +306,10 @@ public class DistributedDeviceManagerTest {
private class TestDistributedDeviceStore extends DistributedDeviceStore {
public TestDistributedDeviceStore() {
this.storeService = storeManager;
public TestDistributedDeviceStore(StoreService storeService,
KryoSerializationService kryoSerializationService) {
this.storeService = storeService;
this.kryoSerializationService = kryoSerializationService;
}
}
......
......@@ -26,6 +26,23 @@
<artifactId>onos-core-serializers</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.onlab.onos</groupId>
<artifactId>onlab-nio</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</dependency>
<dependency>
<groupId>org.apache.felix</groupId>
<artifactId>org.apache.felix.scr.annotations</artifactId>
......
package org.onlab.onos.store.cluster.impl;
import org.onlab.nio.AcceptorLoop;
import org.onlab.packet.IpPrefix;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import static java.net.InetAddress.getByAddress;
/**
* Listens to inbound connection requests and accepts them.
*/
public class ClusterConnectionListener extends AcceptorLoop {
private static final long SELECT_TIMEOUT = 50;
private static final int COMM_BUFFER_SIZE = 32 * 1024;
private static final boolean SO_NO_DELAY = false;
private static final int SO_SEND_BUFFER_SIZE = COMM_BUFFER_SIZE;
private static final int SO_RCV_BUFFER_SIZE = COMM_BUFFER_SIZE;
private final WorkerFinder workerFinder;
ClusterConnectionListener(IpPrefix ip, int tcpPort,
WorkerFinder workerFinder) throws IOException {
super(SELECT_TIMEOUT, new InetSocketAddress(getByAddress(ip.toOctets()), tcpPort));
this.workerFinder = workerFinder;
}
@Override
protected void acceptConnection(ServerSocketChannel channel) throws IOException {
SocketChannel sc = channel.accept();
sc.configureBlocking(false);
Socket so = sc.socket();
so.setTcpNoDelay(SO_NO_DELAY);
so.setReceiveBufferSize(SO_RCV_BUFFER_SIZE);
so.setSendBufferSize(SO_SEND_BUFFER_SIZE);
workerFinder.findWorker().acceptStream(sc);
}
}
package org.onlab.onos.store.cluster.impl;
import com.fasterxml.jackson.core.JsonEncoding;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.packet.IpPrefix;
import java.io.File;
import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
/**
* Allows reading and writing the cluster definition as a JSON file.
*/
public class ClusterDefinitionStore {
private final File file;
/**
* Creates a reader/writer of the cluster definition file.
*
* @param filePath location of the definition file
*/
public ClusterDefinitionStore(String filePath) {
file = new File(filePath);
}
/**
* Returns the set of controller nodes, including self.
*
* @return set of controller nodes
* @throws IOException if the definition file cannot be read
*/
public Set<DefaultControllerNode> read() throws IOException {
Set<DefaultControllerNode> nodes = new HashSet<>();
ObjectMapper mapper = new ObjectMapper();
ObjectNode clusterNodeDef = (ObjectNode) mapper.readTree(file);
Iterator<JsonNode> it = ((ArrayNode) clusterNodeDef.get("nodes")).elements();
while (it.hasNext()) {
ObjectNode nodeDef = (ObjectNode) it.next();
nodes.add(new DefaultControllerNode(new NodeId(nodeDef.get("id").asText()),
IpPrefix.valueOf(nodeDef.get("ip").asText()),
nodeDef.get("tcpPort").asInt(9876)));
}
return nodes;
}
/**
* Writes the given set of controller nodes.
*
* @param nodes set of controller nodes
* @throws IOException if the definition file cannot be written
*/
public void write(Set<DefaultControllerNode> nodes) throws IOException {
ObjectMapper mapper = new ObjectMapper();
ObjectNode clusterNodeDef = mapper.createObjectNode();
ArrayNode nodeDefs = mapper.createArrayNode();
clusterNodeDef.set("nodes", nodeDefs);
for (DefaultControllerNode node : nodes) {
ObjectNode nodeDef = mapper.createObjectNode();
nodeDef.put("id", node.id().toString())
.put("ip", node.ip().toString())
.put("tcpPort", node.tcpPort());
nodeDefs.add(nodeDef);
}
mapper.writeTree(new JsonFactory().createGenerator(file, JsonEncoding.UTF8),
clusterNodeDef);
}
}
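A minimal round-trip sketch for ClusterDefinitionStore (hypothetical class, not part of the commit; node addresses and the file path are made up). The JSON it produces and consumes has the shape {"nodes":[{"id":...,"ip":...,"tcpPort":...}]} expected by read():
package org.onlab.onos.store.cluster.impl;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.packet.IpPrefix;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
// Hypothetical example class; writes a two-node definition and reads it back.
public class ClusterDefinitionExample {
    public static void main(String[] args) throws IOException {
        ClusterDefinitionStore store = new ClusterDefinitionStore("/tmp/cluster.json");
        Set<DefaultControllerNode> nodes = new HashSet<>();
        nodes.add(new DefaultControllerNode(new NodeId("192.168.56.11"),
                                            IpPrefix.valueOf("192.168.56.11"), 9876));
        nodes.add(new DefaultControllerNode(new NodeId("192.168.56.12"),
                                            IpPrefix.valueOf("192.168.56.12"), 9876));
        store.write(nodes);
        System.out.println("Read back " + store.read().size() + " node definitions");
    }
}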
package org.onlab.onos.store.cluster.impl;
import org.onlab.nio.IOLoop;
import org.onlab.nio.MessageStream;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.ClusterMessageStream;
import org.onlab.onos.store.cluster.messaging.SerializationService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.ByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.List;
import java.util.Objects;
import static org.onlab.packet.IpPrefix.valueOf;
/**
* Performs the IO operations related to cluster-wide communications.
*/
public class ClusterIOWorker extends
IOLoop<ClusterMessage, ClusterMessageStream> {
private final Logger log = LoggerFactory.getLogger(getClass());
private static final long SELECT_TIMEOUT = 50;
private final ConnectionManager connectionManager;
private final CommunicationsDelegate commsDelegate;
private final SerializationService serializationService;
private final ClusterMessage helloMessage;
/**
* Creates a new cluster IO worker.
*
* @param connectionManager parent connection manager
* @param commsDelegate communications delegate for dispatching
* @param serializationService serialization service for encode/decode
* @param helloMessage hello message for greeting peers
* @throws IOException if errors occur during IO loop ignition
*/
ClusterIOWorker(ConnectionManager connectionManager,
CommunicationsDelegate commsDelegate,
SerializationService serializationService,
ClusterMessage helloMessage) throws IOException {
super(SELECT_TIMEOUT);
this.connectionManager = connectionManager;
this.commsDelegate = commsDelegate;
this.serializationService = serializationService;
this.helloMessage = helloMessage;
}
@Override
protected ClusterMessageStream createStream(ByteChannel byteChannel) {
return new ClusterMessageStream(serializationService, this, byteChannel);
}
@Override
protected void processMessages(List<ClusterMessage> messages, MessageStream<ClusterMessage> stream) {
for (ClusterMessage message : messages) {
commsDelegate.dispatch(message);
}
}
@Override
public ClusterMessageStream acceptStream(SocketChannel channel) {
ClusterMessageStream stream = super.acceptStream(channel);
try {
InetSocketAddress sa = (InetSocketAddress) channel.getRemoteAddress();
log.info("Accepted connection from node {}", valueOf(sa.getAddress().getAddress()));
stream.write(helloMessage);
} catch (IOException e) {
log.warn("Unable to accept connection from an unknown end-point", e);
}
return stream;
}
@Override
protected void connect(SelectionKey key) throws IOException {
try {
super.connect(key);
ClusterMessageStream stream = (ClusterMessageStream) key.attachment();
stream.write(helloMessage);
} catch (IOException e) {
if (!Objects.equals(e.getMessage(), "Connection refused")) {
throw e;
}
}
}
@Override
protected void removeStream(MessageStream<ClusterMessage> stream) {
DefaultControllerNode node = ((ClusterMessageStream) stream).node();
if (node != null) {
log.info("Closed connection to node {}", node.id());
connectionManager.removeNodeStream(node);
}
super.removeStream(stream);
}
}
package org.onlab.onos.store.cluster.impl;
import org.onlab.onos.cluster.DefaultControllerNode;
/**
* Simple back interface through which the connection manager can interact with
* the cluster store.
*/
public interface ClusterNodesDelegate {
/**
* Notifies about a new cluster node being detected.
*
* @param node newly detected cluster node
*/
void nodeDetected(DefaultControllerNode node);
/**
* Notifies about cluster node going offline.
*
* @param node cluster node that vanished
*/
void nodeVanished(DefaultControllerNode node);
}
package org.onlab.onos.store.cluster.impl;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
/**
* Simple back interface for interacting with the communications service.
*/
public interface CommunicationsDelegate {
/**
* Dispatches the specified message to all registered subscribers.
*
* @param message message to be dispatched
*/
void dispatch(ClusterMessage message);
/**
* Sets the sender.
*
* @param messageSender message sender
*/
void setSender(MessageSender messageSender);
}
package org.onlab.onos.store.cluster.impl;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.ClusterMessageStream;
import org.onlab.onos.store.cluster.messaging.HelloMessage;
import org.onlab.onos.store.cluster.messaging.SerializationService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static java.net.InetAddress.getByAddress;
import static org.onlab.util.Tools.namedThreads;
/**
* Manages connections to other controller cluster nodes.
*/
public class ConnectionManager implements MessageSender {
private final Logger log = LoggerFactory.getLogger(getClass());
private static final long CONNECTION_CUSTODIAN_DELAY = 1000L;
private static final long CONNECTION_CUSTODIAN_FREQUENCY = 5000;
private static final long START_TIMEOUT = 1000;
private static final int WORKERS = 3;
private ClusterConnectionListener connectionListener;
private List<ClusterIOWorker> workers = new ArrayList<>(WORKERS);
private final DefaultControllerNode localNode;
private final ClusterNodesDelegate nodesDelegate;
private final CommunicationsDelegate commsDelegate;
private final SerializationService serializationService;
// Nodes to be monitored to make sure they have a connection.
private final Set<DefaultControllerNode> nodes = new HashSet<>();
// Means to track message streams to other nodes.
private final Map<NodeId, ClusterMessageStream> streams = new ConcurrentHashMap<>();
// Executor pools for listening and managing connections to other nodes.
private final ExecutorService listenExecutor =
Executors.newSingleThreadExecutor(namedThreads("onos-comm-listen"));
private final ExecutorService commExecutors =
Executors.newFixedThreadPool(WORKERS, namedThreads("onos-comm-cluster"));
private final ExecutorService heartbeatExecutor =
Executors.newSingleThreadExecutor(namedThreads("onos-comm-heartbeat"));
private final Timer timer = new Timer("onos-comm-initiator");
private final TimerTask connectionCustodian = new ConnectionCustodian();
private final WorkerFinder workerFinder = new LeastUtilizedWorkerFinder();
/**
* Creates a new connection manager.
*/
ConnectionManager(DefaultControllerNode localNode,
ClusterNodesDelegate nodesDelegate,
CommunicationsDelegate commsDelegate,
SerializationService serializationService) {
this.localNode = localNode;
this.nodesDelegate = nodesDelegate;
this.commsDelegate = commsDelegate;
this.serializationService = serializationService;
commsDelegate.setSender(this);
startCommunications();
startListening();
startInitiating();
log.info("Started");
}
/**
* Shuts down the connection manager.
*/
void shutdown() {
connectionListener.shutdown();
for (ClusterIOWorker worker : workers) {
worker.shutdown();
}
log.info("Stopped");
}
/**
* Adds the node to the list of monitored nodes.
*
* @param node node to be added
*/
void addNode(DefaultControllerNode node) {
nodes.add(node);
}
/**
* Removes the node from the list of monitored nodes.
*
* @param node node to be removed
*/
void removeNode(DefaultControllerNode node) {
nodes.remove(node);
ClusterMessageStream stream = streams.remove(node.id());
if (stream != null) {
stream.close();
}
}
/**
* Removes the stream associated with the specified node.
*
* @param node node whose stream to remove
*/
void removeNodeStream(DefaultControllerNode node) {
nodesDelegate.nodeVanished(node);
streams.remove(node.id());
}
@Override
public boolean send(NodeId nodeId, ClusterMessage message) {
ClusterMessageStream stream = streams.get(nodeId);
if (stream != null) {
try {
stream.write(message);
return true;
} catch (IOException e) {
log.warn("Unable to send a message about {} to node {}",
message.subject(), nodeId);
}
}
return false;
}
/**
* Kicks off the IO loops and waits for them to startup.
*/
private void startCommunications() {
HelloMessage hello = new HelloMessage(localNode.id(), localNode.ip(),
localNode.tcpPort());
for (int i = 0; i < WORKERS; i++) {
try {
ClusterIOWorker worker =
new ClusterIOWorker(this, commsDelegate,
serializationService, hello);
workers.add(worker);
commExecutors.execute(worker);
} catch (IOException e) {
log.warn("Unable to start communication worker", e);
}
}
// Wait for the IO loops to start
for (ClusterIOWorker loop : workers) {
if (!loop.awaitStart(START_TIMEOUT)) {
log.warn("Comm loop did not start on-time; moving on...");
}
}
}
/**
* Starts listening for connections from peer cluster members.
*/
private void startListening() {
try {
connectionListener =
new ClusterConnectionListener(localNode.ip(), localNode.tcpPort(),
workerFinder);
listenExecutor.execute(connectionListener);
if (!connectionListener.awaitStart(START_TIMEOUT)) {
log.warn("Listener did not start on-time; moving on...");
}
} catch (IOException e) {
log.error("Unable to listen for cluster connections", e);
}
}
/**
* Initiates an open connection request and registers the pending socket
* channel with the given IO loop.
*
* @param node controller node to connect to
* @param loop loop with which the channel should be registered
* @throws java.io.IOException if the socket could not be opened or connected
*/
private void initiateConnection(DefaultControllerNode node,
ClusterIOWorker loop) throws IOException {
SocketAddress sa = new InetSocketAddress(getByAddress(node.ip().toOctets()), node.tcpPort());
SocketChannel ch = SocketChannel.open();
ch.configureBlocking(false);
ch.connect(sa);
loop.connectStream(ch);
}
/**
* Attempts to connect to any nodes that do not have an associated connection.
*/
private void startInitiating() {
timer.schedule(connectionCustodian, CONNECTION_CUSTODIAN_DELAY,
CONNECTION_CUSTODIAN_FREQUENCY);
}
// Sweeps through all controller nodes and attempts to open connection to
// those that presently do not have one.
private class ConnectionCustodian extends TimerTask {
@Override
public void run() {
for (DefaultControllerNode node : nodes) {
if (node != localNode && !streams.containsKey(node.id())) {
try {
initiateConnection(node, workerFinder.findWorker());
} catch (IOException e) {
log.debug("Unable to connect", e);
}
}
}
}
}
// Finds the least utilized IO worker.
private class LeastUtilizedWorkerFinder implements WorkerFinder {
@Override
public ClusterIOWorker findWorker() {
ClusterIOWorker leastUtilized = null;
int minCount = Integer.MAX_VALUE;
for (ClusterIOWorker worker : workers) {
int count = worker.streamCount();
if (count == 0) {
return worker;
}
if (count < minCount) {
leastUtilized = worker;
minCount = count;
}
}
return leastUtilized;
}
}
}
package org.onlab.onos.store.cluster.impl;
import com.google.common.collect.ImmutableSet;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.ClusterEvent;
import org.onlab.onos.cluster.ClusterStore;
import org.onlab.onos.cluster.ClusterStoreDelegate;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.AbstractStore;
import org.onlab.onos.store.cluster.messaging.SerializationService;
import org.onlab.packet.IpPrefix;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import static org.onlab.onos.cluster.ControllerNode.State;
import static org.onlab.packet.IpPrefix.valueOf;
/**
* Distributed implementation of the cluster nodes store.
*/
@Component(immediate = true)
@Service
public class DistributedClusterStore
extends AbstractStore<ClusterEvent, ClusterStoreDelegate>
implements ClusterStore {
private final Logger log = LoggerFactory.getLogger(getClass());
private DefaultControllerNode localNode;
private final Map<NodeId, DefaultControllerNode> nodes = new ConcurrentHashMap<>();
private final Map<NodeId, State> states = new ConcurrentHashMap<>();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private CommunicationsDelegate commsDelegate;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private SerializationService serializationService;
private final ClusterNodesDelegate nodesDelegate = new InnerNodesDelegate();
private ConnectionManager connectionManager;
@Activate
public void activate() {
loadClusterDefinition();
establishSelfIdentity();
connectionManager = new ConnectionManager(localNode, nodesDelegate,
commsDelegate, serializationService);
log.info("Started");
}
@Deactivate
public void deactivate() {
log.info("Stopped");
}
/**
* Loads the cluster definition file.
*/
private void loadClusterDefinition() {
ClusterDefinitionStore cds = new ClusterDefinitionStore("../config/cluster.json");
try {
Set<DefaultControllerNode> storedNodes = cds.read();
for (DefaultControllerNode node : storedNodes) {
nodes.put(node.id(), node);
}
} catch (IOException e) {
log.error("Unable to read cluster definitions", e);
}
}
/**
* Determines who the local controller node is.
*/
private void establishSelfIdentity() {
// Establishes the controller's own identity.
IpPrefix ip = valueOf(System.getProperty("onos.ip", "127.0.1.1"));
localNode = nodes.get(new NodeId(ip.toString()));
// As a fall-back, let's make sure we at least know who we are.
if (localNode == null) {
localNode = new DefaultControllerNode(new NodeId(ip.toString()), ip);
nodes.put(localNode.id(), localNode);
states.put(localNode.id(), State.ACTIVE);
}
}
@Override
public ControllerNode getLocalNode() {
return localNode;
}
@Override
public Set<ControllerNode> getNodes() {
ImmutableSet.Builder<ControllerNode> builder = ImmutableSet.builder();
return builder.addAll(nodes.values()).build();
}
@Override
public ControllerNode getNode(NodeId nodeId) {
return nodes.get(nodeId);
}
@Override
public State getState(NodeId nodeId) {
State state = states.get(nodeId);
return state == null ? State.INACTIVE : state;
}
@Override
public ControllerNode addNode(NodeId nodeId, IpPrefix ip, int tcpPort) {
DefaultControllerNode node = new DefaultControllerNode(nodeId, ip, tcpPort);
nodes.put(nodeId, node);
connectionManager.addNode(node);
return node;
}
@Override
public void removeNode(NodeId nodeId) {
DefaultControllerNode node = nodes.remove(nodeId);
if (node != null) {
connectionManager.removeNode(node);
}
}
// Entity to handle back calls from the connection manager.
private class InnerNodesDelegate implements ClusterNodesDelegate {
@Override
public void nodeDetected(DefaultControllerNode node) {
nodes.put(node.id(), node);
states.put(node.id(), State.ACTIVE);
}
@Override
public void nodeVanished(DefaultControllerNode node) {
states.put(node.id(), State.INACTIVE);
}
}
}
package org.onlab.onos.store.cluster.impl;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
/**
* Abstraction of a sender capable of transmitting messages to cluster nodes.
*/
public interface MessageSender {
/**
* Sends the specified message to the given cluster node.
*
* @param nodeId node identifier
* @param message message to send
* @return true if the message was sent successfully; false if there is
* no stream or if there was an error
*/
boolean send(NodeId nodeId, ClusterMessage message);
}
package org.onlab.onos.store.cluster.impl;
/**
* Provides means to find a worker IO loop.
*/
public interface WorkerFinder {
/**
* Finds a suitable worker.
*
* @return available worker
*/
ClusterIOWorker findWorker();
}
package org.onlab.onos.store.cluster.messaging;
import org.onlab.onos.cluster.NodeId;
import java.util.Set;
/**
* Service for assisting communications between controller cluster nodes.
*/
public interface ClusterCommunicationService {
/**
* Sends a message to the specified controller node.
*
* @param message message to send
* @param toNodeId node identifier
* @return true if the message was sent successfully; false if there is
* no stream or if there was an error
*/
boolean send(ClusterMessage message, NodeId toNodeId);
/**
* Adds a new subscriber for the specified message subject.
*
* @param subject message subject
* @param subscriber message subscriber
*/
void addSubscriber(MessageSubject subject, MessageSubscriber subscriber);
/**
* Removes the specified subscriber from the given message subject.
*
* @param subject message subject
* @param subscriber message subscriber
*/
void removeSubscriber(MessageSubject subject, MessageSubscriber subscriber);
/**
* Returns the set of subscribers for the specified message subject.
*
* @param subject message subject
* @return set of message subscribers
*/
Set<MessageSubscriber> getSubscribers(MessageSubject subject);
}
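A hedged usage sketch of the service (the consumer class below is hypothetical and not part of the commit): a component subscribes to HELLO messages and sends a heart-beat ECHO to a peer node.
package org.onlab.onos.store.cluster.messaging;
import org.onlab.onos.cluster.NodeId;
// Hypothetical consumer of ClusterCommunicationService, for illustration only.
public class EchoSender {

    private final ClusterCommunicationService comms;

    public EchoSender(ClusterCommunicationService comms) {
        this.comms = comms;
        // Print any greeting received from a peer node.
        comms.addSubscriber(MessageSubject.HELLO, new MessageSubscriber() {
            @Override
            public void receive(ClusterMessage message) {
                System.out.println("Peer said hello: " + message);
            }
        });
    }

    // Sends a heart-beat echo to the given peer; false means no stream or an error.
    public boolean ping(NodeId self, NodeId peer) {
        return comms.send(new EchoMessage(self), peer);
    }
}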
package org.onlab.onos.store.cluster.messaging;
import org.onlab.nio.AbstractMessage;
import static com.google.common.base.MoreObjects.toStringHelper;
/**
* Base message for cluster-wide communications.
*/
public abstract class ClusterMessage extends AbstractMessage {
private final MessageSubject subject;
/**
* Creates a cluster message.
*
* @param subject message subject
*/
protected ClusterMessage(MessageSubject subject) {
this.subject = subject;
}
/**
* Returns the message subject indicator.
*
* @return message subject
*/
public MessageSubject subject() {
return subject;
}
@Override
public String toString() {
return toStringHelper(this).add("subject", subject).add("length", length).toString();
}
}
package org.onlab.onos.store.cluster.messaging;
import org.onlab.nio.IOLoop;
import org.onlab.nio.MessageStream;
import org.onlab.onos.cluster.DefaultControllerNode;
import java.nio.ByteBuffer;
import java.nio.channels.ByteChannel;
import static com.google.common.base.Preconditions.checkState;
/**
* Stream for transferring messages between two cluster members.
*/
public class ClusterMessageStream extends MessageStream<ClusterMessage> {
private static final int COMM_BUFFER_SIZE = 32 * 1024;
private static final int COMM_IDLE_TIME = 500;
private DefaultControllerNode node;
private SerializationService serializationService;
/**
* Creates a message stream associated with the specified IO loop and
* backed by the given byte channel.
*
* @param serializationService service for encoding/decoding messages
* @param loop IO loop
* @param byteChannel backing byte channel
*/
public ClusterMessageStream(SerializationService serializationService,
IOLoop<ClusterMessage, ?> loop,
ByteChannel byteChannel) {
super(loop, byteChannel, COMM_BUFFER_SIZE, COMM_IDLE_TIME);
this.serializationService = serializationService;
}
/**
* Returns the node with which this stream is associated.
*
* @return controller node
*/
public DefaultControllerNode node() {
return node;
}
/**
* Sets the node with which this stream is affiliated.
*
* @param node controller node
*/
public void setNode(DefaultControllerNode node) {
checkState(this.node == null, "Stream is already bound to a node");
this.node = node;
}
@Override
protected ClusterMessage read(ByteBuffer buffer) {
return serializationService.decode(buffer);
}
@Override
protected void write(ClusterMessage message, ByteBuffer buffer) {
serializationService.encode(message, buffer);
}
}
package org.onlab.onos.store.cluster.messaging;
import org.onlab.onos.cluster.NodeId;
/**
* Echo heart-beat message that nodes send to each other.
*/
public class EchoMessage extends ClusterMessage {
private NodeId nodeId;
// For serialization
private EchoMessage() {
super(MessageSubject.ECHO);
nodeId = null;
}
/**
* Creates a new heart-beat echo message.
*
* @param nodeId sending node identification
*/
public EchoMessage(NodeId nodeId) {
super(MessageSubject.ECHO);
this.nodeId = nodeId;
}
/**
* Returns the sending node identifier.
*
* @return node identifier
*/
public NodeId nodeId() {
return nodeId;
}
}
package org.onlab.onos.store.cluster.messaging;
import org.onlab.onos.cluster.NodeId;
import org.onlab.packet.IpPrefix;
/**
* Hello message that nodes use to greet each other.
*/
public class HelloMessage extends ClusterMessage {
private NodeId nodeId;
private IpPrefix ipAddress;
private int tcpPort;
// For serialization
private HelloMessage() {
super(MessageSubject.HELLO);
nodeId = null;
ipAddress = null;
tcpPort = 0;
}
/**
* Creates a new hello message for the specified end-point data.
*
* @param nodeId sending node identification
* @param ipAddress sending node IP address
* @param tcpPort sending node TCP port
*/
public HelloMessage(NodeId nodeId, IpPrefix ipAddress, int tcpPort) {
super(MessageSubject.HELLO);
this.nodeId = nodeId;
this.ipAddress = ipAddress;
this.tcpPort = tcpPort;
}
/**
* Returns the sending node identifier.
*
* @return node identifier
*/
public NodeId nodeId() {
return nodeId;
}
/**
* Returns the sending node IP address.
*
* @return node IP address
*/
public IpPrefix ipAddress() {
return ipAddress;
}
/**
* Returns the sending node TCP listen port.
*
* @return TCP listen port
*/
public int tcpPort() {
return tcpPort;
}
}
package org.onlab.onos.store.cluster.messaging;
/**
* Representation of a message subject.
*/
public enum MessageSubject {
/** Represents a first greeting message. */
HELLO,
/** Signifies a heart-beat message. */
ECHO
}
package org.onlab.onos.store.cluster.messaging;
/**
* Represents a message consumer.
*/
public interface MessageSubscriber {
/**
* Receives the specified cluster message.
*
* @param message message to be received
*/
void receive(ClusterMessage message);
}
package org.onlab.onos.store.cluster.messaging;
import java.nio.ByteBuffer;
/**
* Service for serializing/deserializing intra-cluster messages.
*/
public interface SerializationService {
/**
* Decodes the specified byte buffer to obtain a message within.
*
* @param buffer byte buffer with message(s)
* @return parsed message
*/
ClusterMessage decode(ByteBuffer buffer);
/**
* Encodes the specified message into the given byte buffer.
*
* @param message message to be encoded
* @param buffer byte buffer to receive the message data
*/
void encode(ClusterMessage message, ByteBuffer buffer);
}
package org.onlab.onos.store.cluster.messaging.impl;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.cluster.impl.CommunicationsDelegate;
import org.onlab.onos.store.cluster.impl.MessageSender;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
import org.onlab.onos.store.cluster.messaging.MessageSubscriber;
import java.util.Set;
/**
* Implements the cluster communication services for use by other stores.
*/
@Component(immediate = true)
@Service
public class ClusterCommunicationManager
implements ClusterCommunicationService, CommunicationsDelegate {
// TODO: use something different that won't require synchronization
private Multimap<MessageSubject, MessageSubscriber> subscribers = HashMultimap.create();
private MessageSender messageSender;
@Override
public boolean send(ClusterMessage message, NodeId toNodeId) {
return messageSender.send(toNodeId, message);
}
@Override
public synchronized void addSubscriber(MessageSubject subject, MessageSubscriber subscriber) {
subscribers.put(subject, subscriber);
}
@Override
public synchronized void removeSubscriber(MessageSubject subject, MessageSubscriber subscriber) {
subscribers.remove(subject, subscriber);
}
@Override
public Set<MessageSubscriber> getSubscribers(MessageSubject subject) {
return ImmutableSet.copyOf(subscribers.get(subject));
}
@Override
public void dispatch(ClusterMessage message) {
Set<MessageSubscriber> set = getSubscribers(message.subject());
if (set != null) {
for (MessageSubscriber subscriber : set) {
subscriber.receive(message);
}
}
}
@Override
public void setSender(MessageSender messageSender) {
this.messageSender = messageSender;
}
}
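// Hedged usage sketch (illustration only, not part of this change set): a hypothetical
// component consuming the service above. It assumes addSubscriber(...) and send(...)
// are declared on ClusterCommunicationService, as the @Override annotations above
// suggest; the component and package names are made up for the example.
package org.onlab.onos.store.cluster.messaging.example;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.EchoMessage;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
import org.onlab.onos.store.cluster.messaging.MessageSubscriber;
@Component(immediate = true)
public class HeartbeatExample {
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterCommunicationService communicationService;
@Activate
public void activate() {
communicationService.addSubscriber(MessageSubject.ECHO, new MessageSubscriber() {
@Override
public void receive(ClusterMessage message) {
// react to a heart-beat received from a peer node
}
});
}
// Sends a heart-beat carrying the local node id to the given peer.
public void ping(NodeId localNodeId, NodeId peer) {
communicationService.send(new EchoMessage(localNodeId), peer);
}
}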
package org.onlab.onos.store.cluster.messaging.impl;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
import org.onlab.onos.store.cluster.messaging.SerializationService;
import java.nio.ByteBuffer;
import static com.google.common.base.Preconditions.checkState;
/**
* Serializer for encoding and decoding messages sent between cluster members.
*/
public class MessageSerializer implements SerializationService {
private static final int METADATA_LENGTH = 16; // 8 + 4 + 4
private static final int LENGTH_OFFSET = 12;
private static final long MARKER = 0xfeedcafebeaddeadL;
@Override
public ClusterMessage decode(ByteBuffer buffer) {
try {
// Do we have enough bytes to read the header? If not, bail.
if (buffer.remaining() < METADATA_LENGTH) {
return null;
}
// Peek at the length and if we have enough to read the entire message
// go ahead, otherwise bail.
int length = buffer.getInt(buffer.position() + LENGTH_OFFSET);
if (buffer.remaining() < length) {
return null;
}
// At this point, we have enough data to read a complete message.
long marker = buffer.getLong();
checkState(marker == MARKER, "Incorrect message marker");
int subjectOrdinal = buffer.getInt();
MessageSubject subject = MessageSubject.values()[subjectOrdinal];
length = buffer.getInt();
// TODO: sanity checking for length
byte[] data = new byte[length - METADATA_LENGTH];
buffer.get(data);
// TODO: add deserialization hook here; for now this is just a stub
return null; // actually deserialize
} catch (Exception e) {
// TODO: recover from exceptions by forwarding stream to next marker
e.printStackTrace();
}
return null;
}
@Override
public void encode(ClusterMessage message, ByteBuffer buffer) {
try {
// TODO: type-based lookup for the proper encoder
} catch (Exception e) {
// TODO: recover from exceptions by forwarding stream to next marker
e.printStackTrace();
}
}
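// Hedged sketch (illustration only, not part of this change set): the frame layout
// decode() above expects is an 8-byte marker, a 4-byte subject ordinal and a 4-byte
// total frame length, followed by the serialized payload. A framing helper consistent
// with that layout might look as follows; the payload serialization itself is still
// the TODO noted above and writeFrame is a hypothetical name.
private void writeFrame(MessageSubject subject, byte[] payload, ByteBuffer buffer) {
buffer.putLong(MARKER); // 8-byte sanity marker
buffer.putInt(subject.ordinal()); // 4-byte message subject ordinal
buffer.putInt(METADATA_LENGTH + payload.length); // 4-byte total frame length
buffer.put(payload); // serialized message body
}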
}
......@@ -86,46 +86,48 @@ public class OnosDistributedDeviceStore
@Override
public Iterable<Device> getDevices() {
// TODO builder vs. copyOf. Guava seems to be using copyOf?
// FIXME: synchronize.
Builder<Device> builder = ImmutableSet.builder();
for (VersionedValue<? extends Device> device : devices.values()) {
builder.add(device.entity());
synchronized (this) {
for (VersionedValue<Device> device : devices.values()) {
builder.add(device.entity());
}
return builder.build();
}
return builder.build();
}
@Override
public Device getDevice(DeviceId deviceId) {
return devices.get(deviceId).entity();
VersionedValue<Device> device = devices.get(deviceId);
checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
return device.entity();
}
@Override
public DeviceEvent createOrUpdateDevice(ProviderId providerId, DeviceId deviceId,
DeviceDescription deviceDescription) {
Timestamp now = clockService.getTimestamp(deviceId);
Timestamp newTimestamp = clockService.getTimestamp(deviceId);
VersionedValue<Device> device = devices.get(deviceId);
if (device == null) {
return createDevice(providerId, deviceId, deviceDescription, now);
return createDevice(providerId, deviceId, deviceDescription, newTimestamp);
}
checkState(now.compareTo(device.timestamp()) > 0,
checkState(newTimestamp.compareTo(device.timestamp()) > 0,
"Existing device has a timestamp in the future!");
return updateDevice(providerId, device.entity(), deviceDescription, now);
return updateDevice(providerId, device.entity(), deviceDescription, newTimestamp);
}
// Creates the device and returns the appropriate event if necessary.
private DeviceEvent createDevice(ProviderId providerId, DeviceId deviceId,
DeviceDescription desc, Timestamp timestamp) {
DefaultDevice device = new DefaultDevice(providerId, deviceId, desc.type(),
Device device = new DefaultDevice(providerId, deviceId, desc.type(),
desc.manufacturer(),
desc.hwVersion(), desc.swVersion(),
desc.serialNumber());
devices.put(deviceId, new VersionedValue<Device>(device, true, timestamp));
// FIXME: broadcast a message telling peers of a device event.
devices.put(deviceId, new VersionedValue<>(device, true, timestamp));
// TODO,FIXME: broadcast a message telling peers of a device event.
return new DeviceEvent(DEVICE_ADDED, device, null);
}
......@@ -148,7 +150,7 @@ public class OnosDistributedDeviceStore
}
// Otherwise merely attempt to change availability
DefaultDevice updated = new DefaultDevice(providerId, device.id(),
Device updated = new DefaultDevice(providerId, device.id(),
desc.type(),
desc.manufacturer(),
desc.hwVersion(),
......@@ -196,18 +198,18 @@ public class OnosDistributedDeviceStore
VersionedValue<Device> device = devices.get(deviceId);
checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
Map<PortNumber, VersionedValue<Port>> ports = getPortMap(deviceId);
Timestamp timestamp = clockService.getTimestamp(deviceId);
Timestamp newTimestamp = clockService.getTimestamp(deviceId);
// Add new ports
Set<PortNumber> processed = new HashSet<>();
for (PortDescription portDescription : portDescriptions) {
VersionedValue<Port> port = ports.get(portDescription.portNumber());
if (port == null) {
events.add(createPort(device, portDescription, ports, timestamp));
events.add(createPort(device, portDescription, ports, newTimestamp));
}
checkState(timestamp.compareTo(port.timestamp()) > 0,
checkState(newTimestamp.compareTo(port.timestamp()) > 0,
"Existing port state has a timestamp in the future!");
events.add(updatePort(device, port, portDescription, ports, timestamp));
events.add(updatePort(device.entity(), port.entity(), portDescription, ports, newTimestamp));
processed.add(portDescription.portNumber());
}
......@@ -233,19 +235,19 @@ public class OnosDistributedDeviceStore
// Checks if the specified port requires update and if so, it replaces the
// existing entry in the map and returns corresponding event.
//@GuardedBy("this")
private DeviceEvent updatePort(VersionedValue<Device> device, VersionedValue<Port> port,
private DeviceEvent updatePort(Device device, Port port,
PortDescription portDescription,
Map<PortNumber, VersionedValue<Port>> ports,
Timestamp timestamp) {
if (port.entity().isEnabled() != portDescription.isEnabled()) {
if (port.isEnabled() != portDescription.isEnabled()) {
VersionedValue<Port> updatedPort = new VersionedValue<Port>(
new DefaultPort(device.entity(), portDescription.portNumber(),
new DefaultPort(device, portDescription.portNumber(),
portDescription.isEnabled()),
portDescription.isEnabled(),
timestamp);
ports.put(port.entity().number(), updatedPort);
updatePortMap(device.entity().id(), ports);
return new DeviceEvent(PORT_UPDATED, device.entity(), updatedPort.entity());
ports.put(port.number(), updatedPort);
updatePortMap(device.id(), ports);
return new DeviceEvent(PORT_UPDATED, device, updatedPort.entity());
}
return null;
}
......@@ -300,7 +302,7 @@ public class OnosDistributedDeviceStore
Map<PortNumber, VersionedValue<Port>> ports = getPortMap(deviceId);
VersionedValue<Port> port = ports.get(portDescription.portNumber());
Timestamp timestamp = clockService.getTimestamp(deviceId);
return updatePort(device, port, portDescription, ports, timestamp);
return updatePort(device.entity(), port.entity(), portDescription, ports, timestamp);
}
@Override
......
package org.onlab.onos.store.link.impl;
import static org.onlab.onos.net.Link.Type.DIRECT;
import static org.onlab.onos.net.Link.Type.INDIRECT;
import static org.onlab.onos.net.link.LinkEvent.Type.LINK_ADDED;
import static org.onlab.onos.net.link.LinkEvent.Type.LINK_REMOVED;
import static org.onlab.onos.net.link.LinkEvent.Type.LINK_UPDATED;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.net.ConnectPoint;
import org.onlab.onos.net.DefaultLink;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.Link;
import org.onlab.onos.net.LinkKey;
import org.onlab.onos.net.link.LinkDescription;
import org.onlab.onos.net.link.LinkEvent;
import org.onlab.onos.net.link.LinkStore;
import org.onlab.onos.net.link.LinkStoreDelegate;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.AbstractStore;
import org.onlab.onos.store.ClockService;
import org.onlab.onos.store.Timestamp;
import org.onlab.onos.store.device.impl.VersionedValue;
import org.slf4j.Logger;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
import com.google.common.collect.ImmutableSet.Builder;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
/**
* Manages inventory of infrastructure links using a protocol that takes into consideration
* the order in which events occur.
*/
// FIXME: This does not yet implement the full protocol.
// The full protocol requires the sender of LLDP message to include the
// version information of src device/port and the receiver to
// take that into account when figuring out if a more recent src
// device/port down event renders the link discovery obsolete.
@Component(immediate = true)
@Service
public class OnosDistributedLinkStore
extends AbstractStore<LinkEvent, LinkStoreDelegate>
implements LinkStore {
private final Logger log = getLogger(getClass());
// Link inventory
private ConcurrentMap<LinkKey, VersionedValue<Link>> links;
public static final String LINK_NOT_FOUND = "Link between %s and %s not found";
// TODO synchronize?
// Egress and ingress link sets
private final Multimap<DeviceId, VersionedValue<Link>> srcLinks = HashMultimap.create();
private final Multimap<DeviceId, VersionedValue<Link>> dstLinks = HashMultimap.create();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClockService clockService;
@Activate
public void activate() {
links = new ConcurrentHashMap<>();
log.info("Started");
}
@Deactivate
public void deactivate() {
log.info("Stopped");
}
@Override
public int getLinkCount() {
return links.size();
}
@Override
public Iterable<Link> getLinks() {
Builder<Link> builder = ImmutableSet.builder();
synchronized (this) {
for (VersionedValue<Link> link : links.values()) {
builder.add(link.entity());
}
return builder.build();
}
}
@Override
public Set<Link> getDeviceEgressLinks(DeviceId deviceId) {
Set<VersionedValue<Link>> egressLinks = ImmutableSet.copyOf(srcLinks.get(deviceId));
Set<Link> rawEgressLinks = new HashSet<>();
for (VersionedValue<Link> link : egressLinks) {
rawEgressLinks.add(link.entity());
}
return rawEgressLinks;
}
@Override
public Set<Link> getDeviceIngressLinks(DeviceId deviceId) {
Set<VersionedValue<Link>> ingressLinks = ImmutableSet.copyOf(dstLinks.get(deviceId));
Set<Link> rawIngressLinks = new HashSet<>();
for (VersionedValue<Link> link : ingressLinks) {
rawIngressLinks.add(link.entity());
}
return rawIngressLinks;
}
@Override
public Link getLink(ConnectPoint src, ConnectPoint dst) {
VersionedValue<Link> link = links.get(new LinkKey(src, dst));
checkArgument(link != null, LINK_NOT_FOUND, src, dst);
return link.entity();
}
@Override
public Set<Link> getEgressLinks(ConnectPoint src) {
Set<Link> egressLinks = new HashSet<>();
for (VersionedValue<Link> link : srcLinks.get(src.deviceId())) {
if (link.entity().src().equals(src)) {
egressLinks.add(link.entity());
}
}
return egressLinks;
}
@Override
public Set<Link> getIngressLinks(ConnectPoint dst) {
Set<Link> ingressLinks = new HashSet<>();
for (VersionedValue<Link> link : dstLinks.get(dst.deviceId())) {
if (link.entity().dst().equals(dst)) {
ingressLinks.add(link.entity());
}
}
return ingressLinks;
}
@Override
public LinkEvent createOrUpdateLink(ProviderId providerId,
LinkDescription linkDescription) {
final DeviceId destinationDeviceId = linkDescription.dst().deviceId();
final Timestamp newTimestamp = clockService.getTimestamp(destinationDeviceId);
LinkKey key = new LinkKey(linkDescription.src(), linkDescription.dst());
VersionedValue<Link> link = links.get(key);
if (link == null) {
return createLink(providerId, key, linkDescription, newTimestamp);
}
checkState(newTimestamp.compareTo(link.timestamp()) > 0,
"Existing Link has a timestamp in the future!");
return updateLink(providerId, link, key, linkDescription, newTimestamp);
}
// Creates and stores the link and returns the appropriate event.
private LinkEvent createLink(ProviderId providerId, LinkKey key,
LinkDescription linkDescription, Timestamp timestamp) {
VersionedValue<Link> link = new VersionedValue<Link>(new DefaultLink(providerId, key.src(), key.dst(),
linkDescription.type()), true, timestamp);
synchronized (this) {
links.put(key, link);
addNewLink(link, timestamp);
}
// FIXME: notify peers.
return new LinkEvent(LINK_ADDED, link.entity());
}
// Updates the egress and ingress link sets.
private void addNewLink(VersionedValue<Link> link, Timestamp timestamp) {
Link rawLink = link.entity();
synchronized (this) {
srcLinks.put(rawLink.src().deviceId(), link);
dstLinks.put(rawLink.dst().deviceId(), link);
}
}
// Updates, if necessary, the specified link and returns the appropriate event.
private LinkEvent updateLink(ProviderId providerId, VersionedValue<Link> existingLink,
LinkKey key, LinkDescription linkDescription, Timestamp timestamp) {
// FIXME confirm Link update condition is OK
if (existingLink.entity().type() == INDIRECT && linkDescription.type() == DIRECT) {
synchronized (this) {
VersionedValue<Link> updatedLink = new VersionedValue<Link>(
new DefaultLink(providerId, existingLink.entity().src(), existingLink.entity().dst(),
linkDescription.type()), true, timestamp);
links.replace(key, existingLink, updatedLink);
replaceLink(existingLink, updatedLink);
// FIXME: notify peers.
return new LinkEvent(LINK_UPDATED, updatedLink.entity());
}
}
return null;
}
// Updates the egress and ingress link sets.
private void replaceLink(VersionedValue<Link> current, VersionedValue<Link> updated) {
synchronized (this) {
srcLinks.remove(current.entity().src().deviceId(), current);
dstLinks.remove(current.entity().dst().deviceId(), current);
srcLinks.put(current.entity().src().deviceId(), updated);
dstLinks.put(current.entity().dst().deviceId(), updated);
}
}
@Override
public LinkEvent removeLink(ConnectPoint src, ConnectPoint dst) {
synchronized (this) {
LinkKey key = new LinkKey(src, dst);
VersionedValue<Link> link = links.remove(key);
if (link != null) {
removeLink(link);
// notify peers
return new LinkEvent(LINK_REMOVED, link.entity());
}
return null;
}
}
// Updates the egress and ingress link sets.
private void removeLink(VersionedValue<Link> link) {
synchronized (this) {
srcLinks.remove(link.entity().src().deviceId(), link);
dstLinks.remove(link.entity().dst().deviceId(), link);
}
}
}
......@@ -49,6 +49,7 @@ public class DistributedClusterStore
private final MembershipListener listener = new InternalMembershipListener();
private final Map<NodeId, State> states = new ConcurrentHashMap<>();
@Override
@Activate
public void activate() {
super.activate();
......@@ -56,9 +57,9 @@ public class DistributedClusterStore
rawNodes = theInstance.getMap("nodes");
OptionalCacheLoader<NodeId, DefaultControllerNode> nodeLoader
= new OptionalCacheLoader<>(storeService, rawNodes);
= new OptionalCacheLoader<>(kryoSerializationService, rawNodes);
nodes = new AbsentInvalidatingLoadingCache<>(newBuilder().build(nodeLoader));
rawNodes.addEntryListener(new RemoteEventHandler<>(nodes), true);
rawNodes.addEntryListener(new RemoteCacheEventHandler<>(nodes), true);
loadClusterNodes();
......
......@@ -52,7 +52,7 @@ implements MastershipStore {
rawMasters = theInstance.getMap("masters");
OptionalCacheLoader<DeviceId, NodeId> nodeLoader
= new OptionalCacheLoader<>(storeService, rawMasters);
= new OptionalCacheLoader<>(kryoSerializationService, rawMasters);
masters = new AbsentInvalidatingLoadingCache<>(newBuilder().build(nodeLoader));
rawMasters.addEntryListener(new RemoteMasterShipEventHandler(masters), true);
......@@ -123,7 +123,7 @@ implements MastershipStore {
return null;
}
private class RemoteMasterShipEventHandler extends RemoteEventHandler<DeviceId, NodeId> {
private class RemoteMasterShipEventHandler extends RemoteCacheEventHandler<DeviceId, NodeId> {
public RemoteMasterShipEventHandler(LoadingCache<DeviceId, Optional<NodeId>> cache) {
super(cache);
}
......
......@@ -6,6 +6,7 @@ import com.hazelcast.core.EntryAdapter;
import com.hazelcast.core.EntryEvent;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.MapEvent;
import com.hazelcast.core.Member;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
......@@ -14,6 +15,7 @@ import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onlab.onos.event.Event;
import org.onlab.onos.store.AbstractStore;
import org.onlab.onos.store.StoreDelegate;
import org.onlab.onos.store.serializers.KryoSerializationService;
import org.slf4j.Logger;
import static com.google.common.base.Preconditions.checkNotNull;
......@@ -31,6 +33,9 @@ public abstract class AbstractHazelcastStore<E extends Event, D extends StoreDel
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected StoreService storeService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected KryoSerializationService kryoSerializationService;
protected HazelcastInstance theInstance;
@Activate
......@@ -45,7 +50,7 @@ public abstract class AbstractHazelcastStore<E extends Event, D extends StoreDel
* @return serialized object
*/
protected byte[] serialize(Object obj) {
return storeService.serialize(obj);
return kryoSerializationService.serialize(obj);
}
/**
......@@ -56,7 +61,7 @@ public abstract class AbstractHazelcastStore<E extends Event, D extends StoreDel
* @return deserialized object
*/
protected <T> T deserialize(byte[] bytes) {
return storeService.deserialize(bytes);
return kryoSerializationService.deserialize(bytes);
}
......@@ -66,8 +71,9 @@ public abstract class AbstractHazelcastStore<E extends Event, D extends StoreDel
* @param <K> IMap key type after deserialization
* @param <V> IMap value type after deserialization
*/
public class RemoteEventHandler<K, V> extends EntryAdapter<byte[], byte[]> {
public class RemoteCacheEventHandler<K, V> extends EntryAdapter<byte[], byte[]> {
private final Member localMember;
private LoadingCache<K, Optional<V>> cache;
/**
......@@ -75,17 +81,26 @@ public abstract class AbstractHazelcastStore<E extends Event, D extends StoreDel
*
* @param cache cache to update
*/
public RemoteEventHandler(LoadingCache<K, Optional<V>> cache) {
public RemoteCacheEventHandler(LoadingCache<K, Optional<V>> cache) {
this.localMember = theInstance.getCluster().getLocalMember();
this.cache = checkNotNull(cache);
}
@Override
public void mapCleared(MapEvent event) {
if (localMember.equals(event.getMember())) {
// ignore locally triggered event
return;
}
cache.invalidateAll();
}
@Override
public void entryAdded(EntryEvent<byte[], byte[]> event) {
if (localMember.equals(event.getMember())) {
// ignore locally triggered event
return;
}
K key = deserialize(event.getKey());
V newVal = deserialize(event.getValue());
Optional<V> newValue = Optional.of(newVal);
......@@ -95,6 +110,10 @@ public abstract class AbstractHazelcastStore<E extends Event, D extends StoreDel
@Override
public void entryUpdated(EntryEvent<byte[], byte[]> event) {
if (localMember.equals(event.getMember())) {
// ignore locally triggered event
return;
}
K key = deserialize(event.getKey());
V oldVal = deserialize(event.getOldValue());
Optional<V> oldValue = Optional.fromNullable(oldVal);
......@@ -106,6 +125,10 @@ public abstract class AbstractHazelcastStore<E extends Event, D extends StoreDel
@Override
public void entryRemoved(EntryEvent<byte[], byte[]> event) {
if (localMember.equals(event.getMember())) {
// ignore locally triggered event
return;
}
K key = deserialize(event.getKey());
V val = deserialize(event.getOldValue());
cache.invalidate(key);
......@@ -141,4 +164,80 @@ public abstract class AbstractHazelcastStore<E extends Event, D extends StoreDel
}
}
/**
* Distributed object remote event entry listener.
*
* @param <K> Entry key type after deserialization
* @param <V> Entry value type after deserialization
*/
public class RemoteEventHandler<K, V> extends EntryAdapter<byte[], byte[]> {
private final Member localMember;
public RemoteEventHandler() {
this.localMember = theInstance.getCluster().getLocalMember();
}
@Override
public void entryAdded(EntryEvent<byte[], byte[]> event) {
if (localMember.equals(event.getMember())) {
// ignore locally triggered event
return;
}
K key = deserialize(event.getKey());
V newVal = deserialize(event.getValue());
onAdd(key, newVal);
}
@Override
public void entryRemoved(EntryEvent<byte[], byte[]> event) {
if (localMember.equals(event.getMember())) {
// ignore locally triggered event
return;
}
K key = deserialize(event.getKey());
V val = deserialize(event.getValue());
onRemove(key, val);
}
@Override
public void entryUpdated(EntryEvent<byte[], byte[]> event) {
if (localMember.equals(event.getMember())) {
// ignore locally triggered event
return;
}
K key = deserialize(event.getKey());
V oldVal = deserialize(event.getOldValue());
V newVal = deserialize(event.getValue());
onUpdate(key, oldVal, newVal);
}
/**
* Remote entry addition hook.
*
* @param key new key
* @param newVal new value
*/
protected void onAdd(K key, V newVal) {
}
/**
* Remote entry update hook.
*
* @param key new key
* @param oldValue old value
* @param newVal new value
*/
protected void onUpdate(K key, V oldValue, V newVal) {
}
/**
* Remote entry remove hook.
*
* @param key removed key
* @param val removed value
*/
protected void onRemove(K key, V val) {
}
}
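// Hedged illustration (not part of this change set): a concrete store could subclass
// RemoteEventHandler and override only the hooks it needs, much as
// RemoteMasterShipEventHandler extends RemoteCacheEventHandler elsewhere in this diff.
// The handler name and type parameters below are hypothetical.
private class ExampleRemoteEventHandler extends RemoteEventHandler<String, String> {
@Override
protected void onAdd(String key, String newVal) {
// translate the remote map addition into a local store event here
}
@Override
protected void onRemove(String key, String val) {
// translate the remote map removal into a local store event here
}
}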
}
......
......@@ -2,6 +2,8 @@ package org.onlab.onos.store.common;
import static com.google.common.base.Preconditions.checkNotNull;
import org.onlab.onos.store.serializers.KryoSerializationService;
import com.google.common.base.Optional;
import com.google.common.cache.CacheLoader;
import com.hazelcast.core.IMap;
......@@ -16,28 +18,28 @@ import com.hazelcast.core.IMap;
public final class OptionalCacheLoader<K, V> extends
CacheLoader<K, Optional<V>> {
private final StoreService storeService;
private final KryoSerializationService kryoSerializationService;
private IMap<byte[], byte[]> rawMap;
/**
* Constructor.
*
* @param storeService to use for serialization
* @param kryoSerializationService to use for serialization
* @param rawMap underlying IMap
*/
public OptionalCacheLoader(StoreService storeService, IMap<byte[], byte[]> rawMap) {
this.storeService = checkNotNull(storeService);
public OptionalCacheLoader(KryoSerializationService kryoSerializationService, IMap<byte[], byte[]> rawMap) {
this.kryoSerializationService = checkNotNull(kryoSerializationService);
this.rawMap = checkNotNull(rawMap);
}
@Override
public Optional<V> load(K key) throws Exception {
byte[] keyBytes = storeService.serialize(key);
byte[] keyBytes = kryoSerializationService.serialize(key);
byte[] valBytes = rawMap.get(keyBytes);
if (valBytes == null) {
return Optional.absent();
}
V dev = storeService.deserialize(valBytes);
V dev = kryoSerializationService.deserialize(valBytes);
return Optional.of(dev);
}
}
......
......@@ -5,46 +5,14 @@ import com.hazelcast.config.FileSystemXmlConfig;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import de.javakaffee.kryoserializers.URISerializer;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.net.ConnectPoint;
import org.onlab.onos.net.DefaultDevice;
import org.onlab.onos.net.DefaultLink;
import org.onlab.onos.net.DefaultPort;
import org.onlab.onos.net.Device;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.Element;
import org.onlab.onos.net.Link;
import org.onlab.onos.net.LinkKey;
import org.onlab.onos.net.MastershipRole;
import org.onlab.onos.net.Port;
import org.onlab.onos.net.PortNumber;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.serializers.ConnectPointSerializer;
import org.onlab.onos.store.serializers.DefaultLinkSerializer;
import org.onlab.onos.store.serializers.DefaultPortSerializer;
import org.onlab.onos.store.serializers.DeviceIdSerializer;
import org.onlab.onos.store.serializers.IpPrefixSerializer;
import org.onlab.onos.store.serializers.LinkKeySerializer;
import org.onlab.onos.store.serializers.NodeIdSerializer;
import org.onlab.onos.store.serializers.PortNumberSerializer;
import org.onlab.onos.store.serializers.ProviderIdSerializer;
import org.onlab.packet.IpPrefix;
import org.onlab.util.KryoPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.FileNotFoundException;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
/**
* Auxiliary bootstrap of distributed store.
......@@ -58,55 +26,18 @@ public class StoreManager implements StoreService {
private final Logger log = LoggerFactory.getLogger(getClass());
protected HazelcastInstance instance;
private KryoPool serializerPool;
@Activate
public void activate() {
try {
Config config = new FileSystemXmlConfig(HAZELCAST_XML_FILE);
instance = Hazelcast.newHazelcastInstance(config);
setupKryoPool();
log.info("Started");
} catch (FileNotFoundException e) {
log.error("Unable to configure Hazelcast", e);
}
}
/**
* Sets up the common serializers pool.
*/
protected void setupKryoPool() {
// FIXME Slice out types used in common to separate pool/namespace.
serializerPool = KryoPool.newBuilder()
.register(ArrayList.class,
HashMap.class,
ControllerNode.State.class,
Device.Type.class,
DefaultControllerNode.class,
DefaultDevice.class,
MastershipRole.class,
Port.class,
Element.class,
Link.Type.class
)
.register(IpPrefix.class, new IpPrefixSerializer())
.register(URI.class, new URISerializer())
.register(NodeId.class, new NodeIdSerializer())
.register(ProviderId.class, new ProviderIdSerializer())
.register(DeviceId.class, new DeviceIdSerializer())
.register(PortNumber.class, new PortNumberSerializer())
.register(DefaultPort.class, new DefaultPortSerializer())
.register(LinkKey.class, new LinkKeySerializer())
.register(ConnectPoint.class, new ConnectPointSerializer())
.register(DefaultLink.class, new DefaultLinkSerializer())
.build()
.populate(10);
}
@Deactivate
public void deactivate() {
instance.shutdown();
......@@ -118,18 +49,4 @@ public class StoreManager implements StoreService {
return instance;
}
@Override
public byte[] serialize(final Object obj) {
return serializerPool.serialize(obj);
}
@Override
public <T> T deserialize(final byte[] bytes) {
if (bytes == null) {
return null;
}
return serializerPool.deserialize(bytes);
}
}
......
......@@ -15,22 +15,4 @@ public interface StoreService {
*/
HazelcastInstance getHazelcastInstance();
/**
* Serializes the specified object into bytes using one of the
* pre-registered serializers.
*
* @param obj object to be serialized
* @return serialized bytes
*/
public byte[] serialize(final Object obj);
/**
* Deserializes the specified bytes into an object using one of the
* pre-registered serializers.
*
* @param bytes bytes to be deserialized
* @return deserialized object
*/
public <T> T deserialize(final byte[] bytes);
}
......
......@@ -46,9 +46,8 @@ public class TestStoreManager extends StoreManager {
this.instance = instance;
}
// Hazelcast setup removed from original code.
@Override
public void activate() {
setupKryoPool();
// Hazelcast setup removed from original code.
}
}
......
......@@ -72,6 +72,10 @@ public class DistributedDeviceStore
private IMap<byte[], byte[]> rawDevicePorts;
private LoadingCache<DeviceId, Optional<Map<PortNumber, Port>>> devicePorts;
private String devicesListener;
private String portsListener;
@Override
@Activate
public void activate() {
......@@ -83,20 +87,20 @@ public class DistributedDeviceStore
// TODO decide on Map name scheme to avoid collision
rawDevices = theInstance.getMap("devices");
final OptionalCacheLoader<DeviceId, DefaultDevice> deviceLoader
= new OptionalCacheLoader<>(storeService, rawDevices);
= new OptionalCacheLoader<>(kryoSerializationService, rawDevices);
devices = new AbsentInvalidatingLoadingCache<>(newBuilder().build(deviceLoader));
// refresh/populate cache based on notification from other instance
rawDevices.addEntryListener(new RemoteDeviceEventHandler(devices), includeValue);
devicesListener = rawDevices.addEntryListener(new RemoteDeviceEventHandler(devices), includeValue);
// TODO cache availableDevices
availableDevices = theInstance.getSet("availableDevices");
rawDevicePorts = theInstance.getMap("devicePorts");
final OptionalCacheLoader<DeviceId, Map<PortNumber, Port>> devicePortLoader
= new OptionalCacheLoader<>(storeService, rawDevicePorts);
= new OptionalCacheLoader<>(kryoSerializationService, rawDevicePorts);
devicePorts = new AbsentInvalidatingLoadingCache<>(newBuilder().build(devicePortLoader));
// refresh/populate cache based on notification from other instance
rawDevicePorts.addEntryListener(new RemotePortEventHandler(devicePorts), includeValue);
portsListener = rawDevicePorts.addEntryListener(new RemotePortEventHandler(devicePorts), includeValue);
loadDeviceCache();
loadDevicePortsCache();
......@@ -106,6 +110,8 @@ public class DistributedDeviceStore
@Deactivate
public void deactivate() {
rawDevicePorts.removeEntryListener(portsListener);
rawDevices.removeEntryListener(devicesListener);
log.info("Stopped");
}
......@@ -354,7 +360,7 @@ public class DistributedDeviceStore
}
}
private class RemoteDeviceEventHandler extends RemoteEventHandler<DeviceId, DefaultDevice> {
private class RemoteDeviceEventHandler extends RemoteCacheEventHandler<DeviceId, DefaultDevice> {
public RemoteDeviceEventHandler(LoadingCache<DeviceId, Optional<DefaultDevice>> cache) {
super(cache);
}
......@@ -375,7 +381,7 @@ public class DistributedDeviceStore
}
}
private class RemotePortEventHandler extends RemoteEventHandler<DeviceId, Map<PortNumber, Port>> {
private class RemotePortEventHandler extends RemoteCacheEventHandler<DeviceId, Map<PortNumber, Port>> {
public RemotePortEventHandler(LoadingCache<DeviceId, Optional<Map<PortNumber, Port>>> cache) {
super(cache);
}
......
......@@ -58,6 +58,8 @@ public class DistributedLinkStore
private final Multimap<DeviceId, Link> srcLinks = HashMultimap.create();
private final Multimap<DeviceId, Link> dstLinks = HashMultimap.create();
private String linksListener;
@Override
@Activate
public void activate() {
......@@ -68,10 +70,10 @@ public class DistributedLinkStore
// TODO decide on Map name scheme to avoid collision
rawLinks = theInstance.getMap("links");
final OptionalCacheLoader<LinkKey, DefaultLink> linkLoader
= new OptionalCacheLoader<>(storeService, rawLinks);
= new OptionalCacheLoader<>(kryoSerializationService, rawLinks);
links = new AbsentInvalidatingLoadingCache<>(newBuilder().build(linkLoader));
// refresh/populate cache based on notification from other instance
rawLinks.addEntryListener(new RemoteLinkEventHandler(links), includeValue);
linksListener = rawLinks.addEntryListener(new RemoteLinkEventHandler(links), includeValue);
loadLinkCache();
......@@ -80,7 +82,7 @@ public class DistributedLinkStore
@Deactivate
public void deactivate() {
super.activate();
rawLinks.removeEntryListener(linksListener);
log.info("Stopped");
}
......@@ -233,7 +235,7 @@ public class DistributedLinkStore
}
}
private class RemoteLinkEventHandler extends RemoteEventHandler<LinkKey, DefaultLink> {
private class RemoteLinkEventHandler extends RemoteCacheEventHandler<LinkKey, DefaultLink> {
public RemoteLinkEventHandler(LoadingCache<LinkKey, Optional<DefaultLink>> cache) {
super(cache);
}
......
......@@ -20,6 +20,7 @@ import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.onlab.onos.net.Device;
import org.onlab.onos.net.DeviceId;
......@@ -35,12 +36,17 @@ import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.common.StoreManager;
import org.onlab.onos.store.common.StoreService;
import org.onlab.onos.store.common.TestStoreManager;
import org.onlab.onos.store.serializers.KryoSerializationManager;
import org.onlab.onos.store.serializers.KryoSerializationService;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.hazelcast.config.Config;
import com.hazelcast.core.Hazelcast;
/**
* Test of the Hazelcast based distributed DeviceStore implementation.
*/
public class DistributedDeviceStoreTest {
private static final ProviderId PID = new ProviderId("of", "foo");
......@@ -57,6 +63,7 @@ public class DistributedDeviceStoreTest {
private static final PortNumber P3 = PortNumber.portNumber(3);
private DistributedDeviceStore deviceStore;
private KryoSerializationManager serializationMgr;
private StoreManager storeManager;
......@@ -78,7 +85,10 @@ public class DistributedDeviceStoreTest {
storeManager = new TestStoreManager(Hazelcast.newHazelcastInstance(config));
storeManager.activate();
deviceStore = new TestDistributedDeviceStore(storeManager);
serializationMgr = new KryoSerializationManager();
serializationMgr.activate();
deviceStore = new TestDistributedDeviceStore(storeManager, serializationMgr);
deviceStore.activate();
}
......@@ -86,6 +96,8 @@ public class DistributedDeviceStoreTest {
public void tearDown() throws Exception {
deviceStore.deactivate();
serializationMgr.deactivate();
storeManager.deactivate();
}
......@@ -326,6 +338,7 @@ public class DistributedDeviceStoreTest {
}
// TODO add test for Port events when we have them
@Ignore("Ignore until Delegate spec. is clear.")
@Test
public final void testEvents() throws InterruptedException {
final CountDownLatch addLatch = new CountDownLatch(1);
......@@ -379,8 +392,10 @@ public class DistributedDeviceStoreTest {
}
private class TestDistributedDeviceStore extends DistributedDeviceStore {
public TestDistributedDeviceStore(StoreService storeService) {
public TestDistributedDeviceStore(StoreService storeService,
KryoSerializationService kryoSerializationService) {
this.storeService = storeService;
this.kryoSerializationService = kryoSerializationService;
}
}
}
......
......@@ -15,6 +15,7 @@ import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.onlab.onos.net.ConnectPoint;
import org.onlab.onos.net.DeviceId;
......@@ -29,27 +30,28 @@ import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.common.StoreManager;
import org.onlab.onos.store.common.StoreService;
import org.onlab.onos.store.common.TestStoreManager;
import org.onlab.onos.store.serializers.KryoSerializationManager;
import org.onlab.onos.store.serializers.KryoSerializationService;
import com.google.common.collect.Iterables;
import com.hazelcast.config.Config;
import com.hazelcast.core.Hazelcast;
/**
* Test of the Hazelcast based distributed LinkStore implementation.
*/
public class DistributedLinkStoreTest {
private static final ProviderId PID = new ProviderId("of", "foo");
private static final DeviceId DID1 = deviceId("of:foo");
private static final DeviceId DID2 = deviceId("of:bar");
// private static final String MFR = "whitebox";
// private static final String HW = "1.1.x";
// private static final String SW1 = "3.8.1";
// private static final String SW2 = "3.9.5";
// private static final String SN = "43311-12345";
private static final PortNumber P1 = PortNumber.portNumber(1);
private static final PortNumber P2 = PortNumber.portNumber(2);
private static final PortNumber P3 = PortNumber.portNumber(3);
private StoreManager storeManager;
private KryoSerializationManager serializationMgr;
private DistributedLinkStore linkStore;
......@@ -69,13 +71,17 @@ public class DistributedLinkStoreTest {
storeManager = new TestStoreManager(Hazelcast.newHazelcastInstance(config));
storeManager.activate();
linkStore = new TestDistributedLinkStore(storeManager);
serializationMgr = new KryoSerializationManager();
serializationMgr.activate();
linkStore = new TestDistributedLinkStore(storeManager, serializationMgr);
linkStore.activate();
}
@After
public void tearDown() throws Exception {
linkStore.deactivate();
serializationMgr.deactivate();
storeManager.deactivate();
}
......@@ -302,6 +308,7 @@ public class DistributedLinkStoreTest {
assertLink(linkId2, DIRECT, linkStore.getLink(d2P2, d1P1));
}
@Ignore("Ignore until Delegate spec. is clear.")
@Test
public final void testEvents() throws InterruptedException {
......@@ -354,8 +361,10 @@ public class DistributedLinkStoreTest {
class TestDistributedLinkStore extends DistributedLinkStore {
TestDistributedLinkStore(StoreService storeService) {
TestDistributedLinkStore(StoreService storeService,
KryoSerializationService kryoSerializationService) {
this.storeService = storeService;
this.kryoSerializationService = kryoSerializationService;
}
}
}
......
package org.onlab.onos.store.serializers;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.net.ConnectPoint;
import org.onlab.onos.net.DefaultDevice;
import org.onlab.onos.net.DefaultLink;
import org.onlab.onos.net.DefaultPort;
import org.onlab.onos.net.Device;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.Element;
import org.onlab.onos.net.Link;
import org.onlab.onos.net.LinkKey;
import org.onlab.onos.net.MastershipRole;
import org.onlab.onos.net.Port;
import org.onlab.onos.net.PortNumber;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.packet.IpPrefix;
import org.onlab.util.KryoPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import de.javakaffee.kryoserializers.URISerializer;
/**
* Serialization service using Kryo.
*/
@Component(immediate = true)
@Service
public class KryoSerializationManager implements KryoSerializationService {
private final Logger log = LoggerFactory.getLogger(getClass());
private KryoPool serializerPool;
@Activate
public void activate() {
setupKryoPool();
log.info("Started");
}
@Deactivate
public void deactivate() {
log.info("Stopped");
}
/**
* Sets up the common serializers pool.
*/
protected void setupKryoPool() {
// FIXME Slice out types used in common to separate pool/namespace.
serializerPool = KryoPool.newBuilder()
.register(ArrayList.class,
HashMap.class,
ControllerNode.State.class,
Device.Type.class,
DefaultControllerNode.class,
DefaultDevice.class,
MastershipRole.class,
Port.class,
Element.class,
Link.Type.class
)
.register(IpPrefix.class, new IpPrefixSerializer())
.register(URI.class, new URISerializer())
.register(NodeId.class, new NodeIdSerializer())
.register(ProviderId.class, new ProviderIdSerializer())
.register(DeviceId.class, new DeviceIdSerializer())
.register(PortNumber.class, new PortNumberSerializer())
.register(DefaultPort.class, new DefaultPortSerializer())
.register(LinkKey.class, new LinkKeySerializer())
.register(ConnectPoint.class, new ConnectPointSerializer())
.register(DefaultLink.class, new DefaultLinkSerializer())
.build()
.populate(1);
}
@Override
public byte[] serialize(final Object obj) {
return serializerPool.serialize(obj);
}
@Override
public <T> T deserialize(final byte[] bytes) {
if (bytes == null) {
return null;
}
return serializerPool.deserialize(bytes);
}
}
package org.onlab.onos.store.serializers;
// TODO: To be replaced with SerializationService from IOLoop activity
/**
* Service to serialize Objects into byte array.
*/
public interface KryoSerializationService {
/**
* Serializes the specified object into bytes using one of the
* pre-registered serializers.
*
* @param obj object to be serialized
* @return serialized bytes
*/
public byte[] serialize(final Object obj);
/**
* Deserializes the specified bytes into an object using one of the
* pre-registered serializers.
*
* @param bytes bytes to be deserialized
* @return deserialized object
*/
public <T> T deserialize(final byte[] bytes);
}
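// Hedged usage sketch (illustration only): outside OSGi the Kryo-backed manager can be
// bootstrapped by hand, as the unit tests in this change do, and will round-trip any of
// the pre-registered types. The example class and its package are hypothetical.
package org.onlab.onos.store.serializers.example;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.store.serializers.KryoSerializationManager;
public final class KryoRoundTripExample {
public static void main(String[] args) {
KryoSerializationManager serializer = new KryoSerializationManager();
serializer.activate(); // builds the common Kryo pool
byte[] bytes = serializer.serialize(DeviceId.deviceId("of:foo"));
DeviceId copy = serializer.deserialize(bytes);
System.out.println(copy); // prints of:foo
serializer.deactivate();
}
private KryoRoundTripExample() {
}
}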
......@@ -20,7 +20,7 @@ import java.util.Set;
import static org.slf4j.LoggerFactory.getLogger;
/**
* Manages inventory of infrastructure DEVICES using trivial in-memory
* Manages inventory of infrastructure devices using trivial in-memory
* structures implementation.
*/
@Component(immediate = true)
......
......@@ -101,9 +101,6 @@ public class SimpleDeviceStore
synchronized (this) {
devices.put(deviceId, device);
availableDevices.add(deviceId);
// For now claim the device as a master automatically.
// roles.put(deviceId, MastershipRole.MASTER);
}
return new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, device, null);
}
......@@ -189,7 +186,7 @@ public class SimpleDeviceStore
new DefaultPort(device, portDescription.portNumber(),
portDescription.isEnabled());
ports.put(port.number(), updatedPort);
return new DeviceEvent(PORT_UPDATED, device, port);
return new DeviceEvent(PORT_UPDATED, device, updatedPort);
}
return null;
}
......
......@@ -51,8 +51,6 @@ public class SimpleLinkStore
private final Multimap<DeviceId, Link> srcLinks = HashMultimap.create();
private final Multimap<DeviceId, Link> dstLinks = HashMultimap.create();
private static final Set<Link> EMPTY = ImmutableSet.of();
@Activate
public void activate() {
log.info("Started");
......
......@@ -9,7 +9,6 @@
<bundle>mvn:org.apache.commons/commons-lang3/3.3.2</bundle>
<bundle>mvn:com.google.guava/guava/18.0</bundle>
<bundle>mvn:io.netty/netty/3.9.2.Final</bundle>
<bundle>mvn:org.livetribe.slp/livetribe-slp-osgi/2.2.1</bundle>
<bundle>mvn:com.hazelcast/hazelcast/3.3</bundle>
<bundle>mvn:com.eclipsesource.minimal-json/minimal-json/0.9.1</bundle>
......@@ -20,6 +19,9 @@
<bundle>mvn:de.javakaffee/kryo-serializers/0.27</bundle>
<bundle>mvn:org.onlab.onos/onlab-nio/1.0.0-SNAPSHOT</bundle>
<bundle>mvn:org.codehaus.jackson/jackson-core-asl/1.9.13</bundle>
<bundle>mvn:org.codehaus.jackson/jackson-mapper-asl/1.9.13</bundle>
</feature>
<feature name="onos-thirdparty-web" version="1.0.0"
......@@ -49,20 +51,17 @@
description="ONOS core components">
<feature>onos-api</feature>
<bundle>mvn:org.onlab.onos/onos-core-net/1.0.0-SNAPSHOT</bundle>
<bundle>mvn:org.onlab.onos/onos-core-hz-common/1.0.0-SNAPSHOT</bundle>
<bundle>mvn:org.onlab.onos/onos-core-serializers/1.0.0-SNAPSHOT</bundle>
<bundle>mvn:org.onlab.onos/onos-core-hz-cluster/1.0.0-SNAPSHOT</bundle>
<bundle>mvn:org.onlab.onos/onos-core-hz-net/1.0.0-SNAPSHOT</bundle>
<bundle>mvn:org.onlab.onos/onos-core-dist/1.0.0-SNAPSHOT</bundle>
</feature>
<feature name="onos-core-dist" version="1.0.0"
description="ONOS core components">
<feature name="onos-core-hazelcast" version="1.0.0"
description="ONOS core components built on hazelcast">
<feature>onos-api</feature>
<bundle>mvn:org.onlab.onos/onos-core-net/1.0.0-SNAPSHOT</bundle>
<bundle>mvn:org.onlab.onos/onos-core-hz-common/1.0.0-SNAPSHOT</bundle>
<bundle>mvn:org.onlab.onos/onos-core-serializers/1.0.0-SNAPSHOT</bundle>
<bundle>mvn:org.onlab.onos/onos-core-hz-cluster/1.0.0-SNAPSHOT</bundle>
<bundle>mvn:org.onlab.onos/onos-core-dist/1.0.0-SNAPSHOT</bundle>
<bundle>mvn:org.onlab.onos/onos-core-hz-net/1.0.0-SNAPSHOT</bundle>
</feature>
<feature name="onos-core-trivial" version="1.0.0"
......@@ -126,4 +125,10 @@
<bundle>mvn:org.onlab.onos/onos-app-foo/1.0.0-SNAPSHOT</bundle>
</feature>
<feature name="onos-app-config" version="1.0.0"
description="ONOS network config reader">
<feature>onos-api</feature>
<bundle>mvn:org.onlab.onos/onos-app-config/1.0.0-SNAPSHOT</bundle>
</feature>
</features>
......
......@@ -92,6 +92,17 @@
<version>3.3.2</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
<version>1.9.13</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
<version>1.9.13</version>
</dependency>
<!-- Web related -->
<dependency>
......
......@@ -106,10 +106,10 @@ public class OpenFlowPacketProvider extends AbstractProvider implements PacketPr
for (Instruction inst : packet.treatment().instructions()) {
if (inst.type().equals(Instruction.Type.OUTPUT)) {
p = portDesc(((OutputInstruction) inst).port());
if (!sw.getPorts().contains(p)) {
log.warn("Tried to write out non-existint port {}", p.getPortNo());
/*if (!sw.getPorts().contains(p)) {
log.warn("Tried to write out non-existent port {}", p.getPortNo());
continue;
}
}*/
OFPacketOut po = packetOut(sw, eth, p.getPortNo());
sw.sendMsg(po);
}
......
......@@ -154,9 +154,9 @@ public class OpenFlowPacketProviderTest {
assertEquals("message sent incorrectly", 0, sw.sent.size());
//to missing port
OutboundPacket portFailPkt = outPacket(DID, TR_MISSING, eth);
provider.emit(portFailPkt);
assertEquals("extra message sent", 1, sw.sent.size());
//OutboundPacket portFailPkt = outPacket(DID, TR_MISSING, eth);
//provider.emit(portFailPkt);
//assertEquals("extra message sent", 1, sw.sent.size());
}
......
......@@ -9,5 +9,5 @@
nodes=$(env | sort | egrep "OC[0-9]+" | cut -d= -f2)
onos-package
for node in $nodes; do printf "%s: " $node; onos-install -f $node; done
for node in $nodes; do (printf "%s: %s\n" "$node" "`onos-install -f $node`")& done
for node in $nodes; do onos-wait-for-start $node; done
......
......@@ -15,7 +15,7 @@ env JAVA_HOME=/usr/lib/jvm/java-7-openjdk-amd64
pre-stop script
/opt/onos/bin/onos halt 2>/opt/onos/var/stderr.log
sleep 3
sleep 2
end script
script
......
......@@ -8,8 +8,21 @@
remote=$ONOS_USER@${1:-$OCI}
# Generate a cluster.json from the ON* environment variables
CDEF_FILE=/tmp/cluster.json
echo "{ \"nodes\":[" > $CDEF_FILE
for node in $(env | sort | egrep "OC[2-9]+" | cut -d= -f2); do
echo " { \"id\": \"$node\", \"ip\": \"$node\", \"tcpPort\": 9876 }," >> $CDEF_FILE
done
echo " { \"id\": \"$OC1\", \"ip\": \"$OC1\", \"tcpPort\": 9876 }" >> $CDEF_FILE
echo "]}" >> $CDEF_FILE
ssh $remote "
sudo perl -pi.bak -e \"s/ <interface>.*</ <interface>${ONOS_NIC:-192.168.56.*}</g\" \
$ONOS_INSTALL_DIR/$KARAF_DIST/etc/hazelcast.xml
echo \"onos.ip=\$(ifconfig | grep $ONOS_NIC | cut -d: -f2 | cut -d\\ -f1)\" >> $ONOS_INSTALL_DIR/$KARAF_DIST/etc/system.properties
"
\ No newline at end of file
echo \"onos.ip = \$(ifconfig | grep $ONOS_NIC | cut -d: -f2 | cut -d\\ -f1)\" \
>> $ONOS_INSTALL_DIR/$KARAF_DIST/etc/system.properties
"
scp -q $CDEF_FILE $remote:$ONOS_INSTALL_DIR/config/
\ No newline at end of file
......
......@@ -24,6 +24,7 @@ ssh $remote "
# Make a link to the log file directory and make a home for auxiliaries
ln -s $ONOS_INSTALL_DIR/$KARAF_DIST/data/log /opt/onos/log
mkdir $ONOS_INSTALL_DIR/var
mkdir $ONOS_INSTALL_DIR/config
# Install the upstart configuration file and setup options for debugging
sudo cp $ONOS_INSTALL_DIR/debian/onos.conf /etc/init/onos.conf
......
package org.onlab.util;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
......@@ -8,6 +9,8 @@ import org.apache.commons.lang3.tuple.Pair;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.ByteBufferInput;
import com.esotericsoftware.kryo.io.ByteBufferOutput;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.google.common.collect.ImmutableList;
......@@ -174,6 +177,22 @@ public final class KryoPool {
}
/**
* Serializes given object to byte buffer using Kryo instance in pool.
*
* @param obj Object to serialize
* @param buffer to write to
*/
public void serialize(final Object obj, final ByteBuffer buffer) {
ByteBufferOutput out = new ByteBufferOutput(buffer);
Kryo kryo = getKryo();
try {
kryo.writeClassAndObject(out, obj);
} finally {
putKryo(kryo);
}
}
/**
* Deserializes given byte array to Object using Kryo instance in pool.
*
* @param bytes serialized bytes
......@@ -192,6 +211,24 @@ public final class KryoPool {
}
}
/**
* Deserializes given byte buffer to Object using Kryo instance in pool.
*
* @param buffer input with serialized bytes
* @param <T> deserialized Object type
* @return deserialized Object
*/
public <T> T deserialize(final ByteBuffer buffer) {
ByteBufferInput in = new ByteBufferInput(buffer);
Kryo kryo = getKryo();
try {
@SuppressWarnings("unchecked")
T obj = (T) kryo.readClassAndObject(in);
return obj;
} finally {
putKryo(kryo);
}
}
/**
* Creates a Kryo instance with {@link #registeredTypes} pre-registered.
......
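// Hedged round-trip sketch for the new ByteBuffer variants above (illustration only;
// the pool contents and the empty list are arbitrary, and exact buffer-position
// behaviour depends on the Kryo version in use). The example class is hypothetical.
package org.onlab.util.example;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import org.onlab.util.KryoPool;
public final class KryoPoolBufferExample {
public static void main(String[] args) {
KryoPool pool = KryoPool.newBuilder()
.register(ArrayList.class)
.build();
ByteBuffer buffer = ByteBuffer.allocate(1024);
pool.serialize(new ArrayList<String>(), buffer); // write the object into the buffer
buffer.flip(); // switch the buffer from writing to reading
ArrayList<String> copy = pool.deserialize(buffer); // read it back
System.out.println(copy.size()); // prints 0
}
private KryoPoolBufferExample() {
}
}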
......@@ -54,9 +54,9 @@ public abstract class IOLoop<M extends Message, S extends MessageStream<M>>
}
/**
* Returns the number of streams in custody of the IO loop.
* Returns the number of message streams in custody of the loop.
*
* @return number of message streams using this loop
* @return number of message streams
*/
public int streamCount() {
return streams.size();
......@@ -93,14 +93,9 @@ public abstract class IOLoop<M extends Message, S extends MessageStream<M>>
*
* @param key selection key holding the pending connect operation.
*/
protected void connect(SelectionKey key) {
try {
SocketChannel ch = (SocketChannel) key.channel();
ch.finishConnect();
} catch (IOException | IllegalStateException e) {
log.warn("Unable to complete connection", e);
}
protected void connect(SelectionKey key) throws IOException {
SocketChannel ch = (SocketChannel) key.channel();
ch.finishConnect();
if (key.isValid()) {
key.interestOps(SelectionKey.OP_READ);
}
......@@ -124,7 +119,11 @@ public abstract class IOLoop<M extends Message, S extends MessageStream<M>>
// If there is a pending connect operation, complete it.
if (key.isConnectable()) {
connect(key);
try {
connect(key);
} catch (IOException | IllegalStateException e) {
log.warn("Unable to complete connection", e);
}
}
// If there is a read operation, slurp as much data as possible.
......
......@@ -10,6 +10,7 @@ import java.nio.channels.ByteChannel;
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
......@@ -170,7 +171,7 @@ public abstract class MessageStream<M extends Message> {
}
/**
* Reads, withouth blocking, a list of messages from the stream.
* Reads, without blocking, a list of messages from the stream.
* The list will be empty if there were not messages pending.
*
* @return list of messages or null if backing channel has been closed
......@@ -262,7 +263,7 @@ public abstract class MessageStream<M extends Message> {
try {
channel.write(outbound);
} catch (IOException e) {
if (!closed && !e.getMessage().equals("Broken pipe")) {
if (!closed && !Objects.equals(e.getMessage(), "Broken pipe")) {
log.warn("Unable to write data", e);
ioError = e;
}
......
......@@ -230,7 +230,7 @@ public class IOLoopTestClient {
}
@Override
protected void connect(SelectionKey key) {
protected void connect(SelectionKey key) throws IOException {
super.connect(key);
TestMessageStream b = (TestMessageStream) key.attachment();
Worker w = ((CustomIOLoop) b.loop()).worker;
......