Jonathan Hart

Add neighbor lifecycle management.

 * Made PIMNeighbor mostly immutable (apart from updatable timestamp)
 * Equals and hashCode for PIMNeighbor
 * Remove neighbor when we see a HELLO with holdTime==0
 * Periodic task to time out neighbors who haven't sent a HELLO in a while
 * Added a CLI command to view PIM neighbors

Change-Id: I59e52a847f7abcb8e9ac660c2cccace53e46867b
/*
* Copyright 2016 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.pim.cli;
import org.apache.karaf.shell.commands.Command;
import org.onlab.util.Tools;
import org.onosproject.cli.AbstractShellCommand;
import org.onosproject.pim.impl.PIMInterface;
import org.onosproject.pim.impl.PIMInterfaceService;
import org.onosproject.pim.impl.PIMNeighbor;
import java.util.Set;
/**
* Lists PIM neighbors.
*/
@Command(scope = "onos", name = "pim-neighbors",
description = "Lists the PIM neighbors")
public class PimNeighborsListCommand extends AbstractShellCommand {
private static final String INTF_FORMAT = "interface=%s, address=%s";
private static final String NEIGHBOR_FORMAT = " neighbor=%s, uptime=%s, holdtime=%s, drPriority=%s, genId=%s";
@Override
protected void execute() {
PIMInterfaceService interfaceService = get(PIMInterfaceService.class);
Set<PIMInterface> interfaces = interfaceService.getPimInterfaces();
for (PIMInterface intf : interfaces) {
print(INTF_FORMAT, intf.getInterface().name(), intf.getIpAddress());
for (PIMNeighbor neighbor : intf.getNeighbors()) {
// Filter out the PIM neighbor representing 'us'
if (!neighbor.ipAddress().equals(intf.getIpAddress())) {
print(NEIGHBOR_FORMAT, neighbor.ipAddress(),
Tools.timeAgo(neighbor.upTime()), neighbor.holdtime(),
neighbor.priority(), neighbor.generationId());
}
}
}
}
}
......@@ -32,10 +32,12 @@ import org.onosproject.net.packet.PacketService;
import org.slf4j.Logger;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
......@@ -99,10 +101,7 @@ public final class PIMInterface {
generationId = new Random().nextInt();
// Create a PIM Neighbor to represent ourselves for DR election.
PIMNeighbor us = new PIMNeighbor(ourIp, mac);
// Priority and IP address are all we need to DR election.
us.setPriority(priority);
PIMNeighbor us = new PIMNeighbor(ourIp, mac, holdTime, 0, priority, generationId);
pimNeighbors.put(ourIp, us);
drIpaddress = ourIp;
......@@ -199,6 +198,32 @@ public final class PIMInterface {
}
/**
* Gets the neighbors seen on this interface.
*
* @return PIM neighbors
*/
public Collection<PIMNeighbor> getNeighbors() {
return pimNeighbors.values();
}
/**
* Checks whether any of our neighbors have expired, and cleans up their
* state if they have.
*/
public void checkNeighborTimeouts() {
Set<PIMNeighbor> expired = pimNeighbors.values().stream()
// Don't time ourselves out!
.filter(neighbor -> !neighbor.ipAddress().equals(getIpAddress()))
.filter(neighbor -> neighbor.isExpired())
.collect(Collectors.toSet());
for (PIMNeighbor neighbor : expired) {
log.info("Timing out neighbor {}", neighbor);
pimNeighbors.remove(neighbor.ipAddress(), neighbor);
}
}
/**
* Multicast a hello message out our interface. This hello message is sent
* periodically during the normal PIM Neighbor refresh time, as well as a
* result of a newly created interface.
......@@ -234,7 +259,7 @@ public final class PIMInterface {
* <li>We <em>may</em> have to create a new neighbor if one does not already exist</li>
* <li>We <em>may</em> need to re-elect a new DR if new information is received</li>
* <li>We <em>may</em> need to send an existing neighbor all joins if the genid changed</li>
* <li>We will refresh the neighbors timestamp</li>
* <li>We will refresh the neighbor's timestamp</li>
* </ul>
*
* @param ethPkt the Ethernet packet header
......@@ -259,7 +284,7 @@ public final class PIMInterface {
checkNotNull(dr);
IpAddress drip = drIpaddress;
int drpri = dr.getPriority();
int drpri = dr.priority();
// Assume we do not need to run a DR election
boolean reElectDr = false;
......@@ -269,18 +294,24 @@ public final class PIMInterface {
// Determine if we already have a PIMNeighbor
PIMNeighbor nbr = pimNeighbors.getOrDefault(srcip, null);
PIMNeighbor newNbr = PIMNeighbor.createPimNeighbor(srcip, nbrmac, hello.getOptions().values());
if (nbr == null) {
nbr = new PIMNeighbor(srcip, hello.getOptions());
checkNotNull(nbr);
} else {
Integer previousGenid = nbr.getGenid();
nbr.addOptions(hello.getOptions());
if (previousGenid != nbr.getGenid()) {
genidChanged = true;
pimNeighbors.putIfAbsent(srcip, newNbr);
nbr = newNbr;
} else if (!nbr.equals(newNbr)) {
if (newNbr.holdtime() == 0) {
// Neighbor has shut down. Remove them and clean up
pimNeighbors.remove(srcip, nbr);
return;
} else {
// Neighbor has changed one of their options.
pimNeighbors.put(srcip, newNbr);
nbr = newNbr;
}
}
// Refresh this neighbors timestamp
// Refresh this neighbor's timestamp
nbr.refreshTimestamp();
/*
......@@ -300,8 +331,8 @@ public final class PIMInterface {
// Run an election if we need to. Return the elected IP address.
private IpAddress election(PIMNeighbor nbr, IpAddress drIp, int drPriority) {
IpAddress nbrIp = nbr.getIpaddr();
if (nbr.getPriority() > drPriority) {
IpAddress nbrIp = nbr.ipAddress();
if (nbr.priority() > drPriority) {
return nbrIp;
}
......
......@@ -23,6 +23,7 @@ import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.SafeRecurringTask;
import org.onosproject.incubator.net.intf.Interface;
import org.onosproject.incubator.net.intf.InterfaceEvent;
import org.onosproject.incubator.net.intf.InterfaceListener;
......@@ -58,8 +59,10 @@ public class PIMInterfaceManager implements PIMInterfaceService {
private static final Class<PimInterfaceConfig> PIM_INTERFACE_CONFIG_CLASS = PimInterfaceConfig.class;
private static final String PIM_INTERFACE_CONFIG_KEY = "pimInterface";
// Create a Scheduled Executor service to send PIM hellos
private final ScheduledExecutorService helloScheduler =
private static final int DEFAULT_TIMEOUT_TASK_PERIOD_MS = 250;
// Create a Scheduled Executor service for recurring tasks
private final ScheduledExecutorService scheduledExecutorService =
Executors.newScheduledThreadPool(1);
// Wait for a bout 3 seconds before sending the initial hello messages.
......@@ -69,6 +72,8 @@ public class PIMInterfaceManager implements PIMInterfaceService {
// Send PIM hello packets: 30 seconds.
private final long pimHelloPeriod = 30;
private final int timeoutTaskPeriod = DEFAULT_TIMEOUT_TASK_PERIOD_MS;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected PacketService packetService;
......@@ -113,18 +118,16 @@ public class PIMInterfaceManager implements PIMInterfaceService {
interfaceService.addListener(interfaceListener);
// Schedule the periodic hello sender.
helloScheduler.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
for (PIMInterface pif : pimInterfaces.values()) {
pif.sendHello();
}
} catch (Exception e) {
log.warn("exception", e);
}
}
}, initialHelloDelay, pimHelloPeriod, TimeUnit.SECONDS);
scheduledExecutorService.scheduleAtFixedRate(
SafeRecurringTask.wrap(
() -> pimInterfaces.values().forEach(PIMInterface::sendHello)),
initialHelloDelay, pimHelloPeriod, TimeUnit.SECONDS);
// Schedule task to periodically time out expired neighbors
scheduledExecutorService.scheduleAtFixedRate(
SafeRecurringTask.wrap(
() -> pimInterfaces.values().forEach(PIMInterface::checkNeighborTimeouts)),
0, timeoutTaskPeriod, TimeUnit.MILLISECONDS);
log.info("Started");
}
......@@ -136,7 +139,7 @@ public class PIMInterfaceManager implements PIMInterfaceService {
networkConfig.unregisterConfigFactory(pimConfigFactory);
// Shutdown the periodic hello task.
helloScheduler.shutdown();
scheduledExecutorService.shutdown();
log.info("Stopped");
}
......
......@@ -19,6 +19,9 @@
<command>
<action class="org.onosproject.pim.cli.PimInterfacesListCommand"/>
</command>
<command>
<action class="org.onosproject.pim.cli.PimNeighborsListCommand"/>
</command>
</command-bundle>
</blueprint>
......
/*
* Copyright 2016 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onlab.util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Wrapper for a recurring task which catches all exceptions to prevent task
* being suppressed in a ScheduledExecutorService.
*/
public final class SafeRecurringTask implements Runnable {
private static final Logger log = LoggerFactory.getLogger(SafeRecurringTask.class);
private final Runnable runnable;
/**
* Constructor.
*
* @param runnable runnable to wrap
*/
private SafeRecurringTask(Runnable runnable) {
this.runnable = runnable;
}
@Override
public void run() {
if (Thread.currentThread().isInterrupted()) {
log.info("Task interrupted, quitting");
return;
}
try {
runnable.run();
} catch (Exception e) {
// Catch all exceptions to avoid task being suppressed
log.error("Exception thrown during task", e);
}
}
/**
* Wraps a runnable in a safe recurring task.
*
* @param runnable runnable to wrap
* @return safe recurring task
*/
public static SafeRecurringTask wrap(Runnable runnable) {
return new SafeRecurringTask(runnable);
}
}