alshabib

adding igmp query polling to igmp application.

Change-Id: I995336417e11404d96f33cdae96b12202d454dd1

adding SafeRecurringTask

Change-Id: Ie560e61500f85339c296f03ed8684078737edcd1
......@@ -57,7 +57,6 @@
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.onosproject</groupId>
<artifactId>onlab-osgi</artifactId>
......@@ -84,36 +83,4 @@
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-bundle-plugin</artifactId>
<extensions>true</extensions>
<configuration>
<instructions>
<Bundle-SymbolicName>
${project.groupId}.${project.artifactId}
</Bundle-SymbolicName>
<Import-Package>
org.slf4j,
org.osgi.framework,
com.google.common.*,
org.onlab.packet.*,
org.onosproject.*,
</Import-Package>
</instructions>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
......
......@@ -25,10 +25,12 @@ import org.onlab.packet.EthType;
import org.onlab.packet.Ethernet;
import org.onlab.packet.IGMP;
import org.onlab.packet.IGMPMembership;
import org.onlab.packet.IGMPQuery;
import org.onlab.packet.IPv4;
import org.onlab.packet.Ip4Address;
import org.onlab.packet.IpAddress;
import org.onlab.packet.IpPrefix;
import org.onlab.util.SafeRecurringTask;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.net.ConnectPoint;
......@@ -44,6 +46,7 @@ import org.onosproject.net.device.DeviceEvent;
import org.onosproject.net.device.DeviceListener;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.flow.DefaultTrafficTreatment;
import org.onosproject.net.flow.TrafficTreatment;
import org.onosproject.net.flow.criteria.Criteria;
import org.onosproject.net.flowobjective.DefaultFilteringObjective;
import org.onosproject.net.flowobjective.FilteringObjective;
......@@ -53,6 +56,7 @@ import org.onosproject.net.flowobjective.ObjectiveContext;
import org.onosproject.net.flowobjective.ObjectiveError;
import org.onosproject.net.mcast.McastRoute;
import org.onosproject.net.mcast.MulticastRouteService;
import org.onosproject.net.packet.DefaultOutboundPacket;
import org.onosproject.net.packet.InboundPacket;
import org.onosproject.net.packet.PacketContext;
import org.onosproject.net.packet.PacketProcessor;
......@@ -61,10 +65,16 @@ import org.onosproject.olt.AccessDeviceConfig;
import org.onosproject.olt.AccessDeviceData;
import org.slf4j.Logger;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import static org.onlab.util.Tools.groupedThreads;
import static org.slf4j.LoggerFactory.getLogger;
/**
......@@ -72,14 +82,29 @@ import static org.slf4j.LoggerFactory.getLogger;
*/
@Component(immediate = true)
public class IgmpSnoop {
private final Logger log = getLogger(getClass());
private static final String DEST_MAC = "01:00:5E:00:00:01";
private static final String DEST_IP = "224.0.0.1";
private static final int DEFAULT_QUERY_PERIOD_SECS = 60;
private static final byte DEFAULT_IGMP_RESP_CODE = 0;
private static final String DEFAULT_MCAST_ADDR = "224.0.0.0/4";
@Property(name = "multicastAddress",
label = "Define the multicast base range to listen to")
private String multicastAddress = DEFAULT_MCAST_ADDR;
@Property(name = "queryPeriod", intValue = DEFAULT_QUERY_PERIOD_SECS,
label = "Delay in seconds between successive query runs")
private int queryPeriod = DEFAULT_QUERY_PERIOD_SECS;
@Property(name = "maxRespCode", byteValue = DEFAULT_IGMP_RESP_CODE,
label = "Maximum time allowed before sending a responding report")
private byte maxRespCode = DEFAULT_IGMP_RESP_CODE;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected FlowObjectiveService flowObjectiveService;
......@@ -98,6 +123,12 @@ public class IgmpSnoop {
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DeviceService deviceService;
private ScheduledFuture<?> queryTask;
private final ScheduledExecutorService queryService =
Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/igmp-query",
"membership-query"));
private Map<DeviceId, AccessDeviceData> oltData = new ConcurrentHashMap<>();
private DeviceListener deviceListener = new InternalDeviceListener();
......@@ -120,6 +151,9 @@ public class IgmpSnoop {
};
private ByteBuffer queryPacket;
@Activate
public void activate() {
appId = coreService.registerApplication("org.onosproject.igmp");
......@@ -149,6 +183,14 @@ public class IgmpSnoop {
deviceService.addListener(deviceListener);
queryPacket = buildQueryPacket();
queryTask = queryService.scheduleWithFixedDelay(
SafeRecurringTask.wrap(this::querySubscribers),
0,
queryPeriod,
TimeUnit.SECONDS);
log.info("Started");
}
......@@ -159,6 +201,8 @@ public class IgmpSnoop {
deviceService.removeListener(deviceListener);
networkConfig.removeListener(configListener);
networkConfig.unregisterConfigFactory(configFactory);
queryTask.cancel(true);
queryService.shutdownNow();
log.info("Stopped");
}
......@@ -194,19 +238,6 @@ public class IgmpSnoop {
flowObjectiveService.filter(devId, igmp);
}
private void processQuery(IGMP pkt, ConnectPoint location) {
// TODO is this the right thing to do for a query?
pkt.getGroups().forEach(group -> group.getSources().forEach(src -> {
McastRoute route = new McastRoute(src,
group.getGaddr(),
McastRoute.Type.IGMP);
multicastService.add(route);
multicastService.addSink(route, location);
}));
}
private void processMembership(IGMP pkt, ConnectPoint location) {
pkt.getGroups().forEach(group -> {
......@@ -218,8 +249,8 @@ public class IgmpSnoop {
IGMPMembership membership = (IGMPMembership) group;
McastRoute route = new McastRoute(IpAddress.valueOf("0.0.0.0"),
group.getGaddr(),
McastRoute.Type.IGMP);
group.getGaddr(),
McastRoute.Type.IGMP);
if (membership.getRecordType() == IGMPMembership.MODE_IS_INCLUDE ||
membership.getRecordType() == IGMPMembership.CHANGE_TO_INCLUDE_MODE) {
......@@ -237,6 +268,44 @@ public class IgmpSnoop {
});
}
private ByteBuffer buildQueryPacket() {
IGMP igmp = new IGMP();
igmp.setIgmpType(IGMP.TYPE_IGMPV3_MEMBERSHIP_QUERY);
igmp.setMaxRespCode(maxRespCode);
IGMPQuery query = new IGMPQuery(IpAddress.valueOf("0.0.0.0"), 0);
igmp.addGroup(query);
IPv4 ip = new IPv4();
ip.setDestinationAddress(DEST_IP);
ip.setProtocol(IPv4.PROTOCOL_IGMP);
ip.setSourceAddress("192.168.1.1");
ip.setTtl((byte) 1);
ip.setPayload(igmp);
Ethernet eth = new Ethernet();
eth.setDestinationMACAddress(DEST_MAC);
eth.setSourceMACAddress("DE:AD:BE:EF:BA:11");
eth.setEtherType(Ethernet.TYPE_IPV4);
eth.setPayload(ip);
return ByteBuffer.wrap(eth.serialize());
}
private void querySubscribers() {
oltData.keySet().stream()
.flatMap(did -> deviceService.getPorts(did).stream())
.filter(p -> !oltData.get(p.element().id()).uplink().equals(p.number()))
.filter(p -> p.isEnabled())
.forEach(p -> {
TrafficTreatment treatment = DefaultTrafficTreatment.builder()
.setOutput(p.number()).build();
packetService.emit(new DefaultOutboundPacket((DeviceId) p.element().id(),
treatment, queryPacket));
});
}
/**
* Packet processor responsible for handling IGMP packets.
*/
......@@ -295,14 +364,15 @@ public class IgmpSnoop {
break;
case IGMP.TYPE_IGMPV3_MEMBERSHIP_QUERY:
processQuery(igmp, pkt.receivedFrom());
log.debug("Received a membership query {} from {}",
igmp, pkt.receivedFrom());
break;
case IGMP.TYPE_IGMPV1_MEMBERSHIP_REPORT:
case IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT:
case IGMP.TYPE_IGMPV2_LEAVE_GROUP:
log.debug("IGMP version 1 & 2 message types are not currently supported. Message type: {}",
igmp.getIgmpType());
igmp.getIgmpType());
break;
default:
log.debug("Unknown IGMP message type: {}", igmp.getIgmpType());
......