Saurav Das
Committed by Gerrit Code Review

Adding Device Listiner to BgpRouter so that filtering rules are sent to the

driver only after the device is available.

Change-Id: I377402b87cee6c02c087efbcc4f0cff4f19e1ca3
......@@ -20,6 +20,7 @@ import com.google.common.collect.HashMultimap;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multiset;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
......@@ -28,18 +29,18 @@ import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onlab.packet.Ethernet;
import org.onlab.packet.Ip4Address;
import org.onlab.packet.Ip4Prefix;
import org.onlab.packet.Ip6Address;
import org.onlab.packet.IpAddress;
import org.onlab.packet.IpPrefix;
import org.onlab.packet.MacAddress;
import org.onlab.util.KryoNamespace;
import org.onosproject.config.NetworkConfigService;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.net.DeviceId;
import org.onosproject.net.device.DeviceEvent;
import org.onosproject.net.device.DeviceListener;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.flow.DefaultTrafficSelector;
import org.onosproject.net.flow.DefaultTrafficTreatment;
import org.onosproject.net.flow.FlowRuleService;
import org.onosproject.net.flow.TrafficSelector;
import org.onosproject.net.flow.TrafficTreatment;
import org.onosproject.net.flow.criteria.Criteria;
......@@ -50,7 +51,9 @@ import org.onosproject.net.flowobjective.FilteringObjective;
import org.onosproject.net.flowobjective.FlowObjectiveService;
import org.onosproject.net.flowobjective.ForwardingObjective;
import org.onosproject.net.flowobjective.NextObjective;
import org.onosproject.net.group.GroupService;
import org.onosproject.net.flowobjective.Objective;
import org.onosproject.net.flowobjective.ObjectiveContext;
import org.onosproject.net.flowobjective.ObjectiveError;
import org.onosproject.net.packet.PacketService;
import org.onosproject.routing.FibEntry;
import org.onosproject.routing.FibListener;
......@@ -87,19 +90,19 @@ public class BgpRouter {
protected CoreService coreService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected FlowRuleService flowService;
protected RoutingService routingService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected GroupService groupService;
protected RoutingConfigurationService configService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected RoutingService routingService;
protected PacketService packetService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected RoutingConfigurationService configService;
protected FlowObjectiveService flowObjectiveService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected PacketService packetService;
protected DeviceService deviceService;
//
// NOTE: Unused reference - needed to guarantee that the
......@@ -130,57 +133,32 @@ public class BgpRouter {
// learned from config
private DeviceId ctrlDeviceId;
//private final GroupListener groupListener = new InternalGroupListener();
// Responsible for handling BGP traffic (encapsulated within OF messages)
// between the data-plane switch and the Quagga VM using a control plane OVS.
private TunnellingConnectivityManager connectivityManager;
private DeviceListener deviceListener;
private IcmpHandler icmpHandler;
private KryoNamespace appKryo = new KryoNamespace.Builder()
.register(IpAddress.Version.class)
.register(IpAddress.class)
.register(Ip4Address.class)
.register(Ip6Address.class)
.register(byte[].class)
.register(NextHopGroupKey.class)
.build();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected FlowObjectiveService flowObjectiveService;
@Activate
protected void activate() {
appId = coreService.registerApplication(BGP_ROUTER_APP);
getDeviceConfiguration(configService.getBgpSpeakers());
//groupService.addListener(groupListener);
processIntfFilters(true, configService.getInterfaces());
connectivityManager = new TunnellingConnectivityManager(appId,
configService,
packetService,
flowObjectiveService);
icmpHandler = new IcmpHandler(configService, packetService);
deviceListener = new InnerDeviceListener();
routingService.addFibListener(new InternalFibListener());
routingService.start();
deviceService.addListener(deviceListener);
connectivityManager.start();
icmpHandler.start();
log.info("BgpRouter started");
delay(1000);
FibEntry fibEntry = new FibEntry(Ip4Prefix.valueOf("10.1.0.0/16"),
Ip4Address.valueOf("192.168.10.1"),
MacAddress.valueOf("DE:AD:BE:EF:FE:ED"));
FibUpdate fibUpdate = new FibUpdate(FibUpdate.Type.UPDATE, fibEntry);
updateFibEntry(Collections.singletonList(fibUpdate));
}
@Deactivate
......@@ -188,10 +166,8 @@ public class BgpRouter {
routingService.stop();
connectivityManager.stop();
icmpHandler.stop();
processIntfFilters(false, configService.getInterfaces());
//groupService.removeListener(groupListener);
deviceService.removeListener(deviceListener);
//processIntfFilters(false, configService.getInterfaces()); //TODO necessary?
log.info("BgpRouter stopped");
}
......@@ -225,17 +201,6 @@ public class BgpRouter {
Integer nextId;
synchronized (pendingUpdates) {
nextId = nextHops.get(entry.nextHopIp());
/*
group = groupService.getGroup(deviceId,
new DefaultGroupKey(
appKryo.serialize(nextHop.group())));
if (group == null) {
log.debug("Adding pending flow {}", update.entry());
pendingUpdates.put(nextHop.group(), update.entry());
continue;
}*/
}
toInstall.put(update.entry(), nextId);
......@@ -251,7 +216,7 @@ public class BgpRouter {
Integer nextId = entry.getValue();
flowObjectiveService.forward(deviceId,
generateRibFlowRule(fibEntry.prefix(), nextId).add());
generateRibForwardingObj(fibEntry.prefix(), nextId).add());
log.trace("Sending forwarding objective {} -> nextId:{}", fibEntry, nextId);
}
......@@ -263,20 +228,21 @@ public class BgpRouter {
FibEntry entry = update.entry();
Integer nextId = nextHops.get(entry.nextHopIp());
/*Group group = deleteNextHop(entry.prefix());
/* Group group = deleteNextHop(entry.prefix());
if (group == null) {
log.warn("Group not found when deleting {}", entry);
return;
}*/
flowObjectiveService.forward(deviceId,
generateRibFlowRule(entry.prefix(), nextId).remove());
generateRibForwardingObj(entry.prefix(), nextId).remove());
}
}
private ForwardingObjective.Builder generateRibFlowRule(IpPrefix prefix, Integer nextId) {
private ForwardingObjective.Builder generateRibForwardingObj(IpPrefix prefix,
Integer nextId) {
TrafficSelector selector = DefaultTrafficSelector.builder()
.matchEthType(Ethernet.TYPE_IPV4)
.matchIPDst(prefix)
......@@ -293,8 +259,6 @@ public class BgpRouter {
.withFlag(ForwardingObjective.Flag.SPECIFIC);
return fwdBuilder;
}
private synchronized void addNextHop(FibEntry entry) {
......@@ -328,24 +292,10 @@ public class BgpRouter {
.addTreatment(treatment)
.withType(NextObjective.Type.SIMPLE)
.fromApp(appId)
.add();
.add(); // TODO add callbacks
flowObjectiveService.next(deviceId, nextObjective);
/*
GroupBucket bucket = DefaultGroupBucket.createIndirectGroupBucket(treatment);
GroupDescription groupDescription
= new DefaultGroupDescription(deviceId,
GroupDescription.Type.INDIRECT,
new GroupBuckets(Collections
.singletonList(bucket)),
new DefaultGroupKey(appKryo.serialize(groupKey)),
appId);
groupService.addGroup(groupDescription);
*/
nextHops.put(nextHop.ip(), nextId);
}
......@@ -366,7 +316,7 @@ public class BgpRouter {
serialize(nextHop.group())));
// FIXME disabling group deletes for now until we verify the logic is OK
*//*if (nextHopsCount.remove(nextHopIp, 1) <= 1) {
if (nextHopsCount.remove(nextHopIp, 1) <= 1) {
// There was one or less next hops, so there are now none
log.debug("removing group for next hop {}", nextHop);
......@@ -376,7 +326,7 @@ public class BgpRouter {
groupService.removeGroup(deviceId,
new DefaultGroupKey(appKryo.build().serialize(nextHop.group())),
appId);
}*//*
}
return group;
}*/
......@@ -402,31 +352,63 @@ public class BgpRouter {
.forEach(ipaddr -> fob.addCondition(
Criteria.matchIPDst(ipaddr.subnetAddress())));
fob.permit().fromApp(appId);
flowObjectiveService.filter(deviceId, fob.add());
flowObjectiveService.filter(
deviceId,
fob.add(new ObjectiveContext() {
@Override
public void onSuccess(Objective objective) {
log.info("Successfully installed interface based "
+ "filtering objcetives");
}
@Override
public void onError(Objective objective,
ObjectiveError error) {
log.error("Failed to install interface filters {}: {}",
objective, error);
// TODO something more than just logging
}
}));
}
}
/* private class InternalGroupListener implements GroupListener {
// Triggers driver setup when a device is (re)detected.
private class InnerDeviceListener implements DeviceListener {
@Override
public void event(GroupEvent event) {
Group group = event.subject();
if (event.type() == GroupEvent.Type.GROUP_ADDED ||
event.type() == GroupEvent.Type.GROUP_UPDATED) {
synchronized (pendingUpdates) {
NextHopGroupKey nhGroupKey =
appKryo.deserialize(group.appCookie().key());
Map<FibEntry, Group> entriesToInstall =
pendingUpdates.removeAll(nhGroupKey)
.stream()
.collect(Collectors
.toMap(e -> e, e -> group));
installFlows(entriesToInstall);
}
public void event(DeviceEvent event) {
switch (event.type()) {
case DEVICE_ADDED:
case DEVICE_AVAILABILITY_CHANGED:
if (deviceService.isAvailable(event.subject().id())) {
log.info("Device connected {}", event.subject().id());
processIntfFilters(true, configService.getInterfaces());
/* For test only - will be removed before Cardinal release */
delay(1000);
FibEntry fibEntry = new FibEntry(Ip4Prefix.valueOf("10.1.0.0/16"),
Ip4Address.valueOf("192.168.10.1"),
MacAddress.valueOf("DE:AD:BE:EF:FE:ED"));
FibUpdate fibUpdate = new FibUpdate(FibUpdate.Type.UPDATE, fibEntry);
updateFibEntry(Collections.singletonList(fibUpdate));
}
break;
// TODO other cases
case DEVICE_UPDATED:
break;
case DEVICE_REMOVED:
break;
case DEVICE_SUSPENDED:
break;
case PORT_ADDED:
break;
case PORT_UPDATED:
break;
case PORT_REMOVED:
break;
default:
break;
}
}
}*/
}
}
......
......@@ -67,7 +67,7 @@ import static org.onlab.util.Tools.groupedThreads;
@Service
public class FlowObjectiveManager implements FlowObjectiveService {
public static final int INSTALL_RETRY_ATTEMPTS = 10;
public static final int INSTALL_RETRY_ATTEMPTS = 5;
public static final long INSTALL_RETRY_INTERVAL = 1000; // ms
private final Logger log = LoggerFactory.getLogger(getClass());
......
......@@ -270,7 +270,7 @@ public class OVSCorsaPipeline extends AbstractHandlerBehaviour implements Pipeli
Criteria.IPProtocolCriterion ipProto = (Criteria.IPProtocolCriterion) selector
.getCriterion(Criterion.Type.IP_PROTO);
if (ipSrc != null) {
log.warn("Driver currently does not currently handle matching Src IP");
log.warn("Driver does not currently handle matching Src IP");
fail(fwd, ObjectiveError.UNSUPPORTED);
return Collections.emptySet();
}
......@@ -797,7 +797,8 @@ public class OVSCorsaPipeline extends AbstractHandlerBehaviour implements Pipeli
}
pass(obj);
pendingGroups.invalidate(key);
log.info("Heard back from group service for group {}", obj.id());
log.info("Heard back from group service for group {}. "
+ "Applying pending forwarding objectives", obj.id());
flowObjectiveStore.putNextGroup(obj.id(), new CorsaGroup(key));
});
}
......