Thomas Vachuska
Committed by Gerrit Code Review

Adding ability to synchronize topology clusters' broadcast trees.

Proxy ARP now supports deferred ARP replies until instance learns of the subject host location.

Change-Id: Ib3ee97c0812858b5b4972d945e9e6d2bd397d4c5
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.net.proxyarp;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.Host;
import java.nio.ByteBuffer;
/**
* State distribution mechanism for the proxy ARP service.
*/
public interface ProxyArpStore {
/**
* Forwards an ARP or neighbor solicitation request to its destination.
* Floods at the edg the request if the destination is not known.
*
* @param outPort the port the request was received on
* @param subject subject host
* @param packet an ethernet frame containing an ARP or neighbor
* solicitation request
*/
void forward(ConnectPoint outPort, Host subject, ByteBuffer packet);
/**
* Associates the specified delegate with the store.
*
* @param delegate store delegate
*/
void setDelegate(ProxyArpStoreDelegate delegate);
}
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.net.proxyarp;
import org.onosproject.net.ConnectPoint;
import java.nio.ByteBuffer;
/**
* Proxy ARP store delegate.
*/
public interface ProxyArpStoreDelegate {
/**
* Emits ARP or neighbour discovery response packet.
*
* @param outPort output connection point
* @param packet packet to emit
*/
void emitResponse(ConnectPoint outPort, ByteBuffer packet);
}
......@@ -15,6 +15,7 @@
*/
package org.onosproject.common;
import com.google.common.base.Function;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableMap;
......@@ -77,17 +78,20 @@ public class DefaultTopology extends AbstractModel implements Topology {
private final Supplier<ImmutableMap<ClusterId, TopologyCluster>> clusters;
private final Supplier<ImmutableSet<ConnectPoint>> infrastructurePoints;
private final Supplier<ImmutableSetMultimap<ClusterId, ConnectPoint>> broadcastSets;
private final Function<ConnectPoint, Boolean> broadcastFunction;
private final Supplier<ClusterIndexes> clusterIndexes;
/**
* Creates a topology descriptor attributed to the specified provider.
*
* @param providerId identity of the provider
* @param description data describing the new topology
* @param providerId identity of the provider
* @param description data describing the new topology
* @param broadcastFunction broadcast point function
*/
public DefaultTopology(ProviderId providerId, GraphDescription description) {
public DefaultTopology(ProviderId providerId, GraphDescription description,
Function<ConnectPoint, Boolean> broadcastFunction) {
super(providerId);
this.broadcastFunction = broadcastFunction;
this.time = description.timestamp();
this.creationTime = description.creationTime();
......@@ -106,6 +110,16 @@ public class DefaultTopology extends AbstractModel implements Topology {
this.computeCost = Math.max(0, System.nanoTime() - time);
}
/**
* Creates a topology descriptor attributed to the specified provider.
*
* @param providerId identity of the provider
* @param description data describing the new topology
*/
public DefaultTopology(ProviderId providerId, GraphDescription description) {
this(providerId, description, null);
}
@Override
public long time() {
return time;
......@@ -223,6 +237,10 @@ public class DefaultTopology extends AbstractModel implements Topology {
* @return true if in broadcast set
*/
public boolean isBroadcastPoint(ConnectPoint connectPoint) {
if (broadcastFunction != null) {
return broadcastFunction.apply(connectPoint);
}
// Any non-infrastructure, i.e. edge points are assumed to be OK.
if (!isInfrastructure(connectPoint)) {
return true;
......
......@@ -36,7 +36,6 @@ import org.onlab.packet.ndp.NeighborSolicitation;
import org.onosproject.core.Permission;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.Host;
import org.onosproject.net.HostId;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.edge.EdgePortService;
import org.onosproject.net.flow.DefaultTrafficTreatment;
......@@ -50,6 +49,7 @@ import org.onosproject.net.packet.InboundPacket;
import org.onosproject.net.packet.PacketContext;
import org.onosproject.net.packet.PacketService;
import org.onosproject.net.proxyarp.ProxyArpService;
import org.onosproject.net.proxyarp.ProxyArpStore;
import org.slf4j.Logger;
import java.nio.ByteBuffer;
......@@ -59,6 +59,8 @@ import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.onlab.packet.VlanId.vlanId;
import static org.onosproject.net.HostId.hostId;
import static org.onosproject.security.AppGuard.checkPermission;
import static org.slf4j.LoggerFactory.getLogger;
......@@ -90,25 +92,29 @@ public class ProxyArpManager implements ProxyArpService {
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DeviceService deviceService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ProxyArpStore store;
/**
* Listens to both device service and link service to determine
* whether a port is internal or external.
*/
@Activate
public void activate() {
store.setDelegate(this::sendTo);
log.info("Started");
}
@Deactivate
public void deactivate() {
store.setDelegate(null);
log.info("Stopped");
}
@Override
public boolean isKnown(IpAddress addr) {
checkPermission(Permission.PACKET_READ);
checkNotNull(addr, MAC_ADDR_NULL);
Set<Host> hosts = hostService.getHostsByIp(addr);
return !hosts.isEmpty();
......@@ -117,7 +123,6 @@ public class ProxyArpManager implements ProxyArpService {
@Override
public void reply(Ethernet eth, ConnectPoint inPort) {
checkPermission(Permission.PACKET_WRITE);
checkNotNull(eth, REQUEST_NULL);
if (eth.getEtherType() == Ethernet.TYPE_ARP) {
......@@ -133,7 +138,7 @@ public class ProxyArpManager implements ProxyArpService {
checkNotNull(inPort);
Ip4Address targetAddress = Ip4Address.valueOf(arp.getTargetProtocolAddress());
VlanId vlan = VlanId.vlanId(eth.getVlanID());
VlanId vlan = vlanId(eth.getVlanID());
if (isOutsidePort(inPort)) {
// If the request came from outside the network, only reply if it was
......@@ -158,8 +163,8 @@ public class ProxyArpManager implements ProxyArpService {
Set<Host> hosts = hostService.getHostsByIp(targetAddress);
Host dst = null;
Host src = hostService.getHost(HostId.hostId(eth.getSourceMAC(),
VlanId.vlanId(eth.getVlanID())));
Host src = hostService.getHost(hostId(eth.getSourceMAC(),
vlanId(eth.getVlanID())));
for (Host host : hosts) {
if (host.vlan().equals(vlan)) {
......@@ -202,17 +207,15 @@ public class ProxyArpManager implements ProxyArpService {
// Flood the request on all ports except the incoming port.
//
flood(eth, inPort);
return;
}
private void replyNdp(Ethernet eth, ConnectPoint inPort) {
IPv6 ipv6 = (IPv6) eth.getPayload();
ICMP6 icmpv6 = (ICMP6) ipv6.getPayload();
NeighborSolicitation nsol = (NeighborSolicitation) icmpv6.getPayload();
Ip6Address targetAddress = Ip6Address.valueOf(nsol.getTargetAddress());
VlanId vlan = VlanId.vlanId(eth.getVlanID());
VlanId vlan = vlanId(eth.getVlanID());
// If the request came from outside the network, only reply if it was
// for one of our external addresses.
......@@ -259,8 +262,8 @@ public class ProxyArpManager implements ProxyArpService {
Set<Host> hosts = hostService.getHostsByIp(targetAddress);
Host dst = null;
Host src = hostService.getHost(HostId.hostId(eth.getSourceMAC(),
VlanId.vlanId(eth.getVlanID())));
Host src = hostService.getHost(hostId(eth.getSourceMAC(),
vlanId(eth.getVlanID())));
for (Host host : hosts) {
if (host.vlan().equals(vlan)) {
......@@ -293,6 +296,10 @@ public class ProxyArpManager implements ProxyArpService {
* @param outPort the port to send it out
*/
private void sendTo(Ethernet packet, ConnectPoint outPort) {
sendTo(outPort, ByteBuffer.wrap(packet.serialize()));
}
private void sendTo(ConnectPoint outPort, ByteBuffer packet) {
if (!edgeService.isEdgePoint(outPort)) {
// Sanity check to make sure we don't send the packet out an
// internal port and create a loop (could happen due to
......@@ -303,7 +310,7 @@ public class ProxyArpManager implements ProxyArpService {
TrafficTreatment.Builder builder = DefaultTrafficTreatment.builder();
builder.setOutput(outPort.port());
packetService.emit(new DefaultOutboundPacket(outPort.deviceId(),
builder.build(), ByteBuffer.wrap(packet.serialize())));
builder.build(), packet));
}
/**
......@@ -329,31 +336,25 @@ public class ProxyArpManager implements ProxyArpService {
* @return true if the port is an outside-facing port, otherwise false
*/
private boolean isOutsidePort(ConnectPoint port) {
//
// TODO: Is this sufficient to identify outside-facing ports: just
// having IP addresses on a port?
//
// TODO: Is this sufficient to identify outside-facing ports: just having IP addresses on a port?
return !hostService.getAddressBindingsForPort(port).isEmpty();
}
@Override
public void forward(Ethernet eth, ConnectPoint inPort) {
checkPermission(Permission.PACKET_WRITE);
checkNotNull(eth, REQUEST_NULL);
Host h = hostService.getHost(HostId.hostId(eth.getDestinationMAC(),
VlanId.vlanId(eth.getVlanID())));
Host h = hostService.getHost(hostId(eth.getDestinationMAC(),
vlanId(eth.getVlanID())));
if (h == null) {
flood(eth, inPort);
} else {
TrafficTreatment.Builder builder = DefaultTrafficTreatment.builder();
builder.setOutput(h.location().port());
packetService.emit(new DefaultOutboundPacket(h.location().deviceId(),
builder.build(), ByteBuffer.wrap(eth.serialize())));
Host subject = hostService.getHost(hostId(eth.getSourceMAC(),
vlanId(eth.getVlanID())));
store.forward(h.location(), subject, ByteBuffer.wrap(eth.serialize()));
}
}
@Override
......@@ -424,7 +425,7 @@ public class ProxyArpManager implements ProxyArpService {
builder = DefaultTrafficTreatment.builder();
builder.setOutput(connectPoint.port());
packetService.emit(new DefaultOutboundPacket(connectPoint.deviceId(),
builder.build(), buf));
builder.build(), buf));
}
}
......@@ -439,7 +440,6 @@ public class ProxyArpManager implements ProxyArpService {
*/
private Ethernet buildNdpReply(Ip6Address srcIp, MacAddress srcMac,
Ethernet request) {
Ethernet eth = new Ethernet();
eth.setDestinationMACAddress(request.getSourceMAC());
eth.setSourceMACAddress(srcMac);
......@@ -461,7 +461,7 @@ public class ProxyArpManager implements ProxyArpService {
nadv.setSolicitedFlag((byte) 1);
nadv.setOverrideFlag((byte) 1);
nadv.addOption(NeighborDiscoveryOptions.TYPE_TARGET_LL_ADDRESS,
srcMac.toBytes());
srcMac.toBytes());
icmp6.setPayload(nadv);
ipv6.setPayload(icmp6);
......
......@@ -38,6 +38,8 @@ import org.onosproject.net.PortNumber;
import org.onosproject.net.device.DeviceListener;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.edgeservice.impl.EdgeManager;
import org.onosproject.net.flow.DefaultTrafficTreatment;
import org.onosproject.net.flow.TrafficTreatment;
import org.onosproject.net.flow.instructions.Instruction;
import org.onosproject.net.flow.instructions.Instructions.OutputInstruction;
import org.onosproject.net.host.HostService;
......@@ -45,24 +47,22 @@ import org.onosproject.net.host.InterfaceIpAddress;
import org.onosproject.net.host.PortAddresses;
import org.onosproject.net.link.LinkListener;
import org.onosproject.net.link.LinkService;
import org.onosproject.net.packet.DefaultOutboundPacket;
import org.onosproject.net.packet.OutboundPacket;
import org.onosproject.net.packet.PacketServiceAdapter;
import org.onosproject.net.provider.ProviderId;
import org.onosproject.net.proxyarp.ProxyArpStore;
import org.onosproject.net.proxyarp.ProxyArpStoreDelegate;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.createMock;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.replay;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.easymock.EasyMock.*;
import static org.junit.Assert.*;
/**
* Tests for the {@link ProxyArpManager} class.
......@@ -110,6 +110,7 @@ public class ProxyArpManagerTest {
proxyArp = new ProxyArpManager();
packetService = new TestPacketService();
proxyArp.packetService = packetService;
proxyArp.store = new TestProxyArpStoreAdapter();
proxyArp.edgeService = new TestEdgePortService();
......@@ -455,8 +456,11 @@ public class ProxyArpManagerTest {
public void testForwardToHost() {
Host host1 = new DefaultHost(PID, HID1, MAC1, VLAN1, LOC1,
Collections.singleton(IP1));
Host host2 = new DefaultHost(PID, HID2, MAC2, VLAN1, LOC2,
Collections.singleton(IP2));
expect(hostService.getHost(HID1)).andReturn(host1);
expect(hostService.getHost(HID2)).andReturn(host2);
replay(hostService);
Ethernet arpRequest = buildArp(ARP.OP_REPLY, MAC2, MAC1, IP2, IP1);
......@@ -625,4 +629,16 @@ public class ProxyArpManagerTest {
return getEdgePointsNoArg;
}
}
private class TestProxyArpStoreAdapter implements ProxyArpStore {
@Override
public void forward(ConnectPoint outPort, Host subject, ByteBuffer packet) {
TrafficTreatment tt = DefaultTrafficTreatment.builder().setOutput(outPort.port()).build();
packetService.emit(new DefaultOutboundPacket(outPort.deviceId(), tt, packet));
}
@Override
public void setDelegate(ProxyArpStoreDelegate delegate) {
}
}
}
......
......@@ -4,6 +4,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
import static org.onosproject.net.DefaultAnnotations.merge;
import static org.onosproject.net.host.HostEvent.Type.HOST_ADDED;
import static org.onosproject.net.host.HostEvent.Type.HOST_REMOVED;
import static org.onosproject.net.host.HostEvent.Type.HOST_UPDATED;
import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;
import static org.slf4j.LoggerFactory.getLogger;
......@@ -246,14 +247,17 @@ public class ECHostStore
}
private class HostLocationTracker implements EventuallyConsistentMapListener<HostId, DefaultHost> {
@Override
public void event(EventuallyConsistentMapEvent<HostId, DefaultHost> event) {
DefaultHost host = checkNotNull(event.value());
if (event.type() == PUT) {
locations.put(host.location(), host);
boolean isNew = locations.put(host.location(), host);
notifyDelegate(new HostEvent(isNew ? HOST_ADDED : HOST_UPDATED, host));
} else if (event.type() == REMOVE) {
locations.remove(host.location(), host);
if (locations.remove(host.location(), host)) {
notifyDelegate(new HostEvent(HOST_REMOVED, host));
}
}
}
}
......
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.proxyarp.impl;
import com.google.common.collect.Maps;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.KryoNamespace;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
import org.onosproject.mastership.MastershipService;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.Host;
import org.onosproject.net.HostId;
import org.onosproject.net.host.HostEvent;
import org.onosproject.net.host.HostListener;
import org.onosproject.net.host.HostService;
import org.onosproject.net.proxyarp.ProxyArpStore;
import org.onosproject.net.proxyarp.ProxyArpStoreDelegate;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import static org.onlab.util.BoundedThreadPool.newFixedThreadPool;
import static org.onlab.util.Tools.groupedThreads;
/**
* Implementation of proxy ARP distribution mechanism.
*/
@Component(immediate = true)
@Service
public class DistributedProxyArpStore implements ProxyArpStore {
private Logger log = LoggerFactory.getLogger(getClass());
private static final MessageSubject ARP_RESPONSE_MESSAGE =
new MessageSubject("onos-arp-response");
protected final KryoSerializer serializer = new KryoSerializer() {
@Override
protected void setupKryoPool() {
serializerPool = KryoNamespace.newBuilder()
.register(KryoNamespaces.API)
.register(ArpResponseMessage.class)
.register(ByteBuffer.class)
.build();
}
};
private ProxyArpStoreDelegate delegate;
private Map<HostId, ArpResponseMessage> pendingMessages = Maps.newConcurrentMap();
private ExecutorService executor =
newFixedThreadPool(4, groupedThreads("onos/arp", "sender-%d"));
private NodeId localNodeId;
private HostListener hostListener = new InternalHostListener();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected MastershipService mastershipService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterCommunicationService commService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected HostService hostService;
@Activate
protected void activate() {
localNodeId = clusterService.getLocalNode().id();
hostService.addListener(hostListener);
commService.addSubscriber(ARP_RESPONSE_MESSAGE, serializer::decode,
this::processArpResponse, executor);
log.info("Started");
}
@Deactivate
protected void deactivate() {
commService.removeSubscriber(ARP_RESPONSE_MESSAGE);
hostService.removeListener(hostListener);
log.info("Stopped");
}
@Override
public void forward(ConnectPoint outPort, Host subject, ByteBuffer packet) {
NodeId nodeId = mastershipService.getMasterFor(outPort.deviceId());
if (nodeId.equals(localNodeId)) {
if (delegate != null) {
delegate.emitResponse(outPort, packet);
}
} else {
log.info("Forwarding ARP response from {} to {}", subject.id(), outPort);
commService.unicast(new ArpResponseMessage(outPort, subject, packet.array()),
ARP_RESPONSE_MESSAGE, serializer::encode, nodeId);
}
}
@Override
public void setDelegate(ProxyArpStoreDelegate delegate) {
this.delegate = delegate;
}
// Processes the incoming ARP response message.
private void processArpResponse(ArpResponseMessage msg) {
pendingMessages.put(msg.subject.id(), msg);
if (hostService.getHost(msg.subject.id()) != null) {
checkPendingArps(msg.subject.id());
}
// FIXME: figure out pruning so stuff does not build up
}
// Checks for pending ARP response message for the specified host.
// If one exists, emit response via delegate.
private void checkPendingArps(HostId id) {
ArpResponseMessage msg = pendingMessages.remove(id);
if (msg != null && delegate != null) {
log.info("Emitting ARP response from {} to {}", id, msg.outPort);
delegate.emitResponse(msg.outPort, ByteBuffer.wrap(msg.packet));
}
}
// Message carrying an ARP response.
private static class ArpResponseMessage {
private ConnectPoint outPort;
private Host subject;
private byte[] packet;
public ArpResponseMessage(ConnectPoint outPort, Host subject, byte[] packet) {
this.outPort = outPort;
this.subject = subject;
this.packet = packet;
}
private ArpResponseMessage() {
}
}
private class InternalHostListener implements HostListener {
@Override
public void event(HostEvent event) {
checkPendingArps(event.subject().id());
}
}
}
......@@ -16,19 +16,25 @@
package org.onosproject.store.topology.impl;
import static com.google.common.base.Preconditions.checkArgument;
import static org.onlab.util.Tools.isNullOrEmpty;
import static org.onosproject.net.topology.TopologyEvent.Type.TOPOLOGY_CHANGED;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.KryoNamespace;
import org.onosproject.common.DefaultTopology;
import org.onosproject.event.Event;
import org.onosproject.mastership.MastershipService;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.Device;
import org.onosproject.net.DeviceId;
......@@ -46,6 +52,12 @@ import org.onosproject.net.topology.TopologyGraph;
import org.onosproject.net.topology.TopologyStore;
import org.onosproject.net.topology.TopologyStoreDelegate;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.EventuallyConsistentMap;
import org.onosproject.store.service.EventuallyConsistentMapEvent;
import org.onosproject.store.service.EventuallyConsistentMapListener;
import org.onosproject.store.service.LogicalClockService;
import org.onosproject.store.service.StorageService;
import org.slf4j.Logger;
/**
......@@ -69,13 +81,41 @@ public class DistributedTopologyStore
Collections.<Device>emptyList(),
Collections.<Link>emptyList()));
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected StorageService storageService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected LogicalClockService clockService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected MastershipService mastershipService;
// Cluster root to broadcast points bindings to allow convergence to
// a shared broadcast tree; node that is the master of the cluster root
// is the primary.
private EventuallyConsistentMap<DeviceId, Set<ConnectPoint>> broadcastPoints;
private EventuallyConsistentMapListener<DeviceId, Set<ConnectPoint>> listener =
new InternalBroadcastPointListener();
@Activate
public void activate() {
KryoNamespace.Builder hostSerializer = KryoNamespace.newBuilder()
.register(KryoNamespaces.API);
broadcastPoints = storageService.<DeviceId, Set<ConnectPoint>>eventuallyConsistentMapBuilder()
.withName("onos-broadcast-trees")
.withSerializer(hostSerializer)
.withTimestampProvider((k, v) -> clockService.getTimestamp())
.build();
broadcastPoints.addListener(listener);
log.info("Started");
}
@Deactivate
public void deactivate() {
broadcastPoints.removeListener(listener);
broadcastPoints.destroy();
log.info("Stopped");
}
......@@ -136,6 +176,22 @@ public class DistributedTopologyStore
return defaultTopology(topology).isBroadcastPoint(connectPoint);
}
private boolean isBroadcastPoint(ConnectPoint connectPoint) {
// Any non-infrastructure, i.e. edge points are assumed to be OK.
if (!current.isInfrastructure(connectPoint)) {
return true;
}
// Find the cluster to which the device belongs.
TopologyCluster cluster = current.getCluster(connectPoint.deviceId());
checkArgument(cluster != null, "No cluster found for device %s", connectPoint.deviceId());
// If the broadcast set is null or empty, or if the point explicitly
// belongs to it, return true;
Set<ConnectPoint> points = broadcastPoints.get(cluster.root().deviceId());
return isNullOrEmpty(points) || points.contains(connectPoint);
}
@Override
public TopologyEvent updateTopology(ProviderId providerId,
GraphDescription graphDescription,
......@@ -147,7 +203,9 @@ public class DistributedTopologyStore
}
// Have the default topology construct self from the description data.
DefaultTopology newTopology = new DefaultTopology(providerId, graphDescription);
DefaultTopology newTopology =
new DefaultTopology(providerId, graphDescription, this::isBroadcastPoint);
updateBroadcastPoints(newTopology);
// Promote the new topology to current and return a ready-to-send event.
synchronized (this) {
......@@ -156,6 +214,24 @@ public class DistributedTopologyStore
}
}
private void updateBroadcastPoints(DefaultTopology topology) {
// Remove any broadcast trees rooted by devices for which we are master.
Set<DeviceId> toRemove = broadcastPoints.keySet().stream()
.filter(mastershipService::isLocalMaster)
.collect(Collectors.toSet());
// Update the broadcast trees rooted by devices for which we are master.
topology.getClusters().forEach(c -> {
toRemove.remove(c.root().deviceId());
if (mastershipService.isLocalMaster(c.root().deviceId())) {
broadcastPoints.put(c.root().deviceId(),
topology.broadcastPoints(c.id()));
}
});
toRemove.forEach(broadcastPoints::remove);
}
// Validates the specified topology and returns it as a default
private DefaultTopology defaultTopology(Topology topology) {
checkArgument(topology instanceof DefaultTopology,
......@@ -163,4 +239,16 @@ public class DistributedTopologyStore
return (DefaultTopology) topology;
}
private class InternalBroadcastPointListener
implements EventuallyConsistentMapListener<DeviceId, Set<ConnectPoint>> {
@Override
public void event(EventuallyConsistentMapEvent<DeviceId, Set<ConnectPoint>> event) {
if (event.type() == EventuallyConsistentMapEvent.Type.PUT) {
if (!event.value().isEmpty()) {
log.info("Cluster rooted at {} has {} broadcast-points; #{}",
event.key(), event.value().size(), event.value().hashCode());
}
}
}
}
}
......