alshabib
Committed by Gerrit Code Review

[Falcon] Refactored mcast store implementation.

Change-Id: Ie3fbc675d02c5abe5f5a419d2fc12dbe8fb4ec35

refactored mcast store implementation

Change-Id: I67d70d678813184c522c78e0771f6b8f8f9c25f8
Showing 17 changed files with 585 additions and 218 deletions
......@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.igmp.impl;
package org.onosproject.igmp;
import org.onlab.packet.IGMP;
import org.onosproject.net.ConnectPoint;
......
......@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.igmp.impl;
package org.onosproject.igmp;
import org.onlab.packet.IGMP;
import org.onosproject.net.ConnectPoint;
......
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.igmp;
import org.onosproject.net.DeviceId;
import org.onosproject.net.config.Config;
/**
* Config object for access device data.
*/
public class IgmpDeviceConfig extends Config<DeviceId> {
/**
* Gets the device information.
*
* @return device information
*/
public IgmpDeviceData getDevice() {
return new IgmpDeviceData(subject());
}
}
package org.onosproject.igmp;
import org.onosproject.net.DeviceId;
import static com.google.common.base.Preconditions.checkNotNull;
/**
* Information about an igmp enabled device.
*/
public class IgmpDeviceData {
private static final String DEVICE_ID_MISSING = "Device ID cannot be null";
private final DeviceId deviceId;
public IgmpDeviceData(DeviceId deviceId) {
this.deviceId = checkNotNull(deviceId, DEVICE_ID_MISSING);
}
/**
* Retrieves the access device ID.
*
* @return device ID
*/
public DeviceId deviceId() {
return deviceId;
}
}
......@@ -13,13 +13,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.igmp.impl;
package org.onosproject.igmp;
import static org.slf4j.LoggerFactory.getLogger;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onlab.packet.Ethernet;
......@@ -30,8 +31,13 @@ import org.onlab.packet.IpPrefix;
import org.onlab.packet.IGMP;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.DeviceId;
import org.onosproject.net.config.NetworkConfigRegistry;
import org.onosproject.net.flow.DefaultTrafficSelector;
import org.onosproject.net.flow.TrafficSelector;
import org.onosproject.net.mcast.McastRoute;
import org.onosproject.net.mcast.MulticastRouteService;
import org.onosproject.net.packet.InboundPacket;
import org.onosproject.net.packet.PacketContext;
import org.onosproject.net.packet.PacketPriority;
......@@ -39,20 +45,34 @@ import org.onosproject.net.packet.PacketProcessor;
import org.onosproject.net.packet.PacketService;
import org.slf4j.Logger;
import java.util.Optional;
/**
* Internet Group Management Protocol.
*/
@Component(immediate = true)
public class IGMPComponent {
public class IgmpSnoop {
private final Logger log = getLogger(getClass());
private static final String DEFAULT_MCAST_ADDR = "224.0.0.0/4";
@Property(name = "multicastAddress",
label = "Define the multicast base raneg to listen to")
private String multicastAddress = DEFAULT_MCAST_ADDR;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected PacketService packetService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected CoreService coreService;
private IGMPPacketProcessor processor = new IGMPPacketProcessor();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected NetworkConfigRegistry networkConfig;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected MulticastRouteService multicastService;
private IgmpPacketProcessor processor = new IgmpPacketProcessor();
private static ApplicationId appId;
@Activate
......@@ -61,11 +81,16 @@ public class IGMPComponent {
packetService.addProcessor(processor, PacketProcessor.director(1));
// Build a traffic selector for all multicast traffic
TrafficSelector.Builder selector = DefaultTrafficSelector.builder();
selector.matchEthType(Ethernet.TYPE_IPV4);
selector.matchIPProtocol(IPv4.PROTOCOL_IGMP);
packetService.requestPackets(selector.build(), PacketPriority.REACTIVE, appId);
networkConfig.getSubjects(DeviceId.class, IgmpDeviceConfig.class).forEach(
subject -> {
IgmpDeviceConfig config = networkConfig.getConfig(subject,
IgmpDeviceConfig.class);
if (config != null) {
IgmpDeviceData data = config.getDevice();
submitPacketRequests(data.deviceId());
}
}
);
log.info("Started");
}
......@@ -77,10 +102,21 @@ public class IGMPComponent {
log.info("Stopped");
}
private void submitPacketRequests(DeviceId deviceId) {
TrafficSelector.Builder selector = DefaultTrafficSelector.builder();
selector.matchEthType(Ethernet.TYPE_IPV4);
selector.matchIPProtocol(IPv4.PROTOCOL_IGMP);
packetService.requestPackets(selector.build(),
PacketPriority.REACTIVE,
appId,
Optional.of(deviceId));
}
/**
* Packet processor responsible for handling IGMP packets.
*/
private class IGMPPacketProcessor implements PacketProcessor {
private class IgmpPacketProcessor implements PacketProcessor {
@Override
public void process(PacketContext context) {
......@@ -107,15 +143,16 @@ public class IGMPComponent {
IPv4 ip = (IPv4) ethPkt.getPayload();
IpAddress gaddr = IpAddress.valueOf(ip.getDestinationAddress());
IpAddress saddr = Ip4Address.valueOf(ip.getSourceAddress());
log.debug("Packet (" + saddr.toString() + ", " + gaddr.toString() +
"\tingress port: " + context.inPacket().receivedFrom().toString());
log.debug("Packet ({}, {}) -> ingress port: {}", saddr, gaddr,
context.inPacket().receivedFrom());
if (ip.getProtocol() != IPv4.PROTOCOL_IGMP) {
log.debug("IGMP Picked up a non IGMP packet.");
return;
}
IpPrefix mcast = IpPrefix.valueOf("224.0.0.0/4");
IpPrefix mcast = IpPrefix.valueOf(DEFAULT_MCAST_ADDR);
if (!mcast.contains(gaddr)) {
log.debug("IGMP Picked up a non multicast packet.");
return;
......@@ -125,8 +162,6 @@ public class IGMPComponent {
log.debug("IGMP Picked up a packet with a multicast source address.");
return;
}
IpPrefix spfx = IpPrefix.valueOf(saddr, 32);
IpPrefix gpfx = IpPrefix.valueOf(gaddr, 32);
IGMP igmp = (IGMP) ip.getPayload();
switch (igmp.getIgmpType()) {
......@@ -136,14 +171,14 @@ public class IGMPComponent {
break;
case IGMP.TYPE_IGMPV3_MEMBERSHIP_QUERY:
IGMPProcessQuery.processQuery(igmp, pkt.receivedFrom());
processQuery(igmp, pkt.receivedFrom());
break;
case IGMP.TYPE_IGMPV1_MEMBERSHIP_REPORT:
case IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT:
case IGMP.TYPE_IGMPV2_LEAVE_GROUP:
log.debug("IGMP version 1 & 2 message types are not currently supported. Message type: " +
igmp.getIgmpType());
igmp.getIgmpType());
break;
default:
......@@ -152,4 +187,16 @@ public class IGMPComponent {
}
}
}
private void processQuery(IGMP pkt, ConnectPoint location) {
pkt.getGroups().forEach(group -> group.getSources().forEach(src -> {
McastRoute route = new McastRoute(src,
group.getGaddr(),
McastRoute.Type.IGMP);
multicastService.add(route);
multicastService.addSink(route, location);
}));
}
}
......
......@@ -17,4 +17,4 @@
/**
* IGMP implementation.
*/
package org.onosproject.igmp.impl;
package org.onosproject.igmp;
......
......@@ -17,9 +17,6 @@ package org.onosproject.net.mcast;
import com.google.common.annotations.Beta;
import org.onosproject.event.AbstractEvent;
import org.onosproject.net.ConnectPoint;
import java.util.Optional;
import static com.google.common.base.MoreObjects.toStringHelper;
......@@ -28,10 +25,8 @@ import static com.google.common.base.MoreObjects.toStringHelper;
* sinks or sources.
*/
@Beta
public class McastEvent extends AbstractEvent<McastEvent.Type, McastRoute> {
public class McastEvent extends AbstractEvent<McastEvent.Type, McastRouteInfo> {
private final Optional<ConnectPoint> sink;
private final Optional<ConnectPoint> source;
public enum Type {
/**
......@@ -60,59 +55,15 @@ public class McastEvent extends AbstractEvent<McastEvent.Type, McastRoute> {
SINK_REMOVED
}
private McastEvent(McastEvent.Type type, McastRoute subject) {
super(type, subject);
sink = Optional.empty();
source = Optional.empty();
}
private McastEvent(McastEvent.Type type, McastRoute subject, long time) {
super(type, subject, time);
sink = Optional.empty();
source = Optional.empty();
}
public McastEvent(McastEvent.Type type, McastRoute subject,
ConnectPoint sink,
ConnectPoint source) {
public McastEvent(McastEvent.Type type, McastRouteInfo subject) {
super(type, subject);
this.sink = Optional.ofNullable(sink);
this.source = Optional.ofNullable(source);
}
public McastEvent(McastEvent.Type type, McastRoute subject, long time,
ConnectPoint sink,
ConnectPoint source) {
super(type, subject, time);
this.sink = Optional.ofNullable(sink);
this.source = Optional.ofNullable(source);
}
/**
* The sink which has been removed or added. The field may not be set
* if the sink has not been detected yet or has been removed.
*
* @return an optional connect point
*/
public Optional<ConnectPoint> sink() {
return sink;
}
/**
* The source which has been removed or added.
* @return an optional connect point
*/
public Optional<ConnectPoint> source() {
return source;
}
@Override
public String toString() {
return toStringHelper(this)
.add("type", type())
.add("route", subject())
.add("source", source)
.add("sinks", sink).toString();
.add("info", subject()).toString();
}
}
......
......@@ -17,7 +17,7 @@ package org.onosproject.net.mcast;
import com.google.common.annotations.Beta;
import com.google.common.base.Objects;
import org.onlab.packet.IpPrefix;
import org.onlab.packet.IpAddress;
import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkNotNull;
......@@ -46,11 +46,11 @@ public class McastRoute {
STATIC
}
private final IpPrefix source;
private final IpPrefix group;
private final IpAddress source;
private final IpAddress group;
private final Type type;
public McastRoute(IpPrefix source, IpPrefix group, Type type) {
public McastRoute(IpAddress source, IpAddress group, Type type) {
checkNotNull(source, "Multicast route must have a source");
checkNotNull(group, "Multicast route must specify a group address");
checkNotNull(type, "Must indicate what type of route");
......@@ -64,7 +64,7 @@ public class McastRoute {
*
* @return an ip address
*/
public IpPrefix source() {
public IpAddress source() {
return source;
}
......@@ -73,7 +73,7 @@ public class McastRoute {
*
* @return an ip address
*/
public IpPrefix group() {
public IpAddress group() {
return group;
}
......
package org.onosproject.net.mcast;
import com.google.common.collect.ImmutableSet;
import org.onosproject.net.ConnectPoint;
import java.util.Collections;
import java.util.Optional;
import java.util.Set;
import static com.google.common.base.Preconditions.checkNotNull;
/**
* Multicast information as stored in the store.
*/
public final class McastRouteInfo {
private static final String ROUTE_NOT_NULL = "Route cannot be null";
private final McastRoute route;
private final Optional<ConnectPoint> sink;
private final Optional<ConnectPoint> source;
private final Set<ConnectPoint> sinks;
private McastRouteInfo(McastRoute route, ConnectPoint sink,
ConnectPoint source, Set<ConnectPoint> sinks) {
this.route = checkNotNull(route, ROUTE_NOT_NULL);
this.sink = Optional.ofNullable(sink);
this.source = Optional.ofNullable(source);
this.sinks = sinks;
}
public static McastRouteInfo mcastRouteInfo(McastRoute route) {
return new McastRouteInfo(route, null, null, Collections.EMPTY_SET);
}
public static McastRouteInfo mcastRouteInfo(McastRoute route,
ConnectPoint sink,
ConnectPoint source) {
return new McastRouteInfo(route, sink, source, Collections.EMPTY_SET);
}
public static McastRouteInfo mcastRouteInfo(McastRoute route,
Set<ConnectPoint> sinks,
ConnectPoint source) {
return new McastRouteInfo(route, null, source, ImmutableSet.copyOf(sinks));
}
public boolean isComplete() {
return ((sink.isPresent() || sinks.size() > 0) && source.isPresent());
}
/**
* The route associated with this multicast information.
*
* @return a mulicast route
*/
public McastRoute route() {
return route;
}
/**
* The source which has been removed or added.
* @return an optional connect point
*/
public Optional<ConnectPoint> source() {
return source;
}
/**
* The sink which has been removed or added. The field may not be set
* if the sink has not been detected yet or has been removed.
*
* @return an optional connect point
*/
public Optional<ConnectPoint> sink() {
return sink;
}
/**
* Returns the set of sinks associated with this route. Only valid with
* SOURCE_ADDED events.
*
* @return a set of connect points
*/
public Set<ConnectPoint> sinks() {
return sinks;
}
}
package org.onosproject.net.mcast;
import org.onosproject.net.ConnectPoint;
import org.onosproject.store.Store;
import java.util.Set;
/**
* Entity responsible for storing multicast state information.
*/
public interface McastStore extends Store<McastEvent, McastStoreDelegate> {
enum Type {
/**
* Adding a route to the mcast rib.
*/
ADD,
/**
* Removing a route from the mcast rib.
*/
REMOVE
}
/**
* Updates the store with the route information.
*
* @param route a multicast route
* @param operation an operation
*/
void storeRoute(McastRoute route, Type operation);
/**
* Updates the store with source information for the given route. Only one
* source is permitted. Submitting another source will replace the previous
* value.
*
* @param route a multicast route
* @param source a source
*/
void storeSource(McastRoute route, ConnectPoint source);
/**
* Updates the store with sink information for a given route. There may be
* multiple sinks.
*
* @param route a multicast route
* @param sink a sink
* @param operation an operation
*/
void storeSink(McastRoute route, ConnectPoint sink, Type operation);
/**
* Obtain the source for a multicast route.
*
* @param route a multicast route
* @return a connect point
*/
ConnectPoint sourceFor(McastRoute route);
/**
* Obtain the sinks for a multicast route.
*
* @param route a multicast route
* @return a set of sinks
*/
Set<ConnectPoint> sinksFor(McastRoute route);
}
package org.onosproject.net.mcast;
import org.onosproject.store.StoreDelegate;
/**
* Mcast store delegate abstraction.
*/
public interface McastStoreDelegate extends StoreDelegate<McastEvent> {
}
......@@ -19,7 +19,7 @@ import com.google.common.annotations.Beta;
import org.onosproject.event.ListenerService;
import org.onosproject.net.ConnectPoint;
import java.util.List;
import java.util.Set;
/**
* A service interface for maintaining multicast information.
......@@ -82,5 +82,5 @@ public interface MulticastRouteService
* @param route a multicast route
* @return a list of connect points
*/
List<ConnectPoint> fetchSinks(McastRoute route);
Set<ConnectPoint> fetchSinks(McastRoute route);
}
......
......@@ -21,25 +21,19 @@ import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.packet.IpPrefix;
import org.onlab.util.KryoNamespace;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.event.AbstractListenerManager;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.mcast.McastEvent;
import org.onosproject.net.mcast.McastListener;
import org.onosproject.net.mcast.McastRoute;
import org.onosproject.net.mcast.McastStore;
import org.onosproject.net.mcast.McastStoreDelegate;
import org.onosproject.net.mcast.MulticastRouteService;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.Versioned;
import org.slf4j.Logger;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.Set;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.slf4j.LoggerFactory.getLogger;
/**
......@@ -52,38 +46,18 @@ public class MulticastRouteManager
implements MulticastRouteService {
//TODO: add MulticastRouteAdminService
private static final String MCASTRIB = "mcast-rib-table";
private Logger log = getLogger(getClass());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private StorageService storageService;
private final McastStoreDelegate delegate = new InternalMcastStoreDelegate();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private CoreService coreService;
protected ApplicationId appId;
protected ConsistentMap<McastRoute, MulticastData> mcastRoutes;
protected McastStore store;
@Activate
public void activate() {
eventDispatcher.addSink(McastEvent.class, listenerRegistry);
appId = coreService.registerApplication("org.onosproject.mcastrib");
mcastRoutes = storageService.<McastRoute, MulticastData>consistentMapBuilder()
.withApplicationId(appId)
.withName(MCASTRIB)
.withSerializer(Serializer.using(KryoNamespace.newBuilder().register(
MulticastData.class,
McastRoute.class,
McastRoute.Type.class,
IpPrefix.class,
List.class,
ConnectPoint.class
).build())).build();
store.setDelegate(delegate);
log.info("Started");
}
......@@ -95,80 +69,55 @@ public class MulticastRouteManager
@Override
public void add(McastRoute route) {
mcastRoutes.put(route, MulticastData.empty());
post(new McastEvent(McastEvent.Type.ROUTE_ADDED, route, null, null));
checkNotNull(route, "Route cannot be null");
store.storeRoute(route, McastStore.Type.ADD);
}
@Override
public void remove(McastRoute route) {
mcastRoutes.remove(route);
post(new McastEvent(McastEvent.Type.ROUTE_REMOVED, route, null, null));
checkNotNull(route, "Route cannot be null");
store.storeRoute(route, McastStore.Type.REMOVE);
}
@Override
public void addSource(McastRoute route, ConnectPoint connectPoint) {
Versioned<MulticastData> d = mcastRoutes.compute(route, (k, v) -> {
if (v.isEmpty()) {
return new MulticastData(connectPoint);
} else {
log.warn("Route {} is already in use.", route);
return v;
}
});
if (d != null) {
post(new McastEvent(McastEvent.Type.SOURCE_ADDED,
route, null, connectPoint));
}
checkNotNull(route, "Route cannot be null");
checkNotNull(connectPoint, "Source cannot be null");
store.storeSource(route, connectPoint);
}
@Override
public void addSink(McastRoute route, ConnectPoint connectPoint) {
AtomicReference<ConnectPoint> source = new AtomicReference<>();
mcastRoutes.compute(route, (k, v) -> {
if (!v.isEmpty()) {
v.appendSink(connectPoint);
source.set(v.source());
} else {
log.warn("Route {} does not exist");
}
return v;
});
if (source.get() != null) {
post(new McastEvent(McastEvent.Type.SINK_ADDED, route,
connectPoint, source.get()));
}
checkNotNull(route, "Route cannot be null");
checkNotNull(connectPoint, "Sink cannot be null");
store.storeSink(route, connectPoint, McastStore.Type.ADD);
}
@Override
public void removeSink(McastRoute route, ConnectPoint connectPoint) {
AtomicReference<ConnectPoint> source = new AtomicReference<>();
mcastRoutes.compute(route, (k, v) -> {
if (v.removeSink(connectPoint)) {
source.set(v.source());
}
return v;
});
if (source.get() != null) {
post(new McastEvent(McastEvent.Type.SINK_REMOVED, route,
connectPoint, source.get()));
}
checkNotNull(route, "Route cannot be null");
checkNotNull(connectPoint, "Sink cannot be null");
store.storeSink(route, connectPoint, McastStore.Type.REMOVE);
}
@Override
public ConnectPoint fetchSource(McastRoute route) {
MulticastData d = mcastRoutes.asJavaMap().getOrDefault(route,
MulticastData.empty());
return d.source();
return store.sourceFor(route);
}
@Override
public List<ConnectPoint> fetchSinks(McastRoute route) {
MulticastData d = mcastRoutes.asJavaMap().getOrDefault(route,
MulticastData.empty());
return d.sinks();
public Set<ConnectPoint> fetchSinks(McastRoute route) {
return store.sinksFor(route);
}
private class InternalMcastStoreDelegate implements McastStoreDelegate {
@Override
public void notify(McastEvent event) {
post(event);
}
}
}
......
......@@ -16,15 +16,17 @@
package org.onosproject.incubator.net.mcast.impl;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.onlab.junit.TestUtils;
import org.onlab.packet.IpPrefix;
import org.onlab.packet.IpAddress;
import org.onosproject.common.event.impl.TestEventDispatcher;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreServiceAdapter;
import org.onosproject.core.DefaultApplicationId;
import org.onosproject.incubator.store.mcast.impl.DistributedMcastStore;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.PortNumber;
import org.onosproject.net.mcast.McastEvent;
......@@ -44,16 +46,16 @@ import static org.onosproject.net.NetTestTools.injectEventDispatcher;
*/
public class MulticastRouteManagerTest {
McastRoute r1 = new McastRoute(IpPrefix.valueOf("1.1.1.1/8"),
IpPrefix.valueOf("1.1.1.2/8"),
McastRoute r1 = new McastRoute(IpAddress.valueOf("1.1.1.1"),
IpAddress.valueOf("1.1.1.2"),
McastRoute.Type.IGMP);
McastRoute r11 = new McastRoute(IpPrefix.valueOf("1.1.1.1/8"),
IpPrefix.valueOf("1.1.1.2/8"),
McastRoute r11 = new McastRoute(IpAddress.valueOf("1.1.1.1"),
IpAddress.valueOf("1.1.1.2"),
McastRoute.Type.STATIC);
McastRoute r2 = new McastRoute(IpPrefix.valueOf("2.2.2.1/8"),
IpPrefix.valueOf("2.2.2.2/8"),
McastRoute r2 = new McastRoute(IpAddress.valueOf("2.2.2.1"),
IpAddress.valueOf("2.2.2.2"),
McastRoute.Type.PIM);
ConnectPoint cp1 = new ConnectPoint(did("1"), PortNumber.portNumber(1));
......@@ -66,13 +68,17 @@ public class MulticastRouteManagerTest {
private List<McastEvent> events;
private DistributedMcastStore mcastStore;
@Before
public void setUp() throws Exception {
manager = new MulticastRouteManager();
mcastStore = new DistributedMcastStore();
TestUtils.setField(mcastStore, "storageService", new TestStorageService());
injectEventDispatcher(manager, new TestEventDispatcher());
TestUtils.setField(manager, "storageService", new TestStorageService());
TestUtils.setField(manager, "coreService", new TestCoreService());
events = Lists.newArrayList();
manager.store = mcastStore;
mcastStore.activate();
manager.activate();
manager.addListener(listener);
}
......@@ -81,13 +87,13 @@ public class MulticastRouteManagerTest {
public void tearDown() {
manager.removeListener(listener);
manager.deactivate();
mcastStore.deactivate();
}
@Test
public void testAdd() {
manager.add(r1);
assertEquals("Add failed", manager.mcastRoutes.size(), 1);
validateEvents(McastEvent.Type.ROUTE_ADDED);
}
......@@ -97,48 +103,39 @@ public class MulticastRouteManagerTest {
manager.remove(r1);
assertEquals("Remove failed", manager.mcastRoutes.size(), 0);
validateEvents(McastEvent.Type.ROUTE_ADDED, McastEvent.Type.ROUTE_REMOVED);
}
@Test
public void testAddSource() {
manager.add(r1);
manager.addSource(r1, cp1);
validateEvents(McastEvent.Type.ROUTE_ADDED, McastEvent.Type.SOURCE_ADDED);
validateEvents(McastEvent.Type.SOURCE_ADDED);
assertEquals("Route is not equal", cp1, manager.fetchSource(r1));
}
@Test
public void testAddSink() {
manager.add(r1);
manager.addSource(r1, cp1);
manager.addSink(r1, cp1);
validateEvents(McastEvent.Type.ROUTE_ADDED,
McastEvent.Type.SOURCE_ADDED,
McastEvent.Type.SINK_ADDED);
assertEquals("Route is not equal", Lists.newArrayList(cp1), manager.fetchSinks(r1));
validateEvents(McastEvent.Type.SINK_ADDED);
assertEquals("Route is not equal", Sets.newHashSet(cp1), manager.fetchSinks(r1));
}
@Test
public void testRemoveSink() {
manager.add(r1);
manager.addSource(r1, cp1);
manager.addSink(r1, cp1);
manager.addSink(r1, cp2);
manager.removeSink(r1, cp2);
validateEvents(McastEvent.Type.ROUTE_ADDED,
McastEvent.Type.SOURCE_ADDED,
validateEvents(McastEvent.Type.SOURCE_ADDED,
McastEvent.Type.SINK_ADDED,
McastEvent.Type.SINK_ADDED,
McastEvent.Type.SINK_REMOVED);
assertEquals("Route is not equal", Lists.newArrayList(cp1), manager.fetchSinks(r1));
assertEquals("Route is not equal", Sets.newHashSet(cp1), manager.fetchSinks(r1));
}
private void validateEvents(McastEvent.Type... evs) {
......
package org.onosproject.incubator.store.mcast.impl;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.packet.IpPrefix;
import org.onlab.util.KryoNamespace;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.mcast.McastEvent;
import org.onosproject.net.mcast.McastRoute;
import org.onosproject.net.mcast.McastRouteInfo;
import org.onosproject.net.mcast.McastStore;
import org.onosproject.net.mcast.McastStoreDelegate;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
import org.slf4j.Logger;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.slf4j.LoggerFactory.getLogger;
/**
* A distributed mcast store implementation. Routes are stored consistently
* across the cluster.
*/
@Component(immediate = true)
@Service
public class DistributedMcastStore extends AbstractStore<McastEvent, McastStoreDelegate>
implements McastStore {
//FIXME the number of events that will potentially be generated here is
// not sustainable, consider changing this to an eventually consistent
// map and not emitting events but rather use a provider-like mechanism
// to program the dataplane.
private static final String MCASTRIB = "mcast-rib-table";
private Logger log = getLogger(getClass());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private StorageService storageService;
protected ConsistentMap<McastRoute, MulticastData> mcastRIB;
protected Map<McastRoute, MulticastData> mcastRoutes;
@Activate
public void activate() {
mcastRIB = storageService.<McastRoute, MulticastData>consistentMapBuilder()
.withName(MCASTRIB)
.withSerializer(Serializer.using(KryoNamespace.newBuilder().register(
MulticastData.class,
McastRoute.class,
McastRoute.Type.class,
IpPrefix.class,
List.class,
ConnectPoint.class
).build()))
.withRelaxedReadConsistency()
.build();
mcastRoutes = mcastRIB.asJavaMap();
log.info("Started");
}
@Deactivate
public void deactivate() {
log.info("Stopped");
}
@Override
public void storeRoute(McastRoute route, Type operation) {
switch (operation) {
case ADD:
if (mcastRoutes.putIfAbsent(route, MulticastData.empty()) == null) {
delegate.notify(new McastEvent(McastEvent.Type.ROUTE_ADDED,
McastRouteInfo.mcastRouteInfo(route)));
}
break;
case REMOVE:
if (mcastRoutes.remove(route) != null) {
delegate.notify(new McastEvent(McastEvent.Type.ROUTE_REMOVED,
McastRouteInfo.mcastRouteInfo(route)));
}
break;
default:
log.warn("Unknown mcast operation type: {}", operation);
}
}
@Override
public void storeSource(McastRoute route, ConnectPoint source) {
MulticastData data = mcastRoutes.compute(route, (k, v) -> {
if (v == null) {
return new MulticastData(source);
} else {
v.setSource(source);
}
return v;
});
if (data != null) {
delegate.notify(new McastEvent(McastEvent.Type.SOURCE_ADDED,
McastRouteInfo.mcastRouteInfo(route,
data.sinks(),
source)));
}
}
@Override
public void storeSink(McastRoute route, ConnectPoint sink, Type operation) {
MulticastData data = mcastRoutes.compute(route, (k, v) -> {
switch (operation) {
case ADD:
if (v == null) {
v = MulticastData.empty();
}
v.appendSink(sink);
break;
case REMOVE:
if (v != null) {
v.removeSink(sink);
}
break;
default:
log.warn("Unknown mcast operation type: {}", operation);
}
return v;
});
if (data != null) {
switch (operation) {
case ADD:
delegate.notify(new McastEvent(
McastEvent.Type.SINK_ADDED,
McastRouteInfo.mcastRouteInfo(route,
sink,
data.source())));
break;
case REMOVE:
if (data != null) {
delegate.notify(new McastEvent(
McastEvent.Type.SINK_REMOVED,
McastRouteInfo.mcastRouteInfo(route,
sink,
data.source())));
}
break;
default:
log.warn("Unknown mcast operation type: {}", operation);
}
}
}
@Override
public ConnectPoint sourceFor(McastRoute route) {
return mcastRoutes.getOrDefault(route, MulticastData.empty()).source();
}
@Override
public Set<ConnectPoint> sinksFor(McastRoute route) {
return mcastRoutes.getOrDefault(route, MulticastData.empty()).sinks();
}
}
......@@ -13,14 +13,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.incubator.net.mcast.impl;
package org.onosproject.incubator.store.mcast.impl;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import org.onosproject.net.ConnectPoint;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import static com.google.common.base.Preconditions.checkNotNull;
......@@ -30,40 +31,33 @@ import static com.google.common.base.Preconditions.checkNotNull;
*/
public final class MulticastData {
private final ConnectPoint source;
private final List<ConnectPoint> sinks;
private final boolean isEmpty;
private final AtomicReference<ConnectPoint> source =
new AtomicReference<>();
private final Set<ConnectPoint> sinks;
private final AtomicBoolean isEmpty = new AtomicBoolean();
private MulticastData() {
this.source = null;
this.sinks = Collections.EMPTY_LIST;
isEmpty = true;
}
public MulticastData(ConnectPoint source, List<ConnectPoint> sinks) {
this.source = checkNotNull(source, "Multicast source cannot be null.");
this.sinks = checkNotNull(sinks, "List of sinks cannot be null.");
isEmpty = false;
}
public MulticastData(ConnectPoint source, ConnectPoint sink) {
this.source = checkNotNull(source, "Multicast source cannot be null.");
this.sinks = Lists.newArrayList(checkNotNull(sink, "Sink cannot be null."));
isEmpty = false;
this.sinks = Sets.newConcurrentHashSet();
isEmpty.set(true);
}
public MulticastData(ConnectPoint source) {
this.source = checkNotNull(source, "Multicast source cannot be null.");
this.sinks = Lists.newArrayList();
isEmpty = false;
this.source.set(checkNotNull(source, "Multicast source cannot be null."));
this.sinks = Sets.newConcurrentHashSet();
isEmpty.set(false);
}
public ConnectPoint source() {
return source;
return source.get();
}
public Set<ConnectPoint> sinks() {
return ImmutableSet.copyOf(sinks);
}
public List<ConnectPoint> sinks() {
return ImmutableList.copyOf(sinks);
public void setSource(ConnectPoint source) {
isEmpty.set(false);
this.source.set(source);
}
public void appendSink(ConnectPoint sink) {
......@@ -75,7 +69,7 @@ public final class MulticastData {
}
public boolean isEmpty() {
return isEmpty;
return isEmpty.get();
}
public static MulticastData empty() {
......
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* A distributed multicast store implementation that stores multicast rib
* data consistently across the cluster.
*/
package org.onosproject.incubator.store.mcast.impl;