Ayaka Koshibe

Merge branch 'master' of ssh://gerrit.onlab.us:29418/onos-next

Showing 54 changed files with 1744 additions and 638 deletions
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.onlab.onos</groupId>
<artifactId>onos-apps</artifactId>
<version>1.0.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<artifactId>onos-app-config</artifactId>
<packaging>bundle</packaging>
<description>ONOS simple network configuration reader</description>
<dependencies>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>2.4.2</version>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
package org.onlab.onos.config;
import java.util.Collections;
import java.util.List;
import org.codehaus.jackson.annotate.JsonProperty;
/**
* Object to store address configuration read from a JSON file.
*/
public class AddressConfiguration {
private List<AddressEntry> addresses;
/**
* Gets a list of addresses in the system.
*
* @return the list of addresses
*/
public List<AddressEntry> getAddresses() {
return Collections.unmodifiableList(addresses);
}
/**
* Sets a list of addresses in the system.
*
* @param addresses the list of addresses
*/
@JsonProperty("addresses")
public void setAddresses(List<AddressEntry> addresses) {
this.addresses = addresses;
}
}
package org.onlab.onos.config;
import java.util.List;
import org.codehaus.jackson.annotate.JsonProperty;
import org.onlab.packet.IpPrefix;
import org.onlab.packet.MacAddress;
/**
* Represents a set of addresses bound to a port.
*/
public class AddressEntry {
private String dpid;
private short portNumber;
private List<IpPrefix> ipAddresses;
private MacAddress macAddress;
public String getDpid() {
return dpid;
}
@JsonProperty("dpid")
public void setDpid(String strDpid) {
this.dpid = strDpid;
}
public short getPortNumber() {
return portNumber;
}
@JsonProperty("port")
public void setPortNumber(short portNumber) {
this.portNumber = portNumber;
}
public List<IpPrefix> getIpAddresses() {
return ipAddresses;
}
@JsonProperty("ips")
public void setIpAddresses(List<IpPrefix> ipAddresses) {
this.ipAddresses = ipAddresses;
}
public MacAddress getMacAddress() {
return macAddress;
}
@JsonProperty("mac")
public void setMacAddress(MacAddress macAddress) {
this.macAddress = macAddress;
}
}
package org.onlab.onos.config;
import static org.slf4j.LoggerFactory.getLogger;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.codehaus.jackson.map.ObjectMapper;
import org.onlab.onos.net.ConnectPoint;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.PortNumber;
import org.onlab.onos.net.host.HostAdminService;
import org.onlab.onos.net.host.PortAddresses;
import org.slf4j.Logger;
import com.google.common.collect.Sets;
/**
* Simple configuration module to read in supplementary network configuration
* from a file.
*/
@Component(immediate = true)
public class NetworkConfigReader {
private final Logger log = getLogger(getClass());
private static final String DEFAULT_CONFIG_FILE = "config/addresses.json";
private String configFileName = DEFAULT_CONFIG_FILE;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected HostAdminService hostAdminService;
@Activate
protected void activate() {
log.info("Started network config reader");
log.info("Config file set to {}", configFileName);
AddressConfiguration config = readNetworkConfig();
if (config != null) {
for (AddressEntry entry : config.getAddresses()) {
ConnectPoint cp = new ConnectPoint(
DeviceId.deviceId(dpidToUri(entry.getDpid())),
PortNumber.portNumber(entry.getPortNumber()));
PortAddresses addresses = new PortAddresses(cp,
Sets.newHashSet(entry.getIpAddresses()),
entry.getMacAddress());
hostAdminService.bindAddressesToPort(addresses);
}
}
}
@Deactivate
protected void deactivate() {
log.info("Stopped");
}
private AddressConfiguration readNetworkConfig() {
File configFile = new File(configFileName);
ObjectMapper mapper = new ObjectMapper();
try {
AddressConfiguration config =
mapper.readValue(configFile, AddressConfiguration.class);
return config;
} catch (FileNotFoundException e) {
log.warn("Configuration file not found: {}", configFileName);
} catch (IOException e) {
log.error("Unable to read config from file:", e);
}
return null;
}
private static String dpidToUri(String dpid) {
return "of:" + dpid.replace(":", "");
}
}
/**
* Simple configuration module to read in supplementary network configuration
* from a file.
*/
package org.onlab.onos.config;
{
"interfaces" : [
{
"dpid" : "00:00:00:00:00:00:01",
"port" : "1",
"ips" : ["192.168.10.101/24"],
"mac" : "00:00:00:11:22:33"
},
{
"dpid" : "00:00:00:00:00:00:02",
"port" : "1",
"ips" : ["192.168.20.101/24", "192.168.30.101/24"]
},
{
"dpid" : "00:00:00:00:00:00:03",
"port" : "1",
"ips" : ["10.1.0.1/16"],
"mac" : "00:00:00:00:00:01"
}
]
}
......@@ -26,7 +26,9 @@ import org.onlab.onos.net.packet.InboundPacket;
import org.onlab.onos.net.packet.PacketContext;
import org.onlab.onos.net.packet.PacketProcessor;
import org.onlab.onos.net.packet.PacketService;
import org.onlab.onos.net.proxyarp.ProxyArpService;
import org.onlab.onos.net.topology.TopologyService;
import org.onlab.packet.ARP;
import org.onlab.packet.Ethernet;
import org.slf4j.Logger;
......@@ -50,6 +52,9 @@ public class ReactiveForwarding {
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected FlowRuleService flowRuleService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ProxyArpService proxyArpService;
private ReactivePacketProcessor processor = new ReactivePacketProcessor();
private ApplicationId appId;
......@@ -85,6 +90,16 @@ public class ReactiveForwarding {
InboundPacket pkt = context.inPacket();
Ethernet ethPkt = pkt.parsed();
if (ethPkt.getEtherType() == Ethernet.TYPE_ARP) {
ARP arp = (ARP) ethPkt.getPayload();
if (arp.getOpCode() == ARP.OP_REPLY) {
proxyArpService.forward(ethPkt);
} else if (arp.getOpCode() == ARP.OP_REQUEST) {
proxyArpService.reply(ethPkt);
}
context.block();
return;
}
HostId id = HostId.hostId(ethPkt.getDestinationMAC());
// Do we know who this is for? If not, flood and bail.
......
......@@ -20,6 +20,7 @@
<module>tvue</module>
<module>fwd</module>
<module>foo</module>
<module>config</module>
</modules>
<properties>
......
......@@ -9,6 +9,8 @@ import org.onlab.onos.net.ConnectPoint;
import org.onlab.packet.IpPrefix;
import org.onlab.packet.MacAddress;
import com.google.common.base.MoreObjects;
/**
* Represents address information bound to a port.
*/
......@@ -83,4 +85,13 @@ public class PortAddresses {
public int hashCode() {
return Objects.hash(connectPoint, ipAddresses, macAddress);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("connect-point", connectPoint)
.add("ip-addresses", ipAddresses)
.add("mac-address", macAddress)
.toString();
}
}
......
package org.onlab.onos.net.link;
import java.util.Set;
import org.onlab.onos.net.ConnectPoint;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.Link;
import java.util.Set;
/**
* Service for interacting with the inventory of infrastructure links.
*/
......
......@@ -21,9 +21,16 @@ public interface ProxyArpService {
* Sends a reply for a given request. If the host is not known then the arp
* will be flooded at all edge ports.
*
* @param request
* @param eth
* an arp request
*/
void reply(Ethernet request);
void reply(Ethernet eth);
/**
* Forwards an ARP request to its destination. Floods at the edge the ARP request if the
* destination is not known.
* @param eth an ethernet frame containing an ARP request.
*/
void forward(Ethernet eth);
}
......
package org.onlab.onos.net.link;
import java.util.Set;
import org.onlab.onos.net.ConnectPoint;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.Link;
import java.util.Set;
/**
* Test adapter for link service.
*/
......@@ -63,4 +63,5 @@ public class LinkServiceAdapter implements LinkService {
public void removeListener(LinkListener listener) {
}
}
......
......@@ -53,7 +53,7 @@ public class LinkManager
protected final AbstractListenerRegistry<LinkEvent, LinkListener>
listenerRegistry = new AbstractListenerRegistry<>();
private LinkStoreDelegate delegate = new InternalStoreDelegate();
private final LinkStoreDelegate delegate = new InternalStoreDelegate();
private final DeviceListener deviceListener = new InternalDeviceListener();
......
package org.onlab.onos.net.proxyarp.impl;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.slf4j.LoggerFactory.getLogger;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map.Entry;
import java.util.Set;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.net.Device;
import org.onlab.onos.net.Host;
import org.onlab.onos.net.HostId;
import org.onlab.onos.net.Link;
import org.onlab.onos.net.Port;
import org.onlab.onos.net.PortNumber;
import org.onlab.onos.net.device.DeviceEvent;
import org.onlab.onos.net.device.DeviceListener;
import org.onlab.onos.net.device.DeviceService;
import org.onlab.onos.net.flow.DefaultTrafficTreatment;
import org.onlab.onos.net.flow.TrafficTreatment;
import org.onlab.onos.net.host.HostService;
import org.onlab.onos.net.link.LinkEvent;
import org.onlab.onos.net.link.LinkListener;
import org.onlab.onos.net.link.LinkService;
import org.onlab.onos.net.packet.DefaultOutboundPacket;
import org.onlab.onos.net.packet.PacketService;
import org.onlab.onos.net.proxyarp.ProxyArpService;
import org.onlab.packet.ARP;
import org.onlab.packet.Ethernet;
import org.onlab.packet.IpPrefix;
import org.onlab.packet.VlanId;
import org.slf4j.Logger;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
@Component(immediate = true)
@Service
public class ProxyArpManager implements ProxyArpService {
private final Logger log = getLogger(getClass());
private static final String MAC_ADDR_NULL = "Mac address cannot be null.";
private static final String REQUEST_NULL = "Arp request cannot be null.";
private static final String REQUEST_NOT_ARP = "Ethernet frame does not contain ARP request.";
private static final String NOT_ARP_REQUEST = "ARP is not a request.";
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected HostService hostService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected PacketService packetService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected LinkService linkService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DeviceService deviceService;
private final Multimap<Device, PortNumber> internalPorts =
HashMultimap.<Device, PortNumber>create();
private final Multimap<Device, PortNumber> externalPorts =
HashMultimap.<Device, PortNumber>create();
/**
* Listens to both device service and link service to determine
* whether a port is internal or external.
*/
@Activate
public void activate() {
deviceService.addListener(new InternalDeviceListener());
linkService.addListener(new InternalLinkListener());
determinePortLocations();
log.info("Started");
}
@Deactivate
public void deactivate() {
log.info("Stopped");
}
@Override
public boolean known(IpPrefix addr) {
checkNotNull(MAC_ADDR_NULL, addr);
Set<Host> hosts = hostService.getHostsByIp(addr);
return !hosts.isEmpty();
}
@Override
public void reply(Ethernet eth) {
checkNotNull(REQUEST_NULL, eth);
checkArgument(eth.getEtherType() == Ethernet.TYPE_ARP,
REQUEST_NOT_ARP);
ARP arp = (ARP) eth.getPayload();
checkArgument(arp.getOpCode() == ARP.OP_REQUEST, NOT_ARP_REQUEST);
VlanId vlan = VlanId.vlanId(eth.getVlanID());
Set<Host> hosts = hostService.getHostsByIp(IpPrefix.valueOf(arp
.getTargetProtocolAddress()));
Host dst = null;
Host src = hostService.getHost(HostId.hostId(eth.getSourceMAC(),
VlanId.vlanId(eth.getVlanID())));
for (Host host : hosts) {
if (host.vlan().equals(vlan)) {
dst = host;
break;
}
}
if (src == null || dst == null) {
flood(eth);
return;
}
Ethernet arpReply = buildArpReply(dst, eth);
// TODO: check send status with host service.
TrafficTreatment.Builder builder = new DefaultTrafficTreatment.Builder();
builder.setOutput(src.location().port());
packetService.emit(new DefaultOutboundPacket(src.location().deviceId(),
builder.build(), ByteBuffer.wrap(arpReply.serialize())));
}
@Override
public void forward(Ethernet eth) {
checkNotNull(REQUEST_NULL, eth);
checkArgument(eth.getEtherType() == Ethernet.TYPE_ARP,
REQUEST_NOT_ARP);
ARP arp = (ARP) eth.getPayload();
checkArgument(arp.getOpCode() == ARP.OP_REPLY, NOT_ARP_REQUEST);
Host h = hostService.getHost(HostId.hostId(eth.getDestinationMAC(),
VlanId.vlanId(eth.getVlanID())));
if (h == null) {
flood(eth);
} else {
TrafficTreatment.Builder builder = new DefaultTrafficTreatment.Builder();
builder.setOutput(h.location().port());
packetService.emit(new DefaultOutboundPacket(h.location().deviceId(),
builder.build(), ByteBuffer.wrap(eth.serialize())));
}
}
/**
* Flood the arp request at all edges in the network.
* @param request the arp request.
*/
private void flood(Ethernet request) {
TrafficTreatment.Builder builder = null;
ByteBuffer buf = ByteBuffer.wrap(request.serialize());
synchronized (externalPorts) {
for (Entry<Device, PortNumber> entry : externalPorts.entries()) {
builder = new DefaultTrafficTreatment.Builder();
builder.setOutput(entry.getValue());
packetService.emit(new DefaultOutboundPacket(entry.getKey().id(),
builder.build(), buf));
}
}
}
/**
* Determines the location of all known ports in the system.
*/
private void determinePortLocations() {
Iterable<Device> devices = deviceService.getDevices();
Iterable<Link> links = null;
List<PortNumber> ports = null;
for (Device d : devices) {
ports = buildPortNumberList(deviceService.getPorts(d.id()));
links = linkService.getLinks();
for (Link l : links) {
// for each link, mark the concerned ports as internal
// and the remaining ports are therefore external.
if (l.src().deviceId().equals(d)
&& ports.contains(l.src().port())) {
ports.remove(l.src().port());
internalPorts.put(d, l.src().port());
}
if (l.dst().deviceId().equals(d)
&& ports.contains(l.dst().port())) {
ports.remove(l.dst().port());
internalPorts.put(d, l.dst().port());
}
}
synchronized (externalPorts) {
externalPorts.putAll(d, ports);
}
}
}
private List<PortNumber> buildPortNumberList(List<Port> ports) {
List<PortNumber> portNumbers = Lists.newLinkedList();
for (Port p : ports) {
portNumbers.add(p.number());
}
return portNumbers;
}
/**
* Builds an arp reply based on a request.
* @param h the host we want to send to
* @param request the arp request we got
* @return an ethernet frame containing the arp reply
*/
private Ethernet buildArpReply(Host h, Ethernet request) {
Ethernet eth = new Ethernet();
eth.setDestinationMACAddress(request.getSourceMACAddress());
eth.setSourceMACAddress(h.mac().getAddress());
eth.setEtherType(Ethernet.TYPE_ARP);
eth.setVlanID(request.getVlanID());
ARP arp = new ARP();
arp.setOpCode(ARP.OP_REPLY);
arp.setProtocolType(ARP.PROTO_TYPE_IP);
arp.setHardwareType(ARP.HW_TYPE_ETHERNET);
arp.setProtocolAddressLength((byte) IpPrefix.INET_LEN);
arp.setHardwareAddressLength((byte) Ethernet.DATALAYER_ADDRESS_LENGTH);
arp.setSenderHardwareAddress(h.mac().getAddress());
arp.setTargetHardwareAddress(request.getSourceMACAddress());
arp.setTargetProtocolAddress(((ARP) request.getPayload())
.getSenderProtocolAddress());
arp.setSenderProtocolAddress(h.ipAddresses().iterator().next().toInt());
eth.setPayload(arp);
return eth;
}
public class InternalLinkListener implements LinkListener {
@Override
public void event(LinkEvent event) {
Link link = event.subject();
Device src = deviceService.getDevice(link.src().deviceId());
Device dst = deviceService.getDevice(link.dst().deviceId());
switch (event.type()) {
case LINK_ADDED:
synchronized (externalPorts) {
externalPorts.remove(src, link.src().port());
externalPorts.remove(dst, link.dst().port());
internalPorts.put(src, link.src().port());
internalPorts.put(dst, link.dst().port());
}
break;
case LINK_REMOVED:
synchronized (externalPorts) {
externalPorts.put(src, link.src().port());
externalPorts.put(dst, link.dst().port());
internalPorts.remove(src, link.src().port());
internalPorts.remove(dst, link.dst().port());
}
break;
case LINK_UPDATED:
// don't care about links being updated.
break;
default:
break;
}
}
}
public class InternalDeviceListener implements DeviceListener {
@Override
public void event(DeviceEvent event) {
Device device = event.subject();
switch (event.type()) {
case DEVICE_ADDED:
case DEVICE_AVAILABILITY_CHANGED:
case DEVICE_MASTERSHIP_CHANGED:
case DEVICE_SUSPENDED:
case DEVICE_UPDATED:
case PORT_UPDATED:
// nothing to do in these cases; handled when links get reported
break;
case DEVICE_REMOVED:
synchronized (externalPorts) {
externalPorts.removeAll(device);
internalPorts.removeAll(device);
}
break;
case PORT_ADDED:
synchronized (externalPorts) {
externalPorts.put(device, event.port().number());
internalPorts.remove(device, event.port().number());
}
break;
case PORT_REMOVED:
synchronized (externalPorts) {
externalPorts.remove(device, event.port().number());
internalPorts.remove(device, event.port().number());
}
break;
default:
break;
}
}
}
}
/**
* Core subsystem for responding to arp requests.
*/
package org.onlab.onos.proxyarp.impl;
package org.onlab.onos.net.proxyarp.impl;
......
package org.onlab.onos.proxyarp.impl;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import java.nio.ByteBuffer;
import java.util.Set;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onlab.onos.net.Host;
import org.onlab.onos.net.flow.DefaultTrafficTreatment;
import org.onlab.onos.net.flow.TrafficTreatment;
import org.onlab.onos.net.host.HostService;
import org.onlab.onos.net.packet.DefaultOutboundPacket;
import org.onlab.onos.net.packet.PacketService;
import org.onlab.onos.net.proxyarp.ProxyArpService;
import org.onlab.onos.net.topology.TopologyService;
import org.onlab.packet.ARP;
import org.onlab.packet.Ethernet;
import org.onlab.packet.IpPrefix;
import org.onlab.packet.VlanId;
public class ProxyArpManager implements ProxyArpService {
private static final String MAC_ADDR_NULL = "Mac address cannot be null.";
private static final String REQUEST_NULL = "Arp request cannot be null.";
private static final String REQUEST_NOT_ARP = "Ethernet frame does not contain ARP request.";
private static final String NOT_ARP_REQUEST = "ARP is not a request.";
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected HostService hostService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected PacketService packetService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected TopologyService topologyService;
@Override
public boolean known(IpPrefix addr) {
checkNotNull(MAC_ADDR_NULL, addr);
Set<Host> hosts = hostService.getHostsByIp(addr);
return !hosts.isEmpty();
}
@Override
public void reply(Ethernet request) {
checkNotNull(REQUEST_NULL, request);
checkArgument(request.getEtherType() == Ethernet.TYPE_ARP,
REQUEST_NOT_ARP);
ARP arp = (ARP) request.getPayload();
checkArgument(arp.getOpCode() == ARP.OP_REQUEST, NOT_ARP_REQUEST);
VlanId vlan = VlanId.vlanId(request.getVlanID());
Set<Host> hosts = hostService.getHostsByIp(IpPrefix.valueOf(arp
.getTargetProtocolAddress()));
Host h = null;
for (Host host : hosts) {
if (host.vlan().equals(vlan)) {
h = host;
break;
}
}
if (h == null) {
flood(request);
return;
}
Ethernet arpReply = buildArpReply(h, request);
// TODO: check send status with host service.
TrafficTreatment.Builder builder = new DefaultTrafficTreatment.Builder();
builder.setOutput(h.location().port());
packetService.emit(new DefaultOutboundPacket(h.location().deviceId(),
builder.build(), ByteBuffer.wrap(arpReply.serialize())));
}
private void flood(Ethernet request) {
// TODO: flood on all edge ports.
}
private Ethernet buildArpReply(Host h, Ethernet request) {
Ethernet eth = new Ethernet();
eth.setDestinationMACAddress(request.getSourceMACAddress());
eth.setSourceMACAddress(h.mac().getAddress());
eth.setEtherType(Ethernet.TYPE_ARP);
ARP arp = new ARP();
arp.setOpCode(ARP.OP_REPLY);
arp.setSenderHardwareAddress(h.mac().getAddress());
arp.setTargetHardwareAddress(request.getSourceMACAddress());
arp.setTargetProtocolAddress(((ARP) request.getPayload())
.getSenderProtocolAddress());
arp.setSenderProtocolAddress(h.ipAddresses().iterator().next().toInt());
eth.setPayload(arp);
return eth;
}
}
......@@ -33,8 +33,11 @@ import org.onlab.onos.net.device.PortDescription;
import org.onlab.onos.net.provider.AbstractProvider;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.common.StoreManager;
import org.onlab.onos.store.common.StoreService;
import org.onlab.onos.store.common.TestStoreManager;
import org.onlab.onos.store.device.impl.DistributedDeviceStore;
import org.onlab.onos.store.serializers.KryoSerializationManager;
import org.onlab.onos.store.serializers.KryoSerializationService;
import org.onlab.packet.IpPrefix;
import java.util.ArrayList;
......@@ -92,6 +95,7 @@ public class DistributedDeviceManagerTest {
private DistributedDeviceStore dstore;
private TestMastershipManager masterManager;
private EventDeliveryService eventService;
private KryoSerializationManager serializationMgr;
@Before
public void setUp() {
......@@ -107,7 +111,10 @@ public class DistributedDeviceManagerTest {
storeManager = new TestStoreManager(Hazelcast.newHazelcastInstance(config));
storeManager.activate();
dstore = new TestDistributedDeviceStore();
serializationMgr = new KryoSerializationManager();
serializationMgr.activate();
dstore = new TestDistributedDeviceStore(storeManager, serializationMgr);
dstore.activate();
mgr.store = dstore;
......@@ -133,6 +140,7 @@ public class DistributedDeviceManagerTest {
mgr.deactivate();
dstore.deactivate();
serializationMgr.deactivate();
storeManager.deactivate();
}
......@@ -298,8 +306,10 @@ public class DistributedDeviceManagerTest {
private class TestDistributedDeviceStore extends DistributedDeviceStore {
public TestDistributedDeviceStore() {
this.storeService = storeManager;
public TestDistributedDeviceStore(StoreService storeService,
KryoSerializationService kryoSerializationService) {
this.storeService = storeService;
this.kryoSerializationService = kryoSerializationService;
}
}
......
package org.onlab.onos.store.cluster.impl;
import org.onlab.nio.AcceptorLoop;
import org.onlab.packet.IpPrefix;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import static java.net.InetAddress.getByAddress;
/**
* Listens to inbound connection requests and accepts them.
*/
public class ClusterConnectionListener extends AcceptorLoop {
private static final long SELECT_TIMEOUT = 50;
private static final int COMM_BUFFER_SIZE = 32 * 1024;
private static final boolean SO_NO_DELAY = false;
private static final int SO_SEND_BUFFER_SIZE = COMM_BUFFER_SIZE;
private static final int SO_RCV_BUFFER_SIZE = COMM_BUFFER_SIZE;
private final WorkerFinder workerFinder;
ClusterConnectionListener(IpPrefix ip, int tcpPort,
WorkerFinder workerFinder) throws IOException {
super(SELECT_TIMEOUT, new InetSocketAddress(getByAddress(ip.toOctets()), tcpPort));
this.workerFinder = workerFinder;
}
@Override
protected void acceptConnection(ServerSocketChannel channel) throws IOException {
SocketChannel sc = channel.accept();
sc.configureBlocking(false);
Socket so = sc.socket();
so.setTcpNoDelay(SO_NO_DELAY);
so.setReceiveBufferSize(SO_RCV_BUFFER_SIZE);
so.setSendBufferSize(SO_SEND_BUFFER_SIZE);
workerFinder.findWorker().acceptStream(sc);
}
}
package org.onlab.onos.store.cluster.impl;
import org.onlab.nio.IOLoop;
import org.onlab.nio.MessageStream;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.ClusterMessageStream;
import org.onlab.onos.store.cluster.messaging.SerializationService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.ByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.List;
import java.util.Objects;
import static org.onlab.packet.IpPrefix.valueOf;
/**
* Performs the IO operations related to a cluster-wide communications.
*/
public class ClusterIOWorker extends
IOLoop<ClusterMessage, ClusterMessageStream> {
private final Logger log = LoggerFactory.getLogger(getClass());
private static final long SELECT_TIMEOUT = 50;
private final ConnectionManager connectionManager;
private final CommunicationsDelegate commsDelegate;
private final SerializationService serializationService;
private final ClusterMessage helloMessage;
/**
* Creates a new cluster IO worker.
*
* @param connectionManager parent connection manager
* @param commsDelegate communications delegate for dispatching
* @param serializationService serialization service for encode/decode
* @param helloMessage hello message for greeting peers
* @throws IOException if errors occur during IO loop ignition
*/
ClusterIOWorker(ConnectionManager connectionManager,
CommunicationsDelegate commsDelegate,
SerializationService serializationService,
ClusterMessage helloMessage) throws IOException {
super(SELECT_TIMEOUT);
this.connectionManager = connectionManager;
this.commsDelegate = commsDelegate;
this.serializationService = serializationService;
this.helloMessage = helloMessage;
}
@Override
protected ClusterMessageStream createStream(ByteChannel byteChannel) {
return new ClusterMessageStream(serializationService, this, byteChannel);
}
@Override
protected void processMessages(List<ClusterMessage> messages, MessageStream<ClusterMessage> stream) {
for (ClusterMessage message : messages) {
commsDelegate.dispatch(message);
}
}
@Override
public ClusterMessageStream acceptStream(SocketChannel channel) {
ClusterMessageStream stream = super.acceptStream(channel);
try {
InetSocketAddress sa = (InetSocketAddress) channel.getRemoteAddress();
log.info("Accepted connection from node {}", valueOf(sa.getAddress().getAddress()));
stream.write(helloMessage);
} catch (IOException e) {
log.warn("Unable to accept connection from an unknown end-point", e);
}
return stream;
}
@Override
protected void connect(SelectionKey key) throws IOException {
try {
super.connect(key);
ClusterMessageStream stream = (ClusterMessageStream) key.attachment();
stream.write(helloMessage);
} catch (IOException e) {
if (!Objects.equals(e.getMessage(), "Connection refused")) {
throw e;
}
}
}
@Override
protected void removeStream(MessageStream<ClusterMessage> stream) {
DefaultControllerNode node = ((ClusterMessageStream) stream).node();
if (node != null) {
log.info("Closed connection to node {}", node.id());
connectionManager.removeNodeStream(node);
}
super.removeStream(stream);
}
}
package org.onlab.onos.store.cluster.impl;
import org.onlab.onos.cluster.DefaultControllerNode;
/**
* Simple back interface through which connection manager can interact with
* the cluster store.
*/
public interface ClusterNodesDelegate {
/**
* Notifies about a new cluster node being detected.
*
* @param node newly detected cluster node
*/
void nodeDetected(DefaultControllerNode node);
/**
* Notifies about cluster node going offline.
*
* @param node cluster node that vanished
*/
void nodeVanished(DefaultControllerNode node);
}
package org.onlab.onos.store.cluster.impl;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
/**
* Simple back interface for interacting with the communications service.
*/
public interface CommunicationsDelegate {
/**
* Dispatches the specified message to all registered subscribers.
*
* @param message message to be dispatched
*/
void dispatch(ClusterMessage message);
/**
* Sets the sender.
*
* @param messageSender message sender
*/
void setSender(MessageSender messageSender);
}
package org.onlab.onos.store.cluster.impl;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.ClusterMessageStream;
import org.onlab.onos.store.cluster.messaging.HelloMessage;
import org.onlab.onos.store.cluster.messaging.SerializationService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static java.net.InetAddress.getByAddress;
import static org.onlab.util.Tools.namedThreads;
/**
* Manages connections to other controller cluster nodes.
*/
public class ConnectionManager implements MessageSender {
private final Logger log = LoggerFactory.getLogger(getClass());
private static final long CONNECTION_CUSTODIAN_DELAY = 1000L;
private static final long CONNECTION_CUSTODIAN_FREQUENCY = 5000;
private static final long START_TIMEOUT = 1000;
private static final int WORKERS = 3;
private ClusterConnectionListener connectionListener;
private List<ClusterIOWorker> workers = new ArrayList<>(WORKERS);
private final DefaultControllerNode localNode;
private final ClusterNodesDelegate nodesDelegate;
private final CommunicationsDelegate commsDelegate;
private final SerializationService serializationService;
// Nodes to be monitored to make sure they have a connection.
private final Set<DefaultControllerNode> nodes = new HashSet<>();
// Means to track message streams to other nodes.
private final Map<NodeId, ClusterMessageStream> streams = new ConcurrentHashMap<>();
// Executor pools for listening and managing connections to other nodes.
private final ExecutorService listenExecutor =
Executors.newSingleThreadExecutor(namedThreads("onos-comm-listen"));
private final ExecutorService commExecutors =
Executors.newFixedThreadPool(WORKERS, namedThreads("onos-comm-cluster"));
private final ExecutorService heartbeatExecutor =
Executors.newSingleThreadExecutor(namedThreads("onos-comm-heartbeat"));
private final Timer timer = new Timer("onos-comm-initiator");
private final TimerTask connectionCustodian = new ConnectionCustodian();
private final WorkerFinder workerFinder = new LeastUtilitiedWorkerFinder();
/**
* Creates a new connection manager.
*/
ConnectionManager(DefaultControllerNode localNode,
ClusterNodesDelegate nodesDelegate,
CommunicationsDelegate commsDelegate,
SerializationService serializationService) {
this.localNode = localNode;
this.nodesDelegate = nodesDelegate;
this.commsDelegate = commsDelegate;
this.serializationService = serializationService;
commsDelegate.setSender(this);
startCommunications();
startListening();
startInitiating();
log.info("Started");
}
/**
* Shuts down the connection manager.
*/
void shutdown() {
connectionListener.shutdown();
for (ClusterIOWorker worker : workers) {
worker.shutdown();
}
log.info("Stopped");
}
/**
* Adds the node to the list of monitored nodes.
*
* @param node node to be added
*/
void addNode(DefaultControllerNode node) {
nodes.add(node);
}
/**
* Removes the node from the list of monitored nodes.
*
* @param node node to be removed
*/
void removeNode(DefaultControllerNode node) {
nodes.remove(node);
ClusterMessageStream stream = streams.remove(node.id());
if (stream != null) {
stream.close();
}
}
/**
* Removes the stream associated with the specified node.
*
* @param node node whose stream to remove
*/
void removeNodeStream(DefaultControllerNode node) {
nodesDelegate.nodeVanished(node);
streams.remove(node.id());
}
@Override
public boolean send(NodeId nodeId, ClusterMessage message) {
ClusterMessageStream stream = streams.get(nodeId);
if (stream != null) {
try {
stream.write(message);
return true;
} catch (IOException e) {
log.warn("Unable to send a message about {} to node {}",
message.subject(), nodeId);
}
}
return false;
}
/**
* Kicks off the IO loops and waits for them to startup.
*/
private void startCommunications() {
HelloMessage hello = new HelloMessage(localNode.id(), localNode.ip(),
localNode.tcpPort());
for (int i = 0; i < WORKERS; i++) {
try {
ClusterIOWorker worker =
new ClusterIOWorker(this, commsDelegate,
serializationService, hello);
workers.add(worker);
commExecutors.execute(worker);
} catch (IOException e) {
log.warn("Unable to start communication worker", e);
}
}
// Wait for the IO loops to start
for (ClusterIOWorker loop : workers) {
if (!loop.awaitStart(START_TIMEOUT)) {
log.warn("Comm loop did not start on-time; moving on...");
}
}
}
/**
* Starts listening for connections from peer cluster members.
*/
private void startListening() {
try {
connectionListener =
new ClusterConnectionListener(localNode.ip(), localNode.tcpPort(),
workerFinder);
listenExecutor.execute(connectionListener);
if (!connectionListener.awaitStart(START_TIMEOUT)) {
log.warn("Listener did not start on-time; moving on...");
}
} catch (IOException e) {
log.error("Unable to listen for cluster connections", e);
}
}
/**
* Initiates open connection request and registers the pending socket
* channel with the given IO loop.
*
* @param loop loop with which the channel should be registered
* @throws java.io.IOException if the socket could not be open or connected
*/
private void initiateConnection(DefaultControllerNode node,
ClusterIOWorker loop) throws IOException {
SocketAddress sa = new InetSocketAddress(getByAddress(node.ip().toOctets()), node.tcpPort());
SocketChannel ch = SocketChannel.open();
ch.configureBlocking(false);
ch.connect(sa);
loop.connectStream(ch);
}
/**
* Attempts to connect to any nodes that do not have an associated connection.
*/
private void startInitiating() {
timer.schedule(connectionCustodian, CONNECTION_CUSTODIAN_DELAY,
CONNECTION_CUSTODIAN_FREQUENCY);
}
// Sweeps through all controller nodes and attempts to open connection to
// those that presently do not have one.
private class ConnectionCustodian extends TimerTask {
@Override
public void run() {
for (DefaultControllerNode node : nodes) {
if (node != localNode && !streams.containsKey(node.id())) {
try {
initiateConnection(node, workerFinder.findWorker());
} catch (IOException e) {
log.debug("Unable to connect", e);
}
}
}
}
}
// Finds the least utilitied IO loop.
private class LeastUtilitiedWorkerFinder implements WorkerFinder {
@Override
public ClusterIOWorker findWorker() {
ClusterIOWorker leastUtilized = null;
int minCount = Integer.MAX_VALUE;
for (ClusterIOWorker worker : workers) {
int count = worker.streamCount();
if (count == 0) {
return worker;
}
if (count < minCount) {
leastUtilized = worker;
minCount = count;
}
}
return leastUtilized;
}
}
}
......@@ -4,10 +4,9 @@ import com.google.common.collect.ImmutableSet;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.nio.AcceptorLoop;
import org.onlab.nio.IOLoop;
import org.onlab.nio.MessageStream;
import org.onlab.onos.cluster.ClusterEvent;
import org.onlab.onos.cluster.ClusterStore;
import org.onlab.onos.cluster.ClusterStoreDelegate;
......@@ -15,33 +14,18 @@ import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.AbstractStore;
import org.onlab.onos.store.cluster.messaging.SerializationService;
import org.onlab.packet.IpPrefix;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.nio.channels.ByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static java.net.InetAddress.getByAddress;
import static org.onlab.onos.cluster.ControllerNode.State;
import static org.onlab.packet.IpPrefix.valueOf;
import static org.onlab.util.Tools.namedThreads;
/**
* Distributed implementation of the cluster nodes store.
......@@ -52,146 +36,69 @@ public class DistributedClusterStore
extends AbstractStore<ClusterEvent, ClusterStoreDelegate>
implements ClusterStore {
private static final int HELLO_MSG = 1;
private static final int ECHO_MSG = 2;
private final Logger log = LoggerFactory.getLogger(getClass());
private static final long CONNECTION_CUSTODIAN_DELAY = 1000L;
private static final long CONNECTION_CUSTODIAN_FREQUENCY = 5000;
private static final long START_TIMEOUT = 1000;
private static final long SELECT_TIMEOUT = 50;
private static final int WORKERS = 3;
private static final int COMM_BUFFER_SIZE = 32 * 1024;
private static final int COMM_IDLE_TIME = 500;
private static final boolean SO_NO_DELAY = false;
private static final int SO_SEND_BUFFER_SIZE = COMM_BUFFER_SIZE;
private static final int SO_RCV_BUFFER_SIZE = COMM_BUFFER_SIZE;
private DefaultControllerNode self;
private DefaultControllerNode localNode;
private final Map<NodeId, DefaultControllerNode> nodes = new ConcurrentHashMap<>();
private final Map<NodeId, State> states = new ConcurrentHashMap<>();
// Means to track message streams to other nodes.
private final Map<NodeId, TLVMessageStream> streams = new ConcurrentHashMap<>();
private final Map<SocketChannel, DefaultControllerNode> nodesByChannel = new ConcurrentHashMap<>();
// Executor pools for listening and managing connections to other nodes.
private final ExecutorService listenExecutor =
Executors.newSingleThreadExecutor(namedThreads("onos-comm-listen"));
private final ExecutorService commExecutors =
Executors.newFixedThreadPool(WORKERS, namedThreads("onos-comm-cluster"));
private final ExecutorService heartbeatExecutor =
Executors.newSingleThreadExecutor(namedThreads("onos-comm-heartbeat"));
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private CommunicationsDelegate commsDelegate;
private final Timer timer = new Timer("onos-comm-initiator");
private final TimerTask connectionCustodian = new ConnectionCustodian();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private SerializationService serializationService;
private ListenLoop listenLoop;
private List<CommLoop> commLoops = new ArrayList<>(WORKERS);
private final ClusterNodesDelegate nodesDelegate = new InnerNodesDelegate();
private ConnectionManager connectionManager;
@Activate
public void activate() {
loadClusterDefinition();
startCommunications();
startListening();
startInitiating();
establishSelfIdentity();
connectionManager = new ConnectionManager(localNode, nodesDelegate,
commsDelegate, serializationService);
log.info("Started");
}
@Deactivate
public void deactivate() {
listenLoop.shutdown();
for (CommLoop loop : commLoops) {
loop.shutdown();
}
log.info("Stopped");
}
// Loads the cluster definition file
/**
* Loads the cluster definition file.
*/
private void loadClusterDefinition() {
// ClusterDefinitionStore cds = new ClusterDefinitionStore("../config/cluster.json");
// try {
// Set<DefaultControllerNode> storedNodes = cds.read();
// for (DefaultControllerNode node : storedNodes) {
// nodes.put(node.id(), node);
// }
// } catch (IOException e) {
// log.error("Unable to read cluster definitions", e);
// }
// Establishes the controller's own identity.
IpPrefix ip = valueOf(System.getProperty("onos.ip", "127.0.1.1"));
self = nodes.get(new NodeId(ip.toString()));
// As a fall-back, let's make sure we at least know who we are.
if (self == null) {
self = new DefaultControllerNode(new NodeId(ip.toString()), ip);
nodes.put(self.id(), self);
states.put(self.id(), State.ACTIVE);
}
}
// Kicks off the IO loops.
private void startCommunications() {
for (int i = 0; i < WORKERS; i++) {
try {
CommLoop loop = new CommLoop();
commLoops.add(loop);
commExecutors.execute(loop);
} catch (IOException e) {
log.warn("Unable to start comm IO loop", e);
}
}
// Wait for the IO loops to start
for (CommLoop loop : commLoops) {
if (!loop.awaitStart(START_TIMEOUT)) {
log.warn("Comm loop did not start on-time; moving on...");
}
}
}
// Starts listening for connections from peer cluster members.
private void startListening() {
ClusterDefinitionStore cds = new ClusterDefinitionStore("../config/cluster.json");
try {
listenLoop = new ListenLoop(self.ip(), self.tcpPort());
listenExecutor.execute(listenLoop);
if (!listenLoop.awaitStart(START_TIMEOUT)) {
log.warn("Listen loop did not start on-time; moving on...");
Set<DefaultControllerNode> storedNodes = cds.read();
for (DefaultControllerNode node : storedNodes) {
nodes.put(node.id(), node);
}
} catch (IOException e) {
log.error("Unable to listen for cluster connections", e);
log.error("Unable to read cluster definitions", e);
}
}
/**
* Initiates open connection request and registers the pending socket
* channel with the given IO loop.
*
* @param loop loop with which the channel should be registered
* @throws java.io.IOException if the socket could not be open or connected
* Determines who the local controller node is.
*/
private void openConnection(DefaultControllerNode node, CommLoop loop) throws IOException {
SocketAddress sa = new InetSocketAddress(getByAddress(node.ip().toOctets()), node.tcpPort());
SocketChannel ch = SocketChannel.open();
nodesByChannel.put(ch, node);
ch.configureBlocking(false);
ch.connect(sa);
loop.connectStream(ch);
}
private void establishSelfIdentity() {
// Establishes the controller's own identity.
IpPrefix ip = valueOf(System.getProperty("onos.ip", "127.0.1.1"));
localNode = nodes.get(new NodeId(ip.toString()));
// Attempts to connect to any nodes that do not have an associated connection.
private void startInitiating() {
timer.schedule(connectionCustodian, CONNECTION_CUSTODIAN_DELAY, CONNECTION_CUSTODIAN_FREQUENCY);
// As a fall-back, let's make sure we at least know who we are.
if (localNode == null) {
localNode = new DefaultControllerNode(new NodeId(ip.toString()), ip);
nodes.put(localNode.id(), localNode);
states.put(localNode.id(), State.ACTIVE);
}
}
@Override
public ControllerNode getLocalNode() {
return self;
return localNode;
}
@Override
......@@ -215,179 +122,29 @@ public class DistributedClusterStore
public ControllerNode addNode(NodeId nodeId, IpPrefix ip, int tcpPort) {
DefaultControllerNode node = new DefaultControllerNode(nodeId, ip, tcpPort);
nodes.put(nodeId, node);
connectionManager.addNode(node);
return node;
}
@Override
public void removeNode(NodeId nodeId) {
nodes.remove(nodeId);
TLVMessageStream stream = streams.remove(nodeId);
if (stream != null) {
stream.close();
}
}
// Listens and accepts inbound connections from other cluster nodes.
private class ListenLoop extends AcceptorLoop {
ListenLoop(IpPrefix ip, int tcpPort) throws IOException {
super(SELECT_TIMEOUT, new InetSocketAddress(getByAddress(ip.toOctets()), tcpPort));
}
@Override
protected void acceptConnection(ServerSocketChannel channel) throws IOException {
SocketChannel sc = channel.accept();
sc.configureBlocking(false);
Socket so = sc.socket();
so.setTcpNoDelay(SO_NO_DELAY);
so.setReceiveBufferSize(SO_RCV_BUFFER_SIZE);
so.setSendBufferSize(SO_SEND_BUFFER_SIZE);
findLeastUtilizedLoop().acceptStream(sc);
}
}
private class CommLoop extends IOLoop<TLVMessage, TLVMessageStream> {
CommLoop() throws IOException {
super(SELECT_TIMEOUT);
}
@Override
protected TLVMessageStream createStream(ByteChannel byteChannel) {
return new TLVMessageStream(this, byteChannel, COMM_BUFFER_SIZE, COMM_IDLE_TIME);
}
@Override
protected void processMessages(List<TLVMessage> messages, MessageStream<TLVMessage> stream) {
TLVMessageStream tlvStream = (TLVMessageStream) stream;
for (TLVMessage message : messages) {
// TODO: add type-based dispatching here... this is just a hack to get going
if (message.type() == HELLO_MSG) {
processHello(message, tlvStream);
} else if (message.type() == ECHO_MSG) {
processEcho(message, tlvStream);
} else {
log.info("Deal with other messages");
}
}
}
@Override
public TLVMessageStream acceptStream(SocketChannel channel) {
TLVMessageStream stream = super.acceptStream(channel);
try {
InetSocketAddress sa = (InetSocketAddress) channel.getRemoteAddress();
log.info("Accepted connection from node {}", valueOf(sa.getAddress().getAddress()));
stream.write(createHello(self));
} catch (IOException e) {
log.warn("Unable to accept connection from an unknown end-point", e);
}
return stream;
}
@Override
public TLVMessageStream connectStream(SocketChannel channel) {
TLVMessageStream stream = super.connectStream(channel);
DefaultControllerNode node = nodesByChannel.get(channel);
DefaultControllerNode node = nodes.remove(nodeId);
if (node != null) {
log.debug("Opened connection to node {}", node.id());
nodesByChannel.remove(channel);
}
return stream;
}
@Override
protected void connect(SelectionKey key) throws IOException {
try {
super.connect(key);
TLVMessageStream stream = (TLVMessageStream) key.attachment();
send(stream, createHello(self));
} catch (IOException e) {
if (!Objects.equals(e.getMessage(), "Connection refused")) {
throw e;
}
connectionManager.removeNode(node);
}
}
// Entity to handle back calls from the connection manager.
private class InnerNodesDelegate implements ClusterNodesDelegate {
@Override
protected void removeStream(MessageStream<TLVMessage> stream) {
DefaultControllerNode node = ((TLVMessageStream) stream).node();
if (node != null) {
log.info("Closed connection to node {}", node.id());
states.put(node.id(), State.INACTIVE);
streams.remove(node.id());
}
super.removeStream(stream);
}
}
// Processes a HELLO message from a peer controller node.
private void processHello(TLVMessage message, TLVMessageStream stream) {
// FIXME: pure hack for now
String data = new String(message.data());
String[] fields = data.split(":");
DefaultControllerNode node = new DefaultControllerNode(new NodeId(fields[0]),
valueOf(fields[1]),
Integer.parseInt(fields[2]));
stream.setNode(node);
public void nodeDetected(DefaultControllerNode node) {
nodes.put(node.id(), node);
streams.put(node.id(), stream);
states.put(node.id(), State.ACTIVE);
}
// Processes an ECHO message from a peer controller node.
private void processEcho(TLVMessage message, TLVMessageStream tlvStream) {
// TODO: implement heart-beat refresh
log.info("Dealing with echoes...");
}
// Sends message to the specified stream.
private void send(TLVMessageStream stream, TLVMessage message) {
try {
stream.write(message);
} catch (IOException e) {
log.warn("Unable to send message to {}", stream.node().id());
}
}
// Creates a hello message to be sent to a peer controller node.
private TLVMessage createHello(DefaultControllerNode self) {
return new TLVMessage(HELLO_MSG, (self.id() + ":" + self.ip() + ":" + self.tcpPort()).getBytes());
}
// Sweeps through all controller nodes and attempts to open connection to
// those that presently do not have one.
private class ConnectionCustodian extends TimerTask {
@Override
public void run() {
for (DefaultControllerNode node : nodes.values()) {
if (node != self && !streams.containsKey(node.id())) {
try {
openConnection(node, findLeastUtilizedLoop());
} catch (IOException e) {
log.debug("Unable to connect", e);
}
}
}
}
}
// Finds the least utilities IO loop.
private CommLoop findLeastUtilizedLoop() {
CommLoop leastUtilized = null;
int minCount = Integer.MAX_VALUE;
for (CommLoop loop : commLoops) {
int count = loop.streamCount();
if (count == 0) {
return loop;
}
if (count < minCount) {
leastUtilized = loop;
minCount = count;
}
public void nodeVanished(DefaultControllerNode node) {
states.put(node.id(), State.INACTIVE);
}
return leastUtilized;
}
}
......
package org.onlab.onos.store.cluster.impl;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
/**
* Created by tom on 9/29/14.
*/
public interface MessageSender {
/**
* Sends the specified message to the given cluster node.
*
* @param nodeId node identifier
* @param message mesage to send
* @return true if the message was sent sucessfully; false if there is
* no stream or if there was an error
*/
boolean send(NodeId nodeId, ClusterMessage message);
}
package org.onlab.onos.store.cluster.impl;
import org.onlab.nio.AbstractMessage;
import java.util.Objects;
import static com.google.common.base.MoreObjects.toStringHelper;
/**
* Base message for cluster-wide communications using TLVs.
*/
public class TLVMessage extends AbstractMessage {
private final int type;
private final byte[] data;
/**
* Creates an immutable TLV message.
*
* @param type message type
* @param data message data bytes
*/
public TLVMessage(int type, byte[] data) {
this.length = data.length + TLVMessageStream.METADATA_LENGTH;
this.type = type;
this.data = data;
}
/**
* Returns the message type indicator.
*
* @return message type
*/
public int type() {
return type;
}
/**
* Returns the data bytes.
*
* @return message data
*/
public byte[] data() {
return data;
}
@Override
public int hashCode() {
return Objects.hash(type, data);
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
final TLVMessage other = (TLVMessage) obj;
return Objects.equals(this.type, other.type) &&
Objects.equals(this.data, other.data);
}
@Override
public String toString() {
return toStringHelper(this).add("type", type).add("length", length).toString();
}
}
package org.onlab.onos.store.cluster.impl;
/**
* Provides means to find a worker IO loop.
*/
public interface WorkerFinder {
/**
* Finds a suitable worker.
*
* @return available worker
*/
ClusterIOWorker findWorker();
}
package org.onlab.onos.store.cluster.messaging;
import org.onlab.onos.cluster.NodeId;
import java.util.Set;
/**
* Service for assisting communications between controller cluster nodes.
*/
public interface ClusterCommunicationService {
/**
* Sends a message to the specified controller node.
*
* @param message message to send
* @param toNodeId node identifier
* @return true if the message was sent sucessfully; false if there is
* no stream or if there was an error
*/
boolean send(ClusterMessage message, NodeId toNodeId);
/**
* Adds a new subscriber for the specified message subject.
*
* @param subject message subject
* @param subscriber message subscriber
*/
void addSubscriber(MessageSubject subject, MessageSubscriber subscriber);
/**
* Removes the specified subscriber from the given message subject.
*
* @param subject message subject
* @param subscriber message subscriber
*/
void removeSubscriber(MessageSubject subject, MessageSubscriber subscriber);
/**
* Returns the set of subscribers for the specified message subject.
*
* @param subject message subject
* @return set of message subscribers
*/
Set<MessageSubscriber> getSubscribers(MessageSubject subject);
}
package org.onlab.onos.store.cluster.messaging;
import org.onlab.nio.AbstractMessage;
import static com.google.common.base.MoreObjects.toStringHelper;
/**
* Base message for cluster-wide communications.
*/
public abstract class ClusterMessage extends AbstractMessage {
private final MessageSubject subject;
/**
* Creates a cluster message.
*
* @param subject message subject
*/
protected ClusterMessage(MessageSubject subject) {
this.subject = subject;
}
/**
* Returns the message subject indicator.
*
* @return message subject
*/
public MessageSubject subject() {
return subject;
}
@Override
public String toString() {
return toStringHelper(this).add("subject", subject).add("length", length).toString();
}
}
package org.onlab.onos.store.cluster.impl;
package org.onlab.onos.store.cluster.messaging;
import org.onlab.nio.IOLoop;
import org.onlab.nio.MessageStream;
......@@ -10,29 +10,29 @@ import java.nio.channels.ByteChannel;
import static com.google.common.base.Preconditions.checkState;
/**
* Stream for transferring TLV messages between cluster members.
* Stream for transferring messages between two cluster members.
*/
public class TLVMessageStream extends MessageStream<TLVMessage> {
public class ClusterMessageStream extends MessageStream<ClusterMessage> {
public static final int METADATA_LENGTH = 16; // 8 + 4 + 4
private static final int LENGTH_OFFSET = 12;
private static final long MARKER = 0xfeedcafecafefeedL;
private static final int COMM_BUFFER_SIZE = 32 * 1024;
private static final int COMM_IDLE_TIME = 500;
private DefaultControllerNode node;
private SerializationService serializationService;
/**
* Creates a message stream associated with the specified IO loop and
* backed by the given byte channel.
*
* @param serializationService service for encoding/decoding messages
* @param loop IO loop
* @param byteChannel backing byte channel
* @param bufferSize size of the backing byte buffers
* @param maxIdleMillis maximum number of millis the stream can be idle
*/
protected TLVMessageStream(IOLoop<TLVMessage, ?> loop, ByteChannel byteChannel,
int bufferSize, int maxIdleMillis) {
super(loop, byteChannel, bufferSize, maxIdleMillis);
public ClusterMessageStream(SerializationService serializationService,
IOLoop<ClusterMessage, ?> loop,
ByteChannel byteChannel) {
super(loop, byteChannel, COMM_BUFFER_SIZE, COMM_IDLE_TIME);
this.serializationService = serializationService;
}
/**
......@@ -40,7 +40,7 @@ public class TLVMessageStream extends MessageStream<TLVMessage> {
*
* @return controller node
*/
DefaultControllerNode node() {
public DefaultControllerNode node() {
return node;
}
......@@ -49,47 +49,19 @@ public class TLVMessageStream extends MessageStream<TLVMessage> {
*
* @param node controller node
*/
void setNode(DefaultControllerNode node) {
public void setNode(DefaultControllerNode node) {
checkState(this.node == null, "Stream is already bound to a node");
this.node = node;
}
@Override
protected TLVMessage read(ByteBuffer buffer) {
// Do we have enough bytes to read the header? If not, bail.
if (buffer.remaining() < METADATA_LENGTH) {
return null;
}
// Peek at the length and if we have enough to read the entire message
// go ahead, otherwise bail.
int length = buffer.getInt(buffer.position() + LENGTH_OFFSET);
if (buffer.remaining() < length) {
return null;
}
// At this point, we have enough data to read a complete message.
long marker = buffer.getLong();
checkState(marker == MARKER, "Incorrect message marker");
int type = buffer.getInt();
length = buffer.getInt();
// TODO: add deserialization hook here
byte[] data = new byte[length - METADATA_LENGTH];
buffer.get(data);
return new TLVMessage(type, data);
protected ClusterMessage read(ByteBuffer buffer) {
return serializationService.decode(buffer);
}
@Override
protected void write(TLVMessage message, ByteBuffer buffer) {
buffer.putLong(MARKER);
buffer.putInt(message.type());
buffer.putInt(message.length());
// TODO: add serialization hook here
buffer.put(message.data());
protected void write(ClusterMessage message, ByteBuffer buffer) {
serializationService.encode(message, buffer);
}
}
......
package org.onlab.onos.store.cluster.messaging;
import org.onlab.onos.cluster.NodeId;
/**l
* Echo heart-beat message that nodes send to each other.
*/
public class EchoMessage extends ClusterMessage {
private NodeId nodeId;
// For serialization
private EchoMessage() {
super(MessageSubject.HELLO);
nodeId = null;
}
/**
* Creates a new heart-beat echo message.
*
* @param nodeId sending node identification
*/
public EchoMessage(NodeId nodeId) {
super(MessageSubject.HELLO);
nodeId = nodeId;
}
/**
* Returns the sending node identifer.
*
* @return node identifier
*/
public NodeId nodeId() {
return nodeId;
}
}
package org.onlab.onos.store.cluster.messaging;
import org.onlab.onos.cluster.NodeId;
import org.onlab.packet.IpPrefix;
/**
* Hello message that nodes use to greet each other.
*/
public class HelloMessage extends ClusterMessage {
private NodeId nodeId;
private IpPrefix ipAddress;
private int tcpPort;
// For serialization
private HelloMessage() {
super(MessageSubject.HELLO);
nodeId = null;
ipAddress = null;
tcpPort = 0;
}
/**
* Creates a new hello message for the specified end-point data.
*
* @param nodeId sending node identification
* @param ipAddress sending node IP address
* @param tcpPort sending node TCP port
*/
public HelloMessage(NodeId nodeId, IpPrefix ipAddress, int tcpPort) {
super(MessageSubject.HELLO);
nodeId = nodeId;
ipAddress = ipAddress;
tcpPort = tcpPort;
}
/**
* Returns the sending node identifer.
*
* @return node identifier
*/
public NodeId nodeId() {
return nodeId;
}
/**
* Returns the sending node IP address.
*
* @return node IP address
*/
public IpPrefix ipAddress() {
return ipAddress;
}
/**
* Returns the sending node TCP listen port.
*
* @return TCP listen port
*/
public int tcpPort() {
return tcpPort;
}
}
package org.onlab.onos.store.cluster.messaging;
/**
* Representation of a message subject.
*/
public enum MessageSubject {
/** Represents a first greeting message. */
HELLO,
/** Signifies a heart-beat message. */
ECHO
}
package org.onlab.onos.store.cluster.messaging;
/**
* Represents a message consumer.
*/
public interface MessageSubscriber {
/**
* Receives the specified cluster message.
*
* @param message message to be received
*/
void receive(ClusterMessage message);
}
package org.onlab.onos.store.cluster.messaging;
import java.nio.ByteBuffer;
/**
* Service for serializing/deserializing intra-cluster messages.
*/
public interface SerializationService {
/**
* Decodes the specified byte buffer to obtain a message within.
*
* @param buffer byte buffer with message(s)
* @return parsed message
*/
ClusterMessage decode(ByteBuffer buffer);
/**
* Encodes the specified message into the given byte buffer.
*
* @param message message to be encoded
* @param buffer byte buffer to receive the message data
*/
void encode(ClusterMessage message, ByteBuffer buffer);
}
package org.onlab.onos.store.cluster.messaging.impl;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.cluster.impl.CommunicationsDelegate;
import org.onlab.onos.store.cluster.impl.MessageSender;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
import org.onlab.onos.store.cluster.messaging.MessageSubscriber;
import java.util.Set;
/**
* Implements the cluster communication services to use by other stores.
*/
@Component(immediate = true)
@Service
public class ClusterCommunicationManager
implements ClusterCommunicationService, CommunicationsDelegate {
// TODO: use something different that won't require synchronization
private Multimap<MessageSubject, MessageSubscriber> subscribers = HashMultimap.create();
private MessageSender messageSender;
@Override
public boolean send(ClusterMessage message, NodeId toNodeId) {
return messageSender.send(toNodeId, message);
}
@Override
public synchronized void addSubscriber(MessageSubject subject, MessageSubscriber subscriber) {
subscribers.put(subject, subscriber);
}
@Override
public synchronized void removeSubscriber(MessageSubject subject, MessageSubscriber subscriber) {
subscribers.remove(subject, subscriber);
}
@Override
public Set<MessageSubscriber> getSubscribers(MessageSubject subject) {
return ImmutableSet.copyOf(subscribers.get(subject));
}
@Override
public void dispatch(ClusterMessage message) {
Set<MessageSubscriber> set = getSubscribers(message.subject());
if (set != null) {
for (MessageSubscriber subscriber : set) {
subscriber.receive(message);
}
}
}
@Override
public void setSender(MessageSender messageSender) {
this.messageSender = messageSender;
}
}
package org.onlab.onos.store.cluster.messaging.impl;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
import org.onlab.onos.store.cluster.messaging.SerializationService;
import java.nio.ByteBuffer;
import static com.google.common.base.Preconditions.checkState;
/**
* Factory for parsing messages sent between cluster members.
*/
public class MessageSerializer implements SerializationService {
private static final int METADATA_LENGTH = 16; // 8 + 4 + 4
private static final int LENGTH_OFFSET = 12;
private static final long MARKER = 0xfeedcafebeaddeadL;
@Override
public ClusterMessage decode(ByteBuffer buffer) {
try {
// Do we have enough bytes to read the header? If not, bail.
if (buffer.remaining() < METADATA_LENGTH) {
return null;
}
// Peek at the length and if we have enough to read the entire message
// go ahead, otherwise bail.
int length = buffer.getInt(buffer.position() + LENGTH_OFFSET);
if (buffer.remaining() < length) {
return null;
}
// At this point, we have enough data to read a complete message.
long marker = buffer.getLong();
checkState(marker == MARKER, "Incorrect message marker");
int subjectOrdinal = buffer.getInt();
MessageSubject subject = MessageSubject.values()[subjectOrdinal];
length = buffer.getInt();
// TODO: sanity checking for length
byte[] data = new byte[length - METADATA_LENGTH];
buffer.get(data);
// TODO: add deserialization hook here; for now this hack
return null; // actually deserialize
} catch (Exception e) {
// TODO: recover from exceptions by forwarding stream to next marker
e.printStackTrace();
}
return null;
}
@Override
public void encode(ClusterMessage message, ByteBuffer buffer) {
try {
int i = 0;
// Type based lookup for proper encoder
} catch (Exception e) {
// TODO: recover from exceptions by forwarding stream to next marker
e.printStackTrace();
}
}
}
......@@ -49,6 +49,7 @@ public class DistributedClusterStore
private final MembershipListener listener = new InternalMembershipListener();
private final Map<NodeId, State> states = new ConcurrentHashMap<>();
@Override
@Activate
public void activate() {
super.activate();
......@@ -56,7 +57,7 @@ public class DistributedClusterStore
rawNodes = theInstance.getMap("nodes");
OptionalCacheLoader<NodeId, DefaultControllerNode> nodeLoader
= new OptionalCacheLoader<>(storeService, rawNodes);
= new OptionalCacheLoader<>(kryoSerializationService, rawNodes);
nodes = new AbsentInvalidatingLoadingCache<>(newBuilder().build(nodeLoader));
rawNodes.addEntryListener(new RemoteCacheEventHandler<>(nodes), true);
......
......@@ -52,7 +52,7 @@ implements MastershipStore {
rawMasters = theInstance.getMap("masters");
OptionalCacheLoader<DeviceId, NodeId> nodeLoader
= new OptionalCacheLoader<>(storeService, rawMasters);
= new OptionalCacheLoader<>(kryoSerializationService, rawMasters);
masters = new AbsentInvalidatingLoadingCache<>(newBuilder().build(nodeLoader));
rawMasters.addEntryListener(new RemoteMasterShipEventHandler(masters), true);
......
......@@ -15,6 +15,7 @@ import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onlab.onos.event.Event;
import org.onlab.onos.store.AbstractStore;
import org.onlab.onos.store.StoreDelegate;
import org.onlab.onos.store.serializers.KryoSerializationService;
import org.slf4j.Logger;
import static com.google.common.base.Preconditions.checkNotNull;
......@@ -32,6 +33,9 @@ public abstract class AbstractHazelcastStore<E extends Event, D extends StoreDel
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected StoreService storeService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected KryoSerializationService kryoSerializationService;
protected HazelcastInstance theInstance;
@Activate
......@@ -46,7 +50,7 @@ public abstract class AbstractHazelcastStore<E extends Event, D extends StoreDel
* @return serialized object
*/
protected byte[] serialize(Object obj) {
return storeService.serialize(obj);
return kryoSerializationService.serialize(obj);
}
/**
......@@ -57,7 +61,7 @@ public abstract class AbstractHazelcastStore<E extends Event, D extends StoreDel
* @return deserialized object
*/
protected <T> T deserialize(byte[] bytes) {
return storeService.deserialize(bytes);
return kryoSerializationService.deserialize(bytes);
}
......
......@@ -2,6 +2,8 @@ package org.onlab.onos.store.common;
import static com.google.common.base.Preconditions.checkNotNull;
import org.onlab.onos.store.serializers.KryoSerializationService;
import com.google.common.base.Optional;
import com.google.common.cache.CacheLoader;
import com.hazelcast.core.IMap;
......@@ -16,28 +18,28 @@ import com.hazelcast.core.IMap;
public final class OptionalCacheLoader<K, V> extends
CacheLoader<K, Optional<V>> {
private final StoreService storeService;
private final KryoSerializationService kryoSerializationService;
private IMap<byte[], byte[]> rawMap;
/**
* Constructor.
*
* @param storeService to use for serialization
* @param kryoSerializationService to use for serialization
* @param rawMap underlying IMap
*/
public OptionalCacheLoader(StoreService storeService, IMap<byte[], byte[]> rawMap) {
this.storeService = checkNotNull(storeService);
public OptionalCacheLoader(KryoSerializationService kryoSerializationService, IMap<byte[], byte[]> rawMap) {
this.kryoSerializationService = checkNotNull(kryoSerializationService);
this.rawMap = checkNotNull(rawMap);
}
@Override
public Optional<V> load(K key) throws Exception {
byte[] keyBytes = storeService.serialize(key);
byte[] keyBytes = kryoSerializationService.serialize(key);
byte[] valBytes = rawMap.get(keyBytes);
if (valBytes == null) {
return Optional.absent();
}
V dev = storeService.deserialize(valBytes);
V dev = kryoSerializationService.deserialize(valBytes);
return Optional.of(dev);
}
}
......
......@@ -5,46 +5,14 @@ import com.hazelcast.config.FileSystemXmlConfig;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import de.javakaffee.kryoserializers.URISerializer;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.net.ConnectPoint;
import org.onlab.onos.net.DefaultDevice;
import org.onlab.onos.net.DefaultLink;
import org.onlab.onos.net.DefaultPort;
import org.onlab.onos.net.Device;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.Element;
import org.onlab.onos.net.Link;
import org.onlab.onos.net.LinkKey;
import org.onlab.onos.net.MastershipRole;
import org.onlab.onos.net.Port;
import org.onlab.onos.net.PortNumber;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.serializers.ConnectPointSerializer;
import org.onlab.onos.store.serializers.DefaultLinkSerializer;
import org.onlab.onos.store.serializers.DefaultPortSerializer;
import org.onlab.onos.store.serializers.DeviceIdSerializer;
import org.onlab.onos.store.serializers.IpPrefixSerializer;
import org.onlab.onos.store.serializers.LinkKeySerializer;
import org.onlab.onos.store.serializers.NodeIdSerializer;
import org.onlab.onos.store.serializers.PortNumberSerializer;
import org.onlab.onos.store.serializers.ProviderIdSerializer;
import org.onlab.packet.IpPrefix;
import org.onlab.util.KryoPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.FileNotFoundException;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
/**
* Auxiliary bootstrap of distributed store.
......@@ -58,55 +26,18 @@ public class StoreManager implements StoreService {
private final Logger log = LoggerFactory.getLogger(getClass());
protected HazelcastInstance instance;
private KryoPool serializerPool;
@Activate
public void activate() {
try {
Config config = new FileSystemXmlConfig(HAZELCAST_XML_FILE);
instance = Hazelcast.newHazelcastInstance(config);
setupKryoPool();
log.info("Started");
} catch (FileNotFoundException e) {
log.error("Unable to configure Hazelcast", e);
}
}
/**
* Sets up the common serialzers pool.
*/
protected void setupKryoPool() {
// FIXME Slice out types used in common to separate pool/namespace.
serializerPool = KryoPool.newBuilder()
.register(ArrayList.class,
HashMap.class,
ControllerNode.State.class,
Device.Type.class,
DefaultControllerNode.class,
DefaultDevice.class,
MastershipRole.class,
Port.class,
Element.class,
Link.Type.class
)
.register(IpPrefix.class, new IpPrefixSerializer())
.register(URI.class, new URISerializer())
.register(NodeId.class, new NodeIdSerializer())
.register(ProviderId.class, new ProviderIdSerializer())
.register(DeviceId.class, new DeviceIdSerializer())
.register(PortNumber.class, new PortNumberSerializer())
.register(DefaultPort.class, new DefaultPortSerializer())
.register(LinkKey.class, new LinkKeySerializer())
.register(ConnectPoint.class, new ConnectPointSerializer())
.register(DefaultLink.class, new DefaultLinkSerializer())
.build()
.populate(10);
}
@Deactivate
public void deactivate() {
instance.shutdown();
......@@ -118,18 +49,4 @@ public class StoreManager implements StoreService {
return instance;
}
@Override
public byte[] serialize(final Object obj) {
return serializerPool.serialize(obj);
}
@Override
public <T> T deserialize(final byte[] bytes) {
if (bytes == null) {
return null;
}
return serializerPool.deserialize(bytes);
}
}
......
......@@ -15,22 +15,4 @@ public interface StoreService {
*/
HazelcastInstance getHazelcastInstance();
/**
* Serializes the specified object into bytes using one of the
* pre-registered serializers.
*
* @param obj object to be serialized
* @return serialized bytes
*/
public byte[] serialize(final Object obj);
/**
* Deserializes the specified bytes into an object using one of the
* pre-registered serializers.
*
* @param bytes bytes to be deserialized
* @return deserialized object
*/
public <T> T deserialize(final byte[] bytes);
}
......
......@@ -46,9 +46,8 @@ public class TestStoreManager extends StoreManager {
this.instance = instance;
}
// Hazelcast setup removed from original code.
@Override
public void activate() {
setupKryoPool();
// Hazelcast setup removed from original code.
}
}
......
......@@ -87,7 +87,7 @@ public class DistributedDeviceStore
// TODO decide on Map name scheme to avoid collision
rawDevices = theInstance.getMap("devices");
final OptionalCacheLoader<DeviceId, DefaultDevice> deviceLoader
= new OptionalCacheLoader<>(storeService, rawDevices);
= new OptionalCacheLoader<>(kryoSerializationService, rawDevices);
devices = new AbsentInvalidatingLoadingCache<>(newBuilder().build(deviceLoader));
// refresh/populate cache based on notification from other instance
devicesListener = rawDevices.addEntryListener(new RemoteDeviceEventHandler(devices), includeValue);
......@@ -97,7 +97,7 @@ public class DistributedDeviceStore
rawDevicePorts = theInstance.getMap("devicePorts");
final OptionalCacheLoader<DeviceId, Map<PortNumber, Port>> devicePortLoader
= new OptionalCacheLoader<>(storeService, rawDevicePorts);
= new OptionalCacheLoader<>(kryoSerializationService, rawDevicePorts);
devicePorts = new AbsentInvalidatingLoadingCache<>(newBuilder().build(devicePortLoader));
// refresh/populate cache based on notification from other instance
portsListener = rawDevicePorts.addEntryListener(new RemotePortEventHandler(devicePorts), includeValue);
......
......@@ -70,7 +70,7 @@ public class DistributedLinkStore
// TODO decide on Map name scheme to avoid collision
rawLinks = theInstance.getMap("links");
final OptionalCacheLoader<LinkKey, DefaultLink> linkLoader
= new OptionalCacheLoader<>(storeService, rawLinks);
= new OptionalCacheLoader<>(kryoSerializationService, rawLinks);
links = new AbsentInvalidatingLoadingCache<>(newBuilder().build(linkLoader));
// refresh/populate cache based on notification from other instance
linksListener = rawLinks.addEntryListener(new RemoteLinkEventHandler(links), includeValue);
......
......@@ -36,6 +36,8 @@ import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.common.StoreManager;
import org.onlab.onos.store.common.StoreService;
import org.onlab.onos.store.common.TestStoreManager;
import org.onlab.onos.store.serializers.KryoSerializationManager;
import org.onlab.onos.store.serializers.KryoSerializationService;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
......@@ -61,6 +63,7 @@ public class DistributedDeviceStoreTest {
private static final PortNumber P3 = PortNumber.portNumber(3);
private DistributedDeviceStore deviceStore;
private KryoSerializationManager serializationMgr;
private StoreManager storeManager;
......@@ -82,7 +85,10 @@ public class DistributedDeviceStoreTest {
storeManager = new TestStoreManager(Hazelcast.newHazelcastInstance(config));
storeManager.activate();
deviceStore = new TestDistributedDeviceStore(storeManager);
serializationMgr = new KryoSerializationManager();
serializationMgr.activate();
deviceStore = new TestDistributedDeviceStore(storeManager, serializationMgr);
deviceStore.activate();
}
......@@ -90,6 +96,8 @@ public class DistributedDeviceStoreTest {
public void tearDown() throws Exception {
deviceStore.deactivate();
serializationMgr.deactivate();
storeManager.deactivate();
}
......@@ -384,8 +392,10 @@ public class DistributedDeviceStoreTest {
}
private class TestDistributedDeviceStore extends DistributedDeviceStore {
public TestDistributedDeviceStore(StoreService storeService) {
public TestDistributedDeviceStore(StoreService storeService,
KryoSerializationService kryoSerializationService) {
this.storeService = storeService;
this.kryoSerializationService = kryoSerializationService;
}
}
}
......
......@@ -30,6 +30,8 @@ import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.common.StoreManager;
import org.onlab.onos.store.common.StoreService;
import org.onlab.onos.store.common.TestStoreManager;
import org.onlab.onos.store.serializers.KryoSerializationManager;
import org.onlab.onos.store.serializers.KryoSerializationService;
import com.google.common.collect.Iterables;
import com.hazelcast.config.Config;
......@@ -49,6 +51,7 @@ public class DistributedLinkStoreTest {
private static final PortNumber P3 = PortNumber.portNumber(3);
private StoreManager storeManager;
private KryoSerializationManager serializationMgr;
private DistributedLinkStore linkStore;
......@@ -68,13 +71,17 @@ public class DistributedLinkStoreTest {
storeManager = new TestStoreManager(Hazelcast.newHazelcastInstance(config));
storeManager.activate();
linkStore = new TestDistributedLinkStore(storeManager);
serializationMgr = new KryoSerializationManager();
serializationMgr.activate();
linkStore = new TestDistributedLinkStore(storeManager, serializationMgr);
linkStore.activate();
}
@After
public void tearDown() throws Exception {
linkStore.deactivate();
serializationMgr.deactivate();
storeManager.deactivate();
}
......@@ -354,8 +361,10 @@ public class DistributedLinkStoreTest {
class TestDistributedLinkStore extends DistributedLinkStore {
TestDistributedLinkStore(StoreService storeService) {
TestDistributedLinkStore(StoreService storeService,
KryoSerializationService kryoSerializationService) {
this.storeService = storeService;
this.kryoSerializationService = kryoSerializationService;
}
}
}
......
package org.onlab.onos.store.serializers;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.net.ConnectPoint;
import org.onlab.onos.net.DefaultDevice;
import org.onlab.onos.net.DefaultLink;
import org.onlab.onos.net.DefaultPort;
import org.onlab.onos.net.Device;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.Element;
import org.onlab.onos.net.Link;
import org.onlab.onos.net.LinkKey;
import org.onlab.onos.net.MastershipRole;
import org.onlab.onos.net.Port;
import org.onlab.onos.net.PortNumber;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.packet.IpPrefix;
import org.onlab.util.KryoPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import de.javakaffee.kryoserializers.URISerializer;
/**
* Serialization service using Kryo.
*/
@Component(immediate = true)
@Service
public class KryoSerializationManager implements KryoSerializationService {
private final Logger log = LoggerFactory.getLogger(getClass());
private KryoPool serializerPool;
@Activate
public void activate() {
setupKryoPool();
log.info("Started");
}
@Deactivate
public void deactivate() {
log.info("Stopped");
}
/**
* Sets up the common serialzers pool.
*/
protected void setupKryoPool() {
// FIXME Slice out types used in common to separate pool/namespace.
serializerPool = KryoPool.newBuilder()
.register(ArrayList.class,
HashMap.class,
ControllerNode.State.class,
Device.Type.class,
DefaultControllerNode.class,
DefaultDevice.class,
MastershipRole.class,
Port.class,
Element.class,
Link.Type.class
)
.register(IpPrefix.class, new IpPrefixSerializer())
.register(URI.class, new URISerializer())
.register(NodeId.class, new NodeIdSerializer())
.register(ProviderId.class, new ProviderIdSerializer())
.register(DeviceId.class, new DeviceIdSerializer())
.register(PortNumber.class, new PortNumberSerializer())
.register(DefaultPort.class, new DefaultPortSerializer())
.register(LinkKey.class, new LinkKeySerializer())
.register(ConnectPoint.class, new ConnectPointSerializer())
.register(DefaultLink.class, new DefaultLinkSerializer())
.build()
.populate(1);
}
@Override
public byte[] serialize(final Object obj) {
return serializerPool.serialize(obj);
}
@Override
public <T> T deserialize(final byte[] bytes) {
if (bytes == null) {
return null;
}
return serializerPool.deserialize(bytes);
}
}
package org.onlab.onos.store.serializers;
// TODO: To be replaced with SerializationService from IOLoop activity
/**
* Service to serialize Objects into byte array.
*/
public interface KryoSerializationService {
/**
* Serializes the specified object into bytes using one of the
* pre-registered serializers.
*
* @param obj object to be serialized
* @return serialized bytes
*/
public byte[] serialize(final Object obj);
/**
* Deserializes the specified bytes into an object using one of the
* pre-registered serializers.
*
* @param bytes bytes to be deserialized
* @return deserialized object
*/
public <T> T deserialize(final byte[] bytes);
}
......@@ -19,6 +19,9 @@
<bundle>mvn:de.javakaffee/kryo-serializers/0.27</bundle>
<bundle>mvn:org.onlab.onos/onlab-nio/1.0.0-SNAPSHOT</bundle>
<bundle>mvn:org.codehaus.jackson/jackson-core-asl/1.9.13</bundle>
<bundle>mvn:org.codehaus.jackson/jackson-mapper-asl/1.9.13</bundle>
</feature>
<feature name="onos-thirdparty-web" version="1.0.0"
......@@ -122,4 +125,10 @@
<bundle>mvn:org.onlab.onos/onos-app-foo/1.0.0-SNAPSHOT</bundle>
</feature>
<feature name="onos-app-config" version="1.0.0"
description="ONOS network config reader">
<feature>onos-api</feature>
<bundle>mvn:org.onlab.onos/onos-app-config/1.0.0-SNAPSHOT</bundle>
</feature>
</features>
......
......@@ -92,6 +92,17 @@
<version>3.3.2</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
<version>1.9.13</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
<version>1.9.13</version>
</dependency>
<!-- Web related -->
<dependency>
......
......@@ -106,10 +106,10 @@ public class OpenFlowPacketProvider extends AbstractProvider implements PacketPr
for (Instruction inst : packet.treatment().instructions()) {
if (inst.type().equals(Instruction.Type.OUTPUT)) {
p = portDesc(((OutputInstruction) inst).port());
if (!sw.getPorts().contains(p)) {
log.warn("Tried to write out non-existint port {}", p.getPortNo());
/*if (!sw.getPorts().contains(p)) {
log.warn("Tried to write out non-existent port {}", p.getPortNo());
continue;
}
}*/
OFPacketOut po = packetOut(sw, eth, p.getPortNo());
sw.sendMsg(po);
}
......
......@@ -154,9 +154,9 @@ public class OpenFlowPacketProviderTest {
assertEquals("message sent incorrectly", 0, sw.sent.size());
//to missing port
OutboundPacket portFailPkt = outPacket(DID, TR_MISSING, eth);
provider.emit(portFailPkt);
assertEquals("extra message sent", 1, sw.sent.size());
//OutboundPacket portFailPkt = outPacket(DID, TR_MISSING, eth);
//provider.emit(portFailPkt);
//assertEquals("extra message sent", 1, sw.sent.size());
}
......
......@@ -9,5 +9,5 @@
nodes=$(env | sort | egrep "OC[0-9]+" | cut -d= -f2)
onos-package
for node in $nodes; do printf "%s: " $node; onos-install -f $node; done
for node in $nodes; do (printf "%s: %s\n" "$node" "`onos-install -f $node`")& done
for node in $nodes; do onos-wait-for-start $node; done
......