tom

Merge branch 'master' of ssh://gerrit.onlab.us:29418/onos-next

Showing 21 changed files with 637 additions and 257 deletions
package org.onlab.onos.net;
public final class AnnotationsUtil {
public static boolean isEqual(Annotations lhs, Annotations rhs) {
if (lhs == rhs) {
return true;
}
if (lhs == null || rhs == null) {
return false;
}
if (!lhs.keys().equals(rhs.keys())) {
return false;
}
for (String key : lhs.keys()) {
if (!lhs.value(key).equals(rhs.value(key))) {
return false;
}
}
return true;
}
// not to be instantiated
private AnnotationsUtil() {}
}
package org.onlab.onos.net.link;
import org.onlab.onos.net.ConnectPoint;
import org.onlab.onos.net.Description;
import org.onlab.onos.net.Link;
/**
* Describes an infrastructure link.
*/
public interface LinkDescription {
public interface LinkDescription extends Description {
/**
* Returns the link source.
......
......@@ -76,7 +76,7 @@ public class HostManager
eventDispatcher.addSink(HostEvent.class, listenerRegistry);
monitor = new HostMonitor(deviceService, packetService, this);
monitor.start();
}
@Deactivate
......
......@@ -2,7 +2,7 @@ package org.onlab.onos.net.host.impl;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
......@@ -33,8 +33,6 @@ import org.onlab.packet.IpAddress;
import org.onlab.packet.IpPrefix;
import org.onlab.packet.MacAddress;
import org.onlab.util.Timer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Monitors hosts on the dataplane to detect changes in host data.
......@@ -44,15 +42,6 @@ import org.slf4j.LoggerFactory;
* probe for hosts that have not yet been detected (specified by IP address).
*/
public class HostMonitor implements TimerTask {
private static final Logger log = LoggerFactory.getLogger(HostMonitor.class);
private static final byte[] ZERO_MAC_ADDRESS =
MacAddress.valueOf("00:00:00:00:00:00").getAddress();
// TODO put on Ethernet
private static final byte[] BROADCAST_MAC =
MacAddress.valueOf("ff:ff:ff:ff:ff:ff").getAddress();
private DeviceService deviceService;
private PacketService packetService;
private HostManager hostManager;
......@@ -64,8 +53,15 @@ public class HostMonitor implements TimerTask {
private static final long DEFAULT_PROBE_RATE = 30000; // milliseconds
private long probeRate = DEFAULT_PROBE_RATE;
private final Timeout timeout;
private Timeout timeout;
/**
* Creates a new host monitor.
*
* @param deviceService device service used to find edge ports
* @param packetService packet service used to send packets on the data plane
* @param hostService host service used to look up host information
*/
public HostMonitor(DeviceService deviceService, PacketService packetService,
HostManager hostService) {
......@@ -73,24 +69,59 @@ public class HostMonitor implements TimerTask {
this.packetService = packetService;
this.hostManager = hostService;
monitoredAddresses = new HashSet<>();
monitoredAddresses = Collections.newSetFromMap(
new ConcurrentHashMap<IpAddress, Boolean>());
hostProviders = new ConcurrentHashMap<>();
timeout = Timer.getTimer().newTimeout(this, 0, TimeUnit.MILLISECONDS);
}
/**
* Adds an IP address to be monitored by the host monitor. The monitor will
* periodically probe the host to detect changes.
*
* @param ip IP address of the host to monitor
*/
void addMonitoringFor(IpAddress ip) {
monitoredAddresses.add(ip);
}
/**
* Stops monitoring the given IP address.
*
* @param ip IP address to stop monitoring on
*/
void stopMonitoring(IpAddress ip) {
monitoredAddresses.remove(ip);
}
/**
* Starts the host monitor. Does nothing if the monitor is already running.
*/
void start() {
synchronized (this) {
if (timeout == null) {
timeout = Timer.getTimer().newTimeout(this, 0, TimeUnit.MILLISECONDS);
}
}
}
/**
* Stops the host monitor.
*/
void shutdown() {
timeout.cancel();
synchronized (this) {
timeout.cancel();
timeout = null;
}
}
/**
* Registers a host provider with the host monitor. The monitor can use the
* provider to probe hosts.
*
* @param provider the host provider to register
*/
void registerHostProvider(HostProvider provider) {
hostProviders.put(provider.id(), provider);
}
......@@ -117,7 +148,7 @@ public class HostMonitor implements TimerTask {
}
}
timeout = Timer.getTimer().newTimeout(this, probeRate, TimeUnit.MILLISECONDS);
this.timeout = Timer.getTimer().newTimeout(this, probeRate, TimeUnit.MILLISECONDS);
}
/**
......@@ -173,12 +204,12 @@ public class HostMonitor implements TimerTask {
arp.setSenderHardwareAddress(sourceMac.getAddress())
.setSenderProtocolAddress(sourceIp.toOctets())
.setTargetHardwareAddress(ZERO_MAC_ADDRESS)
.setTargetHardwareAddress(MacAddress.ZERO_MAC_ADDRESS)
.setTargetProtocolAddress(targetIp.toOctets());
Ethernet ethernet = new Ethernet();
ethernet.setEtherType(Ethernet.TYPE_ARP)
.setDestinationMACAddress(BROADCAST_MAC)
.setDestinationMACAddress(MacAddress.BROADCAST_MAC)
.setSourceMACAddress(sourceMac.getAddress())
.setPayload(arp);
......
......@@ -11,7 +11,7 @@ import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.net.Annotations;
import org.onlab.onos.net.AnnotationsUtil;
import org.onlab.onos.net.DefaultAnnotations;
import org.onlab.onos.net.DefaultDevice;
import org.onlab.onos.net.DefaultPort;
......@@ -33,6 +33,7 @@ import org.onlab.onos.store.AbstractStore;
import org.onlab.onos.store.ClockService;
import org.onlab.onos.store.Timestamp;
import org.onlab.onos.store.common.impl.Timestamped;
import org.onlab.util.NewConcurrentHashMap;
import org.slf4j.Logger;
import java.util.ArrayList;
......@@ -136,8 +137,7 @@ public class GossipDeviceStore
// Collection of DeviceDescriptions for a Device
ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs
= createIfAbsentUnchecked(deviceDescs, deviceId,
new InitConcurrentHashMap<ProviderId, DeviceDescriptions>());
= getDeviceDescriptions(deviceId);
DeviceDescriptions descs
......@@ -196,7 +196,7 @@ public class GossipDeviceStore
// We allow only certain attributes to trigger update
if (!Objects.equals(oldDevice.hwVersion(), newDevice.hwVersion()) ||
!Objects.equals(oldDevice.swVersion(), newDevice.swVersion()) ||
!isAnnotationsEqual(oldDevice.annotations(), newDevice.annotations())) {
!AnnotationsUtil.isEqual(oldDevice.annotations(), newDevice.annotations())) {
boolean replaced = devices.replace(newDevice.id(), oldDevice, newDevice);
if (!replaced) {
......@@ -223,8 +223,7 @@ public class GossipDeviceStore
@Override
public DeviceEvent markOffline(DeviceId deviceId) {
ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs
= createIfAbsentUnchecked(deviceDescs, deviceId,
new InitConcurrentHashMap<ProviderId, DeviceDescriptions>());
= getDeviceDescriptions(deviceId);
// locking device
synchronized (providerDescs) {
......@@ -327,7 +326,7 @@ public class GossipDeviceStore
Port newPort,
Map<PortNumber, Port> ports) {
if (oldPort.isEnabled() != newPort.isEnabled() ||
!isAnnotationsEqual(oldPort.annotations(), newPort.annotations())) {
!AnnotationsUtil.isEqual(oldPort.annotations(), newPort.annotations())) {
ports.put(oldPort.number(), newPort);
return new DeviceEvent(PORT_UPDATED, device, newPort);
......@@ -358,7 +357,13 @@ public class GossipDeviceStore
// exist, it creates and registers a new one.
private ConcurrentMap<PortNumber, Port> getPortMap(DeviceId deviceId) {
return createIfAbsentUnchecked(devicePorts, deviceId,
new InitConcurrentHashMap<PortNumber, Port>());
NewConcurrentHashMap.<PortNumber, Port>ifNeeded());
}
private ConcurrentMap<ProviderId, DeviceDescriptions> getDeviceDescriptions(
DeviceId deviceId) {
return createIfAbsentUnchecked(deviceDescs, deviceId,
NewConcurrentHashMap.<ProviderId, DeviceDescriptions>ifNeeded());
}
@Override
......@@ -438,33 +443,18 @@ public class GossipDeviceStore
@Override
public DeviceEvent removeDevice(DeviceId deviceId) {
synchronized (this) {
ConcurrentMap<ProviderId, DeviceDescriptions> descs = getDeviceDescriptions(deviceId);
synchronized (descs) {
Device device = devices.remove(deviceId);
// should DEVICE_REMOVED carry removed ports?
devicePorts.get(deviceId).clear();
availableDevices.remove(deviceId);
descs.clear();
return device == null ? null :
new DeviceEvent(DEVICE_REMOVED, device, null);
new DeviceEvent(DEVICE_REMOVED, device, null);
}
}
private static boolean isAnnotationsEqual(Annotations lhs, Annotations rhs) {
if (lhs == rhs) {
return true;
}
if (lhs == null || rhs == null) {
return false;
}
if (!lhs.keys().equals(rhs.keys())) {
return false;
}
for (String key : lhs.keys()) {
if (!lhs.value(key).equals(rhs.value(key))) {
return false;
}
}
return true;
}
/**
* Returns a Device, merging description given from multiple Providers.
*
......@@ -567,14 +557,6 @@ public class GossipDeviceStore
return fallBackPrimary;
}
private static final class InitConcurrentHashMap<K, V> implements
ConcurrentInitializer<ConcurrentMap<K, V>> {
@Override
public ConcurrentMap<K, V> get() throws ConcurrentException {
return new ConcurrentHashMap<>();
}
}
public static final class InitDeviceDescs
implements ConcurrentInitializer<DeviceDescriptions> {
......
......@@ -116,17 +116,19 @@ public class GossipDeviceStoreTest {
deviceClockManager.deactivate();
}
private void putDevice(DeviceId deviceId, String swVersion) {
private void putDevice(DeviceId deviceId, String swVersion,
SparseAnnotations... annotations) {
DeviceDescription description =
new DefaultDeviceDescription(deviceId.uri(), SWITCH, MFR,
HW, swVersion, SN);
HW, swVersion, SN, annotations);
deviceStore.createOrUpdateDevice(PID, deviceId, description);
}
private void putDeviceAncillary(DeviceId deviceId, String swVersion) {
private void putDeviceAncillary(DeviceId deviceId, String swVersion,
SparseAnnotations... annotations) {
DeviceDescription description =
new DefaultDeviceDescription(deviceId.uri(), SWITCH, MFR,
HW, swVersion, SN);
HW, swVersion, SN, annotations);
deviceStore.createOrUpdateDevice(PIDA, deviceId, description);
}
......@@ -448,16 +450,37 @@ public class GossipDeviceStoreTest {
@Test
public final void testRemoveDevice() {
putDevice(DID1, SW1);
putDevice(DID1, SW1, A1);
List<PortDescription> pds = Arrays.<PortDescription>asList(
new DefaultPortDescription(P1, true, A2)
);
deviceStore.updatePorts(PID, DID1, pds);
putDevice(DID2, SW1);
assertEquals(2, deviceStore.getDeviceCount());
assertEquals(1, deviceStore.getPorts(DID1).size());
assertAnnotationsEquals(deviceStore.getDevice(DID1).annotations(), A1);
assertAnnotationsEquals(deviceStore.getPort(DID1, P1).annotations(), A2);
DeviceEvent event = deviceStore.removeDevice(DID1);
assertEquals(DEVICE_REMOVED, event.type());
assertDevice(DID1, SW1, event.subject());
assertEquals(1, deviceStore.getDeviceCount());
assertEquals(0, deviceStore.getPorts(DID1).size());
// putBack Device, Port w/o annotation
putDevice(DID1, SW1);
List<PortDescription> pds2 = Arrays.<PortDescription>asList(
new DefaultPortDescription(P1, true)
);
deviceStore.updatePorts(PID, DID1, pds2);
// annotations should not survive
assertEquals(2, deviceStore.getDeviceCount());
assertEquals(1, deviceStore.getPorts(DID1).size());
assertAnnotationsEquals(deviceStore.getDevice(DID1).annotations());
assertAnnotationsEquals(deviceStore.getPort(DID1, P1).annotations());
}
// If Delegates should be called only on remote events,
......
......@@ -9,7 +9,7 @@ import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.net.Annotations;
import org.onlab.onos.net.AnnotationsUtil;
import org.onlab.onos.net.DefaultAnnotations;
import org.onlab.onos.net.DefaultDevice;
import org.onlab.onos.net.DefaultPort;
......@@ -28,6 +28,7 @@ import org.onlab.onos.net.device.DeviceStoreDelegate;
import org.onlab.onos.net.device.PortDescription;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.AbstractStore;
import org.onlab.util.NewConcurrentHashMap;
import org.slf4j.Logger;
import java.util.ArrayList;
......@@ -52,7 +53,6 @@ import static org.slf4j.LoggerFactory.getLogger;
import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
import static org.onlab.onos.net.DefaultAnnotations.merge;
// TODO: synchronization should be done in more fine-grained manner.
/**
* Manages inventory of infrastructure devices using trivial in-memory
* structures implementation.
......@@ -109,8 +109,7 @@ public class SimpleDeviceStore
public synchronized DeviceEvent createOrUpdateDevice(ProviderId providerId, DeviceId deviceId,
DeviceDescription deviceDescription) {
ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs
= createIfAbsentUnchecked(deviceDescs, deviceId,
new InitConcurrentHashMap<ProviderId, DeviceDescriptions>());
= getDeviceDescriptions(deviceId);
Device oldDevice = devices.get(deviceId);
......@@ -151,7 +150,7 @@ public class SimpleDeviceStore
// We allow only certain attributes to trigger update
if (!Objects.equals(oldDevice.hwVersion(), newDevice.hwVersion()) ||
!Objects.equals(oldDevice.swVersion(), newDevice.swVersion()) ||
!isAnnotationsEqual(oldDevice.annotations(), newDevice.annotations())) {
!AnnotationsUtil.isEqual(oldDevice.annotations(), newDevice.annotations())) {
synchronized (this) {
devices.replace(newDevice.id(), oldDevice, newDevice);
......@@ -238,7 +237,7 @@ public class SimpleDeviceStore
Port newPort,
ConcurrentMap<PortNumber, Port> ports) {
if (oldPort.isEnabled() != newPort.isEnabled() ||
!isAnnotationsEqual(oldPort.annotations(), newPort.annotations())) {
!AnnotationsUtil.isEqual(oldPort.annotations(), newPort.annotations())) {
ports.put(oldPort.number(), newPort);
return new DeviceEvent(PORT_UPDATED, device, newPort);
......@@ -264,11 +263,17 @@ public class SimpleDeviceStore
return events;
}
private ConcurrentMap<ProviderId, DeviceDescriptions> getDeviceDescriptions(
DeviceId deviceId) {
return createIfAbsentUnchecked(deviceDescs, deviceId,
NewConcurrentHashMap.<ProviderId, DeviceDescriptions>ifNeeded());
}
// Gets the map of ports for the specified device; if one does not already
// exist, it creates and registers a new one.
private ConcurrentMap<PortNumber, Port> getPortMap(DeviceId deviceId) {
return createIfAbsentUnchecked(devicePorts, deviceId,
new InitConcurrentHashMap<PortNumber, Port>());
NewConcurrentHashMap.<PortNumber, Port>ifNeeded());
}
@Override
......@@ -323,31 +328,19 @@ public class SimpleDeviceStore
@Override
public DeviceEvent removeDevice(DeviceId deviceId) {
synchronized (this) {
ConcurrentMap<ProviderId, DeviceDescriptions> descs = getDeviceDescriptions(deviceId);
synchronized (descs) {
Device device = devices.remove(deviceId);
return device == null ? null :
new DeviceEvent(DEVICE_REMOVED, device, null);
}
}
private static boolean isAnnotationsEqual(Annotations lhs, Annotations rhs) {
if (lhs == rhs) {
return true;
}
if (lhs == null || rhs == null) {
return false;
}
if (!lhs.keys().equals(rhs.keys())) {
return false;
}
for (String key : lhs.keys()) {
if (!lhs.value(key).equals(rhs.value(key))) {
return false;
// should DEVICE_REMOVED carry removed ports?
ConcurrentMap<PortNumber, Port> ports = devicePorts.get(deviceId);
if (ports != null) {
ports.clear();
}
availableDevices.remove(deviceId);
descs.clear();
return device == null ? null :
new DeviceEvent(DEVICE_REMOVED, device, null);
}
return true;
}
/**
......@@ -445,15 +438,6 @@ public class SimpleDeviceStore
return fallBackPrimary;
}
// TODO: can be made generic
private static final class InitConcurrentHashMap<K, V> implements
ConcurrentInitializer<ConcurrentMap<K, V>> {
@Override
public ConcurrentMap<K, V> get() throws ConcurrentException {
return new ConcurrentHashMap<>();
}
}
public static final class InitDeviceDescs
implements ConcurrentInitializer<DeviceDescriptions> {
private final DeviceDescription deviceDesc;
......
package org.onlab.onos.store.trivial.impl;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
import com.google.common.collect.SetMultimap;
import org.apache.commons.lang3.concurrent.ConcurrentUtils;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.net.Annotations;
import org.onlab.onos.net.AnnotationsUtil;
import org.onlab.onos.net.ConnectPoint;
import org.onlab.onos.net.DefaultAnnotations;
import org.onlab.onos.net.DefaultLink;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.Link;
import org.onlab.onos.net.SparseAnnotations;
import org.onlab.onos.net.Link.Type;
import org.onlab.onos.net.LinkKey;
import org.onlab.onos.net.Provided;
import org.onlab.onos.net.link.DefaultLinkDescription;
import org.onlab.onos.net.link.LinkDescription;
import org.onlab.onos.net.link.LinkEvent;
import org.onlab.onos.net.link.LinkStore;
import org.onlab.onos.net.link.LinkStoreDelegate;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.AbstractStore;
import org.onlab.util.NewConcurrentHashMap;
import org.slf4j.Logger;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import static org.onlab.onos.net.DefaultAnnotations.merge;
import static org.onlab.onos.net.Link.Type.DIRECT;
import static org.onlab.onos.net.Link.Type.INDIRECT;
import static org.onlab.onos.net.link.LinkEvent.Type.*;
import static org.slf4j.LoggerFactory.getLogger;
import static com.google.common.collect.Multimaps.synchronizedSetMultimap;
import static com.google.common.base.Predicates.notNull;
// TODO: Add support for multiple provider and annotations
/**
* Manages inventory of infrastructure links using trivial in-memory structures
* implementation.
......@@ -46,11 +60,17 @@ public class SimpleLinkStore
private final Logger log = getLogger(getClass());
// Link inventory
private final Map<LinkKey, DefaultLink> links = new ConcurrentHashMap<>();
private final ConcurrentMap<LinkKey,
ConcurrentMap<ProviderId, LinkDescription>>
linkDescs = new ConcurrentHashMap<>();
// Link instance cache
private final ConcurrentMap<LinkKey, Link> links = new ConcurrentHashMap<>();
// Egress and ingress link sets
private final Multimap<DeviceId, Link> srcLinks = HashMultimap.create();
private final Multimap<DeviceId, Link> dstLinks = HashMultimap.create();
private final SetMultimap<DeviceId, LinkKey> srcLinks = createSynchronizedHashMultiMap();
private final SetMultimap<DeviceId, LinkKey> dstLinks = createSynchronizedHashMultiMap();
@Activate
public void activate() {
......@@ -59,6 +79,10 @@ public class SimpleLinkStore
@Deactivate
public void deactivate() {
linkDescs.clear();
links.clear();
srcLinks.clear();
dstLinks.clear();
log.info("Stopped");
}
......@@ -69,17 +93,29 @@ public class SimpleLinkStore
@Override
public Iterable<Link> getLinks() {
return Collections.unmodifiableSet(new HashSet<Link>(links.values()));
return Collections.unmodifiableCollection(links.values());
}
@Override
public Set<Link> getDeviceEgressLinks(DeviceId deviceId) {
return ImmutableSet.copyOf(srcLinks.get(deviceId));
// lock for iteration
synchronized (srcLinks) {
return FluentIterable.from(srcLinks.get(deviceId))
.transform(lookupLink())
.filter(notNull())
.toSet();
}
}
@Override
public Set<Link> getDeviceIngressLinks(DeviceId deviceId) {
return ImmutableSet.copyOf(dstLinks.get(deviceId));
// lock for iteration
synchronized (dstLinks) {
return FluentIterable.from(dstLinks.get(deviceId))
.transform(lookupLink())
.filter(notNull())
.toSet();
}
}
@Override
......@@ -90,9 +126,9 @@ public class SimpleLinkStore
@Override
public Set<Link> getEgressLinks(ConnectPoint src) {
Set<Link> egress = new HashSet<>();
for (Link link : srcLinks.get(src.deviceId())) {
if (link.src().equals(src)) {
egress.add(link);
for (LinkKey linkKey : srcLinks.get(src.deviceId())) {
if (linkKey.src().equals(src)) {
egress.add(links.get(linkKey));
}
}
return egress;
......@@ -101,9 +137,9 @@ public class SimpleLinkStore
@Override
public Set<Link> getIngressLinks(ConnectPoint dst) {
Set<Link> ingress = new HashSet<>();
for (Link link : dstLinks.get(dst.deviceId())) {
if (link.dst().equals(dst)) {
ingress.add(link);
for (LinkKey linkKey : dstLinks.get(dst.deviceId())) {
if (linkKey.dst().equals(dst)) {
ingress.add(links.get(linkKey));
}
}
return ingress;
......@@ -113,56 +149,172 @@ public class SimpleLinkStore
public LinkEvent createOrUpdateLink(ProviderId providerId,
LinkDescription linkDescription) {
LinkKey key = new LinkKey(linkDescription.src(), linkDescription.dst());
DefaultLink link = links.get(key);
if (link == null) {
return createLink(providerId, key, linkDescription);
ConcurrentMap<ProviderId, LinkDescription> descs = getLinkDescriptions(key);
synchronized (descs) {
final Link oldLink = links.get(key);
// update description
createOrUpdateLinkDescription(descs, providerId, linkDescription);
final Link newLink = composeLink(descs);
if (oldLink == null) {
return createLink(key, newLink);
}
return updateLink(key, oldLink, newLink);
}
return updateLink(providerId, link, key, linkDescription);
}
// Guarded by linkDescs value (=locking each Link)
private LinkDescription createOrUpdateLinkDescription(
ConcurrentMap<ProviderId, LinkDescription> descs,
ProviderId providerId,
LinkDescription linkDescription) {
// merge existing attributes and merge
LinkDescription oldDesc = descs.get(providerId);
LinkDescription newDesc = linkDescription;
if (oldDesc != null) {
SparseAnnotations merged = merge(oldDesc.annotations(),
linkDescription.annotations());
newDesc = new DefaultLinkDescription(
linkDescription.src(),
linkDescription.dst(),
linkDescription.type(), merged);
}
return descs.put(providerId, newDesc);
}
// Creates and stores the link and returns the appropriate event.
private LinkEvent createLink(ProviderId providerId, LinkKey key,
LinkDescription linkDescription) {
DefaultLink link = new DefaultLink(providerId, key.src(), key.dst(),
linkDescription.type());
synchronized (this) {
links.put(key, link);
srcLinks.put(link.src().deviceId(), link);
dstLinks.put(link.dst().deviceId(), link);
// Guarded by linkDescs value (=locking each Link)
private LinkEvent createLink(LinkKey key, Link newLink) {
if (newLink.providerId().isAncillary()) {
// TODO: revisit ancillary only Link handling
// currently treating ancillary only as down (not visible outside)
return null;
}
return new LinkEvent(LINK_ADDED, link);
links.put(key, newLink);
srcLinks.put(newLink.src().deviceId(), key);
dstLinks.put(newLink.dst().deviceId(), key);
return new LinkEvent(LINK_ADDED, newLink);
}
// Updates, if necessary the specified link and returns the appropriate event.
private LinkEvent updateLink(ProviderId providerId, DefaultLink link,
LinkKey key, LinkDescription linkDescription) {
if (link.type() == INDIRECT && linkDescription.type() == DIRECT) {
synchronized (this) {
srcLinks.remove(link.src().deviceId(), link);
dstLinks.remove(link.dst().deviceId(), link);
DefaultLink updated =
new DefaultLink(providerId, link.src(), link.dst(),
linkDescription.type());
links.put(key, updated);
srcLinks.put(link.src().deviceId(), updated);
dstLinks.put(link.dst().deviceId(), updated);
return new LinkEvent(LINK_UPDATED, updated);
}
// Guarded by linkDescs value (=locking each Link)
private LinkEvent updateLink(LinkKey key, Link oldLink, Link newLink) {
if (newLink.providerId().isAncillary()) {
// TODO: revisit ancillary only Link handling
// currently treating ancillary only as down (not visible outside)
return null;
}
if ((oldLink.type() == INDIRECT && newLink.type() == DIRECT) ||
!AnnotationsUtil.isEqual(oldLink.annotations(), newLink.annotations())) {
links.put(key, newLink);
// strictly speaking following can be ommitted
srcLinks.put(oldLink.src().deviceId(), key);
dstLinks.put(oldLink.dst().deviceId(), key);
return new LinkEvent(LINK_UPDATED, newLink);
}
return null;
}
@Override
public LinkEvent removeLink(ConnectPoint src, ConnectPoint dst) {
synchronized (this) {
Link link = links.remove(new LinkKey(src, dst));
final LinkKey key = new LinkKey(src, dst);
ConcurrentMap<ProviderId, LinkDescription> descs = getLinkDescriptions(key);
synchronized (descs) {
Link link = links.remove(key);
descs.clear();
if (link != null) {
srcLinks.remove(link.src().deviceId(), link);
dstLinks.remove(link.dst().deviceId(), link);
srcLinks.remove(link.src().deviceId(), key);
dstLinks.remove(link.dst().deviceId(), key);
return new LinkEvent(LINK_REMOVED, link);
}
return null;
}
}
private static <K, V> SetMultimap<K, V> createSynchronizedHashMultiMap() {
return synchronizedSetMultimap(HashMultimap.<K, V>create());
}
/**
* @return primary ProviderID, or randomly chosen one if none exists
*/
private ProviderId pickPrimaryPID(
ConcurrentMap<ProviderId, LinkDescription> providerDescs) {
ProviderId fallBackPrimary = null;
for (Entry<ProviderId, LinkDescription> e : providerDescs.entrySet()) {
if (!e.getKey().isAncillary()) {
return e.getKey();
} else if (fallBackPrimary == null) {
// pick randomly as a fallback in case there is no primary
fallBackPrimary = e.getKey();
}
}
return fallBackPrimary;
}
private Link composeLink(ConcurrentMap<ProviderId, LinkDescription> descs) {
ProviderId primary = pickPrimaryPID(descs);
LinkDescription base = descs.get(primary);
ConnectPoint src = base.src();
ConnectPoint dst = base.dst();
Type type = base.type();
Annotations annotations = DefaultAnnotations.builder().build();
annotations = merge(annotations, base.annotations());
for (Entry<ProviderId, LinkDescription> e : descs.entrySet()) {
if (primary.equals(e.getKey())) {
continue;
}
// TODO: should keep track of Description timestamp
// and only merge conflicting keys when timestamp is newer
// Currently assuming there will never be a key conflict between
// providers
// annotation merging. not so efficient, should revisit later
annotations = merge(annotations, e.getValue().annotations());
}
return new DefaultLink(primary , src, dst, type, annotations);
}
private ConcurrentMap<ProviderId, LinkDescription> getLinkDescriptions(LinkKey key) {
return ConcurrentUtils.createIfAbsentUnchecked(linkDescs, key,
NewConcurrentHashMap.<ProviderId, LinkDescription>ifNeeded());
}
private final Function<LinkKey, Link> lookupLink = new LookupLink();
private Function<LinkKey, Link> lookupLink() {
return lookupLink;
}
private final class LookupLink implements Function<LinkKey, Link> {
@Override
public Link apply(LinkKey input) {
return links.get(input);
}
}
private static final Predicate<Provided> IS_PRIMARY = new IsPrimary();
private static final Predicate<Provided> isPrimary() {
return IS_PRIMARY;
}
private static final class IsPrimary implements Predicate<Provided> {
@Override
public boolean apply(Provided input) {
return !input.providerId().isAncillary();
}
}
}
......
......@@ -103,17 +103,19 @@ public class SimpleDeviceStoreTest {
simpleDeviceStore.deactivate();
}
private void putDevice(DeviceId deviceId, String swVersion) {
private void putDevice(DeviceId deviceId, String swVersion,
SparseAnnotations... annotations) {
DeviceDescription description =
new DefaultDeviceDescription(deviceId.uri(), SWITCH, MFR,
HW, swVersion, SN);
HW, swVersion, SN, annotations);
deviceStore.createOrUpdateDevice(PID, deviceId, description);
}
private void putDeviceAncillary(DeviceId deviceId, String swVersion) {
private void putDeviceAncillary(DeviceId deviceId, String swVersion,
SparseAnnotations... annotations) {
DeviceDescription description =
new DefaultDeviceDescription(deviceId.uri(), SWITCH, MFR,
HW, swVersion, SN);
HW, swVersion, SN, annotations);
deviceStore.createOrUpdateDevice(PIDA, deviceId, description);
}
......@@ -126,6 +128,7 @@ public class SimpleDeviceStoreTest {
assertEquals(SN, device.serialNumber());
}
// TODO slice this out somewhere
/**
* Verifies that Annotations created by merging {@code annotations} is
* equal to actual Annotations.
......@@ -133,7 +136,7 @@ public class SimpleDeviceStoreTest {
* @param actual Annotations to check
* @param annotations
*/
private static void assertAnnotationsEquals(Annotations actual, SparseAnnotations... annotations) {
public static void assertAnnotationsEquals(Annotations actual, SparseAnnotations... annotations) {
DefaultAnnotations expected = DefaultAnnotations.builder().build();
for (SparseAnnotations a : annotations) {
expected = DefaultAnnotations.merge(expected, a);
......@@ -347,6 +350,7 @@ public class SimpleDeviceStoreTest {
assertFalse("Port is disabled", event.port().isEnabled());
}
@Test
public final void testUpdatePortStatusAncillary() {
putDeviceAncillary(DID1, SW1);
......@@ -435,16 +439,37 @@ public class SimpleDeviceStoreTest {
@Test
public final void testRemoveDevice() {
putDevice(DID1, SW1);
putDevice(DID1, SW1, A1);
List<PortDescription> pds = Arrays.<PortDescription>asList(
new DefaultPortDescription(P1, true, A2)
);
deviceStore.updatePorts(PID, DID1, pds);
putDevice(DID2, SW1);
assertEquals(2, deviceStore.getDeviceCount());
assertEquals(1, deviceStore.getPorts(DID1).size());
assertAnnotationsEquals(deviceStore.getDevice(DID1).annotations(), A1);
assertAnnotationsEquals(deviceStore.getPort(DID1, P1).annotations(), A2);
DeviceEvent event = deviceStore.removeDevice(DID1);
assertEquals(DEVICE_REMOVED, event.type());
assertDevice(DID1, SW1, event.subject());
assertEquals(1, deviceStore.getDeviceCount());
assertEquals(0, deviceStore.getPorts(DID1).size());
// putBack Device, Port w/o annotation
putDevice(DID1, SW1);
List<PortDescription> pds2 = Arrays.<PortDescription>asList(
new DefaultPortDescription(P1, true)
);
deviceStore.updatePorts(PID, DID1, pds2);
// annotations should not survive
assertEquals(2, deviceStore.getDeviceCount());
assertEquals(1, deviceStore.getPorts(DID1).size());
assertAnnotationsEquals(deviceStore.getDevice(DID1).annotations());
assertAnnotationsEquals(deviceStore.getPort(DID1, P1).annotations());
}
// If Delegates should be called only on remote events,
......
......@@ -4,7 +4,9 @@ import static org.junit.Assert.*;
import static org.onlab.onos.net.DeviceId.deviceId;
import static org.onlab.onos.net.Link.Type.*;
import static org.onlab.onos.net.link.LinkEvent.Type.*;
import static org.onlab.onos.store.trivial.impl.SimpleDeviceStoreTest.assertAnnotationsEquals;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
......@@ -18,10 +20,12 @@ import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.onlab.onos.net.ConnectPoint;
import org.onlab.onos.net.DefaultAnnotations;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.Link;
import org.onlab.onos.net.LinkKey;
import org.onlab.onos.net.PortNumber;
import org.onlab.onos.net.SparseAnnotations;
import org.onlab.onos.net.Link.Type;
import org.onlab.onos.net.link.DefaultLinkDescription;
import org.onlab.onos.net.link.LinkEvent;
......@@ -37,6 +41,7 @@ import com.google.common.collect.Iterables;
public class SimpleLinkStoreTest {
private static final ProviderId PID = new ProviderId("of", "foo");
private static final ProviderId PIDA = new ProviderId("of", "bar", true);
private static final DeviceId DID1 = deviceId("of:foo");
private static final DeviceId DID2 = deviceId("of:bar");
......@@ -44,6 +49,23 @@ public class SimpleLinkStoreTest {
private static final PortNumber P2 = PortNumber.portNumber(2);
private static final PortNumber P3 = PortNumber.portNumber(3);
private static final SparseAnnotations A1 = DefaultAnnotations.builder()
.set("A1", "a1")
.set("B1", "b1")
.build();
private static final SparseAnnotations A1_2 = DefaultAnnotations.builder()
.remove("A1")
.set("B3", "b3")
.build();
private static final SparseAnnotations A2 = DefaultAnnotations.builder()
.set("A2", "a2")
.set("B2", "b2")
.build();
private static final SparseAnnotations A2_2 = DefaultAnnotations.builder()
.remove("A2")
.set("B4", "b4")
.build();
private SimpleLinkStore simpleLinkStore;
private LinkStore linkStore;
......@@ -69,16 +91,17 @@ public class SimpleLinkStoreTest {
}
private void putLink(DeviceId srcId, PortNumber srcNum,
DeviceId dstId, PortNumber dstNum, Type type) {
DeviceId dstId, PortNumber dstNum, Type type,
SparseAnnotations... annotations) {
ConnectPoint src = new ConnectPoint(srcId, srcNum);
ConnectPoint dst = new ConnectPoint(dstId, dstNum);
linkStore.createOrUpdateLink(PID, new DefaultLinkDescription(src, dst, type));
linkStore.createOrUpdateLink(PID, new DefaultLinkDescription(src, dst, type, annotations));
}
private void putLink(LinkKey key, Type type) {
private void putLink(LinkKey key, Type type, SparseAnnotations... annotations) {
putLink(key.src().deviceId(), key.src().port(),
key.dst().deviceId(), key.dst().port(),
type);
type, annotations);
}
private static void assertLink(DeviceId srcId, PortNumber srcNum,
......@@ -270,14 +293,67 @@ public class SimpleLinkStoreTest {
}
@Test
public final void testCreateOrUpdateLinkAncillary() {
ConnectPoint src = new ConnectPoint(DID1, P1);
ConnectPoint dst = new ConnectPoint(DID2, P2);
// add Ancillary link
LinkEvent event = linkStore.createOrUpdateLink(PIDA,
new DefaultLinkDescription(src, dst, INDIRECT, A1));
assertNull("Ancillary only link is ignored", event);
// add Primary link
LinkEvent event2 = linkStore.createOrUpdateLink(PID,
new DefaultLinkDescription(src, dst, INDIRECT, A2));
assertLink(DID1, P1, DID2, P2, INDIRECT, event2.subject());
assertAnnotationsEquals(event2.subject().annotations(), A2, A1);
assertEquals(LINK_ADDED, event2.type());
// update link type
LinkEvent event3 = linkStore.createOrUpdateLink(PID,
new DefaultLinkDescription(src, dst, DIRECT, A2));
assertLink(DID1, P1, DID2, P2, DIRECT, event3.subject());
assertAnnotationsEquals(event3.subject().annotations(), A2, A1);
assertEquals(LINK_UPDATED, event3.type());
// no change
LinkEvent event4 = linkStore.createOrUpdateLink(PID,
new DefaultLinkDescription(src, dst, DIRECT));
assertNull("No change event expected", event4);
// update link annotation (Primary)
LinkEvent event5 = linkStore.createOrUpdateLink(PID,
new DefaultLinkDescription(src, dst, DIRECT, A2_2));
assertLink(DID1, P1, DID2, P2, DIRECT, event5.subject());
assertAnnotationsEquals(event5.subject().annotations(), A2, A2_2, A1);
assertEquals(LINK_UPDATED, event5.type());
// update link annotation (Ancillary)
LinkEvent event6 = linkStore.createOrUpdateLink(PIDA,
new DefaultLinkDescription(src, dst, DIRECT, A1_2));
assertLink(DID1, P1, DID2, P2, DIRECT, event6.subject());
assertAnnotationsEquals(event6.subject().annotations(), A2, A2_2, A1, A1_2);
assertEquals(LINK_UPDATED, event6.type());
// update link type (Ancillary) : ignored
LinkEvent event7 = linkStore.createOrUpdateLink(PIDA,
new DefaultLinkDescription(src, dst, EDGE));
assertNull("Ancillary change other than annotation is ignored", event7);
}
@Test
public final void testRemoveLink() {
final ConnectPoint d1P1 = new ConnectPoint(DID1, P1);
final ConnectPoint d2P2 = new ConnectPoint(DID2, P2);
LinkKey linkId1 = new LinkKey(d1P1, d2P2);
LinkKey linkId2 = new LinkKey(d2P2, d1P1);
putLink(linkId1, DIRECT);
putLink(linkId2, DIRECT);
putLink(linkId1, DIRECT, A1);
putLink(linkId2, DIRECT, A2);
// DID1,P1 => DID2,P2
// DID2,P2 => DID1,P1
......@@ -285,10 +361,41 @@ public class SimpleLinkStoreTest {
LinkEvent event = linkStore.removeLink(d1P1, d2P2);
assertEquals(LINK_REMOVED, event.type());
assertAnnotationsEquals(event.subject().annotations(), A1);
LinkEvent event2 = linkStore.removeLink(d1P1, d2P2);
assertNull(event2);
assertLink(linkId2, DIRECT, linkStore.getLink(d2P2, d1P1));
assertAnnotationsEquals(linkStore.getLink(d2P2, d1P1).annotations(), A2);
// annotations, etc. should not survive remove
putLink(linkId1, DIRECT);
assertLink(linkId1, DIRECT, linkStore.getLink(d1P1, d2P2));
assertAnnotationsEquals(linkStore.getLink(d1P1, d2P2).annotations());
}
@Test
public final void testAncillaryOnlyNotVisible() {
ConnectPoint src = new ConnectPoint(DID1, P1);
ConnectPoint dst = new ConnectPoint(DID2, P2);
// add Ancillary link
linkStore.createOrUpdateLink(PIDA,
new DefaultLinkDescription(src, dst, INDIRECT, A1));
// Ancillary only link should not be visible
assertEquals(0, linkStore.getLinkCount());
assertTrue(Iterables.isEmpty(linkStore.getLinks()));
assertNull(linkStore.getLink(src, dst));
assertEquals(Collections.emptySet(), linkStore.getIngressLinks(dst));
assertEquals(Collections.emptySet(), linkStore.getEgressLinks(src));
assertEquals(Collections.emptySet(), linkStore.getDeviceEgressLinks(DID1));
assertEquals(Collections.emptySet(), linkStore.getDeviceIngressLinks(DID2));
}
// If Delegates should be called only on remote events,
......
......@@ -167,11 +167,11 @@ class OFChannelHandler extends IdleStateAwareChannelHandler {
// TODO We could check for the optional bitmap, but for now
// we are just checking the version number.
if (m.getVersion() == OFVersion.OF_13) {
log.info("Received {} Hello from {}", m.getVersion(),
log.debug("Received {} Hello from {}", m.getVersion(),
h.channel.getRemoteAddress());
h.ofVersion = OFVersion.OF_13;
} else if (m.getVersion() == OFVersion.OF_10) {
log.info("Received {} Hello from {} - switching to OF "
log.debug("Received {} Hello from {} - switching to OF "
+ "version 1.0", m.getVersion(),
h.channel.getRemoteAddress());
h.ofVersion = OFVersion.OF_10;
......@@ -222,7 +222,7 @@ class OFChannelHandler extends IdleStateAwareChannelHandler {
void processOFFeaturesReply(OFChannelHandler h, OFFeaturesReply m)
throws IOException {
h.thisdpid = m.getDatapathId().getLong();
log.info("Received features reply for switch at {} with dpid {}",
log.debug("Received features reply for switch at {} with dpid {}",
h.getSwitchInfoString(), h.thisdpid);
h.featuresReply = m; //temp store
......@@ -409,7 +409,7 @@ class OFChannelHandler extends IdleStateAwareChannelHandler {
log.info("Switch {} bound to class {}, description {}",
log.debug("Switch {} bound to class {}, description {}",
new Object[] {h.sw, h.sw.getClass(), drep });
//Put switch in EQUAL mode until we hear back from the global registry
//log.debug("Setting new switch {} to EQUAL and sending Role request",
......@@ -651,7 +651,7 @@ class OFChannelHandler extends IdleStateAwareChannelHandler {
* @param error The error message
*/
protected void logError(OFChannelHandler h, OFErrorMsg error) {
log.info("{} from switch {} in state {}",
log.error("{} from switch {} in state {}",
new Object[] {
error,
h.getSwitchInfoString(),
......@@ -1050,7 +1050,7 @@ class OFChannelHandler extends IdleStateAwareChannelHandler {
throws Exception {
OFFactory factory = (ofVersion == OFVersion.OF_13) ? factory13 : factory10;
OFMessage m = factory.buildEchoRequest().build();
log.info("Sending Echo Request on idle channel: {}",
log.debug("Sending Echo Request on idle channel: {}",
e.getChannel().getPipeline().getLast().toString());
e.getChannel().write(Collections.singletonList(m));
// XXX S some problems here -- echo request has no transaction id, and
......
......@@ -22,6 +22,12 @@ import java.util.Arrays;
*
*/
public class MacAddress {
public static final byte[] ZERO_MAC_ADDRESS =
MacAddress.valueOf("00:00:00:00:00:00").getAddress();
public static final byte[] BROADCAST_MAC =
MacAddress.valueOf("ff:ff:ff:ff:ff:ff").getAddress();
public static final int MAC_ADDRESS_LENGTH = 6;
private byte[] address = new byte[MacAddress.MAC_ADDRESS_LENGTH];
......
package org.onlab.util;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.lang3.concurrent.ConcurrentException;
import org.apache.commons.lang3.concurrent.ConcurrentInitializer;
/**
* Creates an instance of new ConcurrentHashMap on each {@link #get()} call.
* <p>
* To be used with
* {@link org.apache.commons.lang3.concurrent.ConcurrentUtils#createIfAbsent()
* ConcurrentUtils#createIfAbsent}
*
* @param <K> ConcurrentHashMap key type
* @param <V> ConcurrentHashMap value type
*/
public final class NewConcurrentHashMap<K, V>
implements ConcurrentInitializer<ConcurrentMap<K, V>> {
public static final NewConcurrentHashMap<?, ?> INSTANCE = new NewConcurrentHashMap<>();
@SuppressWarnings("unchecked")
public static <K, V> NewConcurrentHashMap<K, V> ifNeeded() {
return (NewConcurrentHashMap<K, V>) INSTANCE;
}
@Override
public ConcurrentMap<K, V> get() throws ConcurrentException {
return new ConcurrentHashMap<>();
}
}
......@@ -9,7 +9,7 @@ public class EchoHandler implements MessageHandler {
@Override
public void handle(Message message) throws IOException {
System.out.println("Received: " + message.payload() + ". Echoing it back to the sender.");
System.out.println("Received message. Echoing it back to the sender.");
message.respond(message.payload());
}
}
......
......@@ -8,6 +8,14 @@ public class Endpoint {
private final int port;
private final String host;
/**
* Used for serialization.
*/
private Endpoint() {
port = 0;
host = null;
}
public Endpoint(String host, int port) {
this.host = host;
this.port = port;
......
......@@ -35,6 +35,10 @@ public final class InternalMessage implements Message {
return payload;
}
protected void setMessagingService(NettyMessagingService messagingService) {
this.messagingService = messagingService;
}
@Override
public void respond(Object data) throws IOException {
Builder builder = new Builder(messagingService);
......
package org.onlab.netty;
import org.onlab.util.KryoPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
......@@ -12,8 +11,6 @@ import java.util.HashMap;
*/
public class KryoSerializer implements Serializer {
private final Logger log = LoggerFactory.getLogger(getClass());
private KryoPool serializerPool;
public KryoSerializer() {
......@@ -28,7 +25,9 @@ public class KryoSerializer implements Serializer {
serializerPool = KryoPool.newBuilder()
.register(ArrayList.class,
HashMap.class,
ArrayList.class
ArrayList.class,
InternalMessage.class,
Endpoint.class
)
.build()
.populate(1);
......@@ -36,7 +35,7 @@ public class KryoSerializer implements Serializer {
@Override
public Object decode(byte[] data) {
public <T> T decode(byte[] data) {
return serializerPool.deserialize(data);
}
......@@ -44,4 +43,14 @@ public class KryoSerializer implements Serializer {
public byte[] encode(Object payload) {
return serializerPool.serialize(payload);
}
@Override
public <T> T deserialize(ByteBuffer buffer) {
return serializerPool.deserialize(buffer);
}
@Override
public void serialize(Object obj, ByteBuffer buffer) {
serializerPool.serialize(obj, buffer);
}
}
......
package org.onlab.netty;
import java.util.Arrays;
import java.util.List;
import static com.google.common.base.Preconditions.checkState;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.ReplayingDecoder;
import java.util.Arrays;
import java.util.List;
/**
* Decode bytes into a InternalMessage.
*/
public class MessageDecoder extends ByteToMessageDecoder {
// TODO: Implement performance enchancements such as those described in the javadoc for ReplayingDecoder.
public class MessageDecoder extends ReplayingDecoder<InternalMessage> {
private final NettyMessagingService messagingService;
private final Serializer serializer;
......@@ -23,36 +20,21 @@ public class MessageDecoder extends ByteToMessageDecoder {
}
@Override
protected void decode(ChannelHandlerContext context, ByteBuf in,
List<Object> messages) throws Exception {
protected void decode(
ChannelHandlerContext context,
ByteBuf buffer,
List<Object> out) throws Exception {
byte[] preamble = in.readBytes(MessageEncoder.PREAMBLE.length).array();
byte[] preamble = new byte[MessageEncoder.PREAMBLE.length];
buffer.readBytes(preamble);
checkState(Arrays.equals(MessageEncoder.PREAMBLE, preamble), "Message has wrong preamble");
// read message Id.
long id = in.readLong();
// read message type; first read size and then bytes.
String type = new String(in.readBytes(in.readInt()).array());
// read sender host name; first read size and then bytes.
String host = new String(in.readBytes(in.readInt()).array());
// read sender port.
int port = in.readInt();
Endpoint sender = new Endpoint(host, port);
// read message payload; first read size and then bytes.
Object payload = serializer.decode(in.readBytes(in.readInt()).array());
InternalMessage message = new InternalMessage.Builder(messagingService)
.withId(id)
.withSender(sender)
.withType(type)
.withPayload(payload)
.build();
int bodySize = buffer.readInt();
byte[] body = new byte[bodySize];
buffer.readBytes(body);
messages.add(message);
InternalMessage message = serializer.decode(body);
message.setMessagingService(messagingService);
out.add(message);
}
}
......
......@@ -19,42 +19,20 @@ public class MessageEncoder extends MessageToByteEncoder<InternalMessage> {
}
@Override
protected void encode(ChannelHandlerContext context, InternalMessage message,
protected void encode(
ChannelHandlerContext context,
InternalMessage message,
ByteBuf out) throws Exception {
// write preamble
out.writeBytes(PREAMBLE);
// write id
out.writeLong(message.id());
byte[] payload = serializer.encode(message);
// write type length
out.writeInt(message.type().length());
// write type
out.writeBytes(message.type().getBytes());
// write sender host name size
out.writeInt(message.sender().host().length());
// write sender host name.
out.writeBytes(message.sender().host().getBytes());
// write port
out.writeInt(message.sender().port());
try {
serializer.encode(message.payload());
} catch (Exception e) {
e.printStackTrace();
}
byte[] payload = serializer.encode(message.payload());
// write payload length.
// write payload length
out.writeInt(payload.length);
// write payload bytes
// write payload.
out.writeBytes(payload);
}
}
......
......@@ -22,7 +22,6 @@ import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.commons.lang.math.RandomUtils;
import org.apache.commons.pool.KeyedObjectPool;
import org.apache.commons.pool.KeyedPoolableObjectFactory;
import org.apache.commons.pool.impl.GenericKeyedObjectPool;
import org.slf4j.Logger;
......@@ -38,8 +37,8 @@ public class NettyMessagingService implements MessagingService {
private final Logger log = LoggerFactory.getLogger(getClass());
private KeyedObjectPool<Endpoint, Channel> channels =
new GenericKeyedObjectPool<Endpoint, Channel>(new OnosCommunicationChannelFactory());
private GenericKeyedObjectPool<Endpoint, Channel> channels;
private final int port;
private final EventLoopGroup bossGroup = new NioEventLoopGroup();
private final EventLoopGroup workerGroup = new NioEventLoopGroup();
......@@ -66,6 +65,9 @@ public class NettyMessagingService implements MessagingService {
}
public void activate() throws Exception {
channels = new GenericKeyedObjectPool<Endpoint, Channel>(new OnosCommunicationChannelFactory());
channels.setTestOnBorrow(true);
channels.setTestOnReturn(true);
responseFutures = CacheBuilder.newBuilder()
.maximumSize(100000)
.weakValues()
......@@ -95,17 +97,14 @@ public class NettyMessagingService implements MessagingService {
protected void sendAsync(Endpoint ep, InternalMessage message) throws IOException {
Channel channel = null;
try {
channel = channels.borrowObject(ep);
channel.eventLoop().execute(new WriteTask(channel, message));
} catch (Exception e) {
throw new IOException(e);
} finally {
try {
channel = channels.borrowObject(ep);
channel.eventLoop().execute(new WriteTask(channel, message));
} finally {
channels.returnObject(ep, channel);
} catch (Exception e) {
log.warn("Error returning object back to the pool", e);
// ignored.
}
} catch (Exception e) {
throw new IOException(e);
}
}
......@@ -141,6 +140,8 @@ public class NettyMessagingService implements MessagingService {
private void startAcceptingConnections() throws InterruptedException {
ServerBootstrap b = new ServerBootstrap();
b.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
b.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 32 * 1024);
b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
......@@ -169,6 +170,8 @@ public class NettyMessagingService implements MessagingService {
public Channel makeObject(Endpoint ep) throws Exception {
Bootstrap b = new Bootstrap();
b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
b.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
b.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 32 * 1024);
b.group(workerGroup);
// TODO: Make this faster:
// http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#37.0
......@@ -197,20 +200,20 @@ public class NettyMessagingService implements MessagingService {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline()
.addLast(new MessageEncoder(serializer))
.addLast(new MessageDecoder(NettyMessagingService.this, serializer))
.addLast(new NettyMessagingService.InboundMessageDispatcher());
.addLast("encoder", new MessageEncoder(serializer))
.addLast("decoder", new MessageDecoder(NettyMessagingService.this, serializer))
.addLast("handler", new InboundMessageDispatcher());
}
}
private class WriteTask implements Runnable {
private final Object message;
private final InternalMessage message;
private final Channel channel;
public WriteTask(Channel channel, Object message) {
this.message = message;
public WriteTask(Channel channel, InternalMessage message) {
this.channel = channel;
this.message = message;
}
@Override
......@@ -240,5 +243,11 @@ public class NettyMessagingService implements MessagingService {
MessageHandler handler = NettyMessagingService.this.getMessageHandler(type);
handler.handle(message);
}
@Override
public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
context.close();
}
}
}
......
package org.onlab.netty;
import java.nio.ByteBuffer;
/**
* Interface for encoding/decoding message payloads.
*/
......@@ -11,7 +13,7 @@ public interface Serializer {
* @param data byte array.
* @return POJO
*/
Object decode(byte[] data);
public <T> T decode(byte[] data);
/**
* Encodes the specified POJO into a byte array.
......@@ -19,6 +21,23 @@ public interface Serializer {
* @param data POJO to be encoded
* @return byte array.
*/
byte[] encode(Object message);
public byte[] encode(Object data);
/**
* Serializes the specified object into bytes using one of the
* pre-registered serializers.
*
* @param obj object to be serialized
* @param buffer to write serialized bytes
*/
public void serialize(final Object obj, ByteBuffer buffer);
/**
* Deserializes the specified bytes into an object using one of the
* pre-registered serializers.
*
* @param buffer bytes to be deserialized
* @return deserialized object
*/
public <T> T deserialize(final ByteBuffer buffer);
}
......