Srikanth Vavilapalli

ONOS-1823 and ONOS-1838:Segment Routing Multi-instance Support-1

Change-Id: I3cc848415a609a9c4001d135e51104c62fb2830d
Showing 18 changed files with 902 additions and 359 deletions
......@@ -89,14 +89,18 @@ public class DefaultRoutingHandler {
populationStatus = Status.STARTED;
rulePopulator.resetCounter();
log.info("Starts to populate routing rules");
log.debug("populateAllRoutingRules: populationStatus is STARTED");
for (Device sw : srManager.deviceService.getDevices()) {
if (srManager.mastershipService.getLocalRole(sw.id()) != MastershipRole.MASTER) {
log.debug("populateAllRoutingRules: skipping device {}...we are not master",
sw.id());
continue;
}
ECMPShortestPathGraph ecmpSpg = new ECMPShortestPathGraph(sw.id(), srManager);
if (!populateEcmpRoutingRules(sw.id(), ecmpSpg)) {
log.debug("populateAllRoutingRules: populationStatus is ABORTED");
populationStatus = Status.ABORTED;
log.debug("Abort routing rule population");
return false;
......@@ -106,6 +110,7 @@ public class DefaultRoutingHandler {
// TODO: Set adjacency routing rule for all switches
}
log.debug("populateAllRoutingRules: populationStatus is SUCCEEDED");
populationStatus = Status.SUCCEEDED;
log.info("Completes routing rule population. Total # of rules pushed : {}",
rulePopulator.getCounter());
......@@ -144,6 +149,8 @@ public class DefaultRoutingHandler {
log.info("Starts rule population from link change");
Set<ArrayList<DeviceId>> routeChanges;
log.trace("populateRoutingRulesForLinkStatusChange: "
+ "populationStatus is STARTED");
populationStatus = Status.STARTED;
if (linkFail == null) {
// Compare all routes of existing ECMP SPG with the new ones
......@@ -155,16 +162,19 @@ public class DefaultRoutingHandler {
if (routeChanges.isEmpty()) {
log.info("No route changes for the link status change");
log.debug("populateRoutingRulesForLinkStatusChange: populationStatus is SUCCEEDED");
populationStatus = Status.SUCCEEDED;
return true;
}
if (repopulateRoutingRulesForRoutes(routeChanges)) {
log.debug("populateRoutingRulesForLinkStatusChange: populationStatus is SUCCEEDED");
populationStatus = Status.SUCCEEDED;
log.info("Complete to repopulate the rules. # of rules populated : {}",
rulePopulator.getCounter());
return true;
} else {
log.debug("populateRoutingRulesForLinkStatusChange: populationStatus is ABORTED");
populationStatus = Status.ABORTED;
log.warn("Failed to repopulate the rules.");
return false;
......@@ -177,6 +187,7 @@ public class DefaultRoutingHandler {
for (ArrayList<DeviceId> link: routes) {
// When only the source device is defined, reinstall routes to all other devices
if (link.size() == 1) {
log.trace("repopulateRoutingRulesForRoutes: running ECMP graph for device {}", link.get(0));
ECMPShortestPathGraph ecmpSpg = new ECMPShortestPathGraph(link.get(0), srManager);
if (populateEcmpRoutingRules(link.get(0), ecmpSpg)) {
currentEcmpSpgMap.put(link.get(0), ecmpSpg);
......@@ -187,8 +198,7 @@ public class DefaultRoutingHandler {
} else {
DeviceId src = link.get(0);
DeviceId dst = link.get(1);
log.trace("repopulateRoutingRulesForRoutes: running ECMP graph "
+ "for device {}", dst);
log.trace("repopulateRoutingRulesForRoutes: running ECMP graph for device {}", dst);
ECMPShortestPathGraph ecmpSpg = updatedEcmpSpgMap.get(dst);
HashMap<Integer, HashMap<DeviceId, ArrayList<ArrayList<DeviceId>>>> switchVia =
ecmpSpg.getAllLearnedSwitchesAndVia();
......@@ -278,14 +288,12 @@ public class DefaultRoutingHandler {
log.debug("Checking route change for switch {}", sw.id());
ECMPShortestPathGraph ecmpSpg = currentEcmpSpgMap.get(sw.id());
if (ecmpSpg == null) {
log.debug("No existing ECMP path for Switch {}", sw.id());
log.debug("No existing ECMP graph for device {}", sw.id());
ArrayList<DeviceId> route = new ArrayList<>();
route.add(sw.id());
routes.add(route);
continue;
}
log.debug("computeRouteChange: running ECMP graph "
+ "for device {}", sw.id());
ECMPShortestPathGraph newEcmpSpg = updatedEcmpSpgMap.get(sw.id());
currentEcmpSpgMap.put(sw.id(), newEcmpSpg);
HashMap<Integer, HashMap<DeviceId, ArrayList<ArrayList<DeviceId>>>> switchVia =
......@@ -400,6 +408,8 @@ public class DefaultRoutingHandler {
// rule for both subnet and router IP.
if (config.isEdgeDevice(targetSw) && config.isEdgeDevice(destSw)) {
List<Ip4Prefix> subnets = config.getSubnets(destSw);
log.debug("populateEcmpRoutingRulePartial in device {} towards {} for subnets {}",
targetSw, destSw, subnets);
result = rulePopulator.populateIpRuleForSubnet(targetSw,
subnets,
destSw,
......@@ -410,6 +420,8 @@ public class DefaultRoutingHandler {
Ip4Address routerIp = config.getRouterIp(destSw);
IpPrefix routerIpPrefix = IpPrefix.valueOf(routerIp, IpPrefix.MAX_INET_MASK_LENGTH);
log.debug("populateEcmpRoutingRulePartial in device {} towards {} for router IP {}",
targetSw, destSw, routerIpPrefix);
result = rulePopulator.populateIpRuleForRouter(targetSw, routerIpPrefix, destSw, nextHops);
if (!result) {
return false;
......@@ -419,6 +431,8 @@ public class DefaultRoutingHandler {
} else if (config.isEdgeDevice(targetSw)) {
Ip4Address routerIp = config.getRouterIp(destSw);
IpPrefix routerIpPrefix = IpPrefix.valueOf(routerIp, IpPrefix.MAX_INET_MASK_LENGTH);
log.debug("populateEcmpRoutingRulePartial in device {} towards {} for router IP {}",
targetSw, destSw, routerIpPrefix);
result = rulePopulator.populateIpRuleForRouter(targetSw, routerIpPrefix, destSw, nextHops);
if (!result) {
return false;
......@@ -426,6 +440,8 @@ public class DefaultRoutingHandler {
}
// Populates MPLS rules to all routers
log.debug("populateEcmpRoutingRulePartial in device{} towards {} for all MPLS rules",
targetSw, destSw);
result = rulePopulator.populateMplsRule(targetSw, destSw, nextHops);
if (!result) {
return false;
......@@ -453,9 +469,13 @@ public class DefaultRoutingHandler {
public void startPopulationProcess() {
synchronized (populationStatus) {
if (populationStatus == Status.IDLE
|| populationStatus == Status.SUCCEEDED) {
|| populationStatus == Status.SUCCEEDED
|| populationStatus == Status.ABORTED) {
populationStatus = Status.STARTED;
populateAllRoutingRules();
} else {
log.warn("Not initiating startPopulationProcess as populationStatus is {}",
populationStatus);
}
}
}
......
......@@ -99,6 +99,9 @@ public class DeviceConfiguration implements DeviceProperties {
deviceConfigMap.get(deviceId).nodeSid);
return deviceConfigMap.get(deviceId).nodeSid;
} else {
log.warn("getSegmentId for device {} "
+ "throwing IllegalStateException "
+ "because device does not exist in config", deviceId);
throw new IllegalStateException();
}
}
......@@ -151,6 +154,9 @@ public class DeviceConfiguration implements DeviceProperties {
deviceConfigMap.get(deviceId).mac);
return deviceConfigMap.get(deviceId).mac;
} else {
log.warn("getDeviceMac for device {} "
+ "throwing IllegalStateException "
+ "because device does not exist in config", deviceId);
throw new IllegalStateException();
}
}
......@@ -168,6 +174,9 @@ public class DeviceConfiguration implements DeviceProperties {
deviceConfigMap.get(deviceId).ip);
return deviceConfigMap.get(deviceId).ip;
} else {
log.warn("getRouterIp for device {} "
+ "throwing IllegalStateException "
+ "because device does not exist in config", deviceId);
throw new IllegalStateException();
}
}
......@@ -187,6 +196,9 @@ public class DeviceConfiguration implements DeviceProperties {
deviceConfigMap.get(deviceId).isEdge);
return deviceConfigMap.get(deviceId).isEdge;
} else {
log.warn("isEdgeDevice for device {} "
+ "throwing IllegalStateException "
+ "because device does not exist in config", deviceId);
throw new IllegalStateException();
}
}
......
......@@ -217,6 +217,11 @@ public class RoutingRulePopulator {
// If the next hop is the destination router, do PHP
if (nextHops.size() == 1 && destSwId.equals(nextHops.toArray()[0])) {
log.debug("populateMplsRule: Installing MPLS forwarding objective for "
+ "label {} in switch {} with PHP",
config.getSegmentId(destSwId),
deviceId);
ForwardingObjective.Builder fwdObjBosBuilder =
getMplsForwardingObjective(deviceId,
destSwId,
......@@ -237,6 +242,11 @@ public class RoutingRulePopulator {
return false;
}
} else {
log.debug("Installing MPLS forwarding objective for "
+ "label {} in switch {} without PHP",
config.getSegmentId(destSwId),
deviceId);
ForwardingObjective.Builder fwdObjBosBuilder =
getMplsForwardingObjective(deviceId,
destSwId,
......@@ -264,8 +274,6 @@ public class RoutingRulePopulator {
.makePermanent()).withSelector(selector)
.withPriority(100))
.withFlag(ForwardingObjective.Flag.SPECIFIC);
log.debug("Installing MPLS forwarding objective in switch {}",
deviceId);
srManager.flowObjectiveService.forward(deviceId,
fwdObjBuilder.add());
rulePopulationCounter.incrementAndGet();
......
......@@ -22,23 +22,22 @@ import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onlab.packet.Ethernet;
import org.onlab.packet.IPv4;
import org.onlab.util.KryoNamespace;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.event.Event;
import org.onosproject.segmentrouting.grouphandler.DefaultGroupHandler;
import org.onosproject.segmentrouting.grouphandler.NeighborSet;
import org.onosproject.segmentrouting.grouphandler.NeighborSetNextObjectiveStoreKey;
import org.onosproject.mastership.MastershipService;
import org.onosproject.net.Device;
import org.onosproject.net.DeviceId;
import org.onosproject.net.Link;
import org.onosproject.net.MastershipRole;
import org.onosproject.net.Port;
import org.onosproject.net.device.DeviceEvent;
import org.onosproject.net.device.DeviceListener;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.flowobjective.FlowObjectiveService;
import org.onosproject.net.group.Group;
import org.onosproject.net.group.GroupEvent;
import org.onosproject.net.group.GroupKey;
import org.onosproject.net.host.HostService;
import org.onosproject.net.intent.IntentService;
......@@ -51,9 +50,16 @@ import org.onosproject.net.packet.PacketProcessor;
import org.onosproject.net.packet.PacketService;
import org.onosproject.net.topology.TopologyService;
import org.onosproject.segmentrouting.config.NetworkConfigManager;
import org.onosproject.store.service.EventuallyConsistentMap;
import org.onosproject.store.service.EventuallyConsistentMapBuilder;
import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.WallClockTimestamp;
import org.onosproject.store.service.WallclockClockManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.URI;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
......@@ -112,6 +118,11 @@ public class SegmentRoutingManager {
private static ScheduledFuture<?> eventHandlerFuture = null;
private ConcurrentLinkedQueue<Event> eventQueue = new ConcurrentLinkedQueue<Event>();
private Map<DeviceId, DefaultGroupHandler> groupHandlerMap = new ConcurrentHashMap<DeviceId, DefaultGroupHandler>();
// Per device next objective ID store with (device id + neighbor set) as key
private EventuallyConsistentMap<NeighborSetNextObjectiveStoreKey,
Integer> nsNextObjStore = null;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected StorageService storageService;
private NetworkConfigManager networkConfigService = new NetworkConfigManager();;
......@@ -119,10 +130,34 @@ public class SegmentRoutingManager {
private static int numOfHandlerExecution = 0;
private static int numOfHandlerScheduled = 0;
private KryoNamespace.Builder kryoBuilder = null;
@Activate
protected void activate() {
appId = coreService
.registerApplication("org.onosproject.segmentrouting");
kryoBuilder = new KryoNamespace.Builder()
.register(NeighborSetNextObjectiveStoreKey.class,
NeighborSet.class,
DeviceId.class,
URI.class,
WallClockTimestamp.class,
org.onosproject.cluster.NodeId.class,
HashSet.class
);
log.debug("Creating EC map nsnextobjectivestore");
EventuallyConsistentMapBuilder<NeighborSetNextObjectiveStoreKey, Integer>
nsNextObjMapBuilder = storageService.eventuallyConsistentMapBuilder();
nsNextObjStore = nsNextObjMapBuilder
.withName("nsnextobjectivestore")
.withSerializer(kryoBuilder)
.withClockService(new WallclockClockManager<>())
.build();
log.trace("Current size {}", nsNextObjStore.size());
networkConfigService.init();
deviceConfiguration = new DeviceConfiguration(networkConfigService);
arpHandler = new ArpHandler(this);
......@@ -136,20 +171,18 @@ public class SegmentRoutingManager {
deviceService.addListener(new InternalDeviceListener());
for (Device device : deviceService.getDevices()) {
if (mastershipService.getLocalRole(device.id()) == MastershipRole.MASTER) {
DefaultGroupHandler groupHandler = DefaultGroupHandler
.createGroupHandler(device.id(), appId,
deviceConfiguration, linkService,
flowObjectiveService);
groupHandlerMap.put(device.id(), groupHandler);
defaultRoutingHandler.populateTtpRules(device.id());
log.debug("Initiating default group handling for {}", device.id());
} else {
log.debug("Activate: Local role {} "
+ "is not MASTER for device {}",
mastershipService.getLocalRole(device.id()),
device.id());
}
//Irrespective whether the local is a MASTER or not for this device,
//create group handler instance and push default TTP flow rules.
//Because in a multi-instance setup, instances can initiate
//groups for any devices. Also the default TTP rules are needed
//to be pushed before inserting any IP table entries for any device
DefaultGroupHandler groupHandler = DefaultGroupHandler
.createGroupHandler(device.id(), appId,
deviceConfiguration, linkService,
flowObjectiveService,
nsNextObjStore);
groupHandlerMap.put(device.id(), groupHandler);
defaultRoutingHandler.populateTtpRules(device.id());
}
defaultRoutingHandler.startPopulationProcess();
......@@ -180,8 +213,14 @@ public class SegmentRoutingManager {
public int getNextObjectiveId(DeviceId deviceId, NeighborSet ns) {
return (groupHandlerMap.get(deviceId) != null) ? groupHandlerMap
.get(deviceId).getNextObjectiveId(ns) : -1;
if (groupHandlerMap.get(deviceId) != null) {
log.trace("getNextObjectiveId query in device {}", deviceId);
return groupHandlerMap
.get(deviceId).getNextObjectiveId(ns);
} else {
log.warn("getNextObjectiveId query in device {} not found", deviceId);
return -1;
}
}
private class InternalPacketProcessor implements PacketProcessor {
......@@ -224,12 +263,12 @@ public class SegmentRoutingManager {
@Override
public void event(DeviceEvent event) {
if (mastershipService.getLocalRole(event.subject().id()) != MastershipRole.MASTER) {
/*if (mastershipService.getLocalRole(event.subject().id()) != MastershipRole.MASTER) {
log.debug("Local role {} is not MASTER for device {}",
mastershipService.getLocalRole(event.subject().id()),
event.subject().id());
return;
}
}*/
switch (event.type()) {
case DEVICE_ADDED:
......@@ -245,12 +284,14 @@ public class SegmentRoutingManager {
private void scheduleEventHandlerIfNotScheduled(Event event) {
eventQueue.add(event);
numOfEvents++;
if (eventHandlerFuture == null || eventHandlerFuture.isDone()) {
eventHandlerFuture = executorService
.schedule(eventHandler, 100, TimeUnit.MILLISECONDS);
numOfHandlerScheduled++;
synchronized (eventQueue) {
eventQueue.add(event);
numOfEvents++;
if (eventHandlerFuture == null || eventHandlerFuture.isDone()) {
eventHandlerFuture = executorService
.schedule(eventHandler, 100, TimeUnit.MILLISECONDS);
numOfHandlerScheduled++;
}
}
log.trace("numOfEvents {}, numOfEventHanlderScheduled {}", numOfEvents,
......@@ -262,44 +303,68 @@ public class SegmentRoutingManager {
@Override
public void run() {
numOfHandlerExecution++;
while (!eventQueue.isEmpty()) {
Event event = eventQueue.poll();
if (event.type() == LinkEvent.Type.LINK_ADDED) {
processLinkAdded((Link) event.subject());
} else if (event.type() == LinkEvent.Type.LINK_REMOVED) {
processLinkRemoved((Link) event.subject());
} else if (event.type() == GroupEvent.Type.GROUP_ADDED) {
processGroupAdded((Group) event.subject());
} else if (event.type() == DeviceEvent.Type.DEVICE_ADDED ||
event.type() == DeviceEvent.Type.DEVICE_AVAILABILITY_CHANGED ||
event.type() == DeviceEvent.Type.DEVICE_UPDATED) {
if (deviceService.isAvailable(((Device) event.subject()).id())) {
processDeviceAdded((Device) event.subject());
try {
synchronized (eventQueue) {
numOfHandlerExecution++;
while (!eventQueue.isEmpty()) {
Event event = eventQueue.poll();
if (event.type() == LinkEvent.Type.LINK_ADDED) {
processLinkAdded((Link) event.subject());
} else if (event.type() == LinkEvent.Type.LINK_REMOVED) {
processLinkRemoved((Link) event.subject());
//} else if (event.type() == GroupEvent.Type.GROUP_ADDED) {
// processGroupAdded((Group) event.subject());
} else if (event.type() == DeviceEvent.Type.DEVICE_ADDED ||
event.type() == DeviceEvent.Type.DEVICE_AVAILABILITY_CHANGED ||
event.type() == DeviceEvent.Type.DEVICE_UPDATED) {
if (deviceService.isAvailable(((Device) event.subject()).id())) {
processDeviceAdded((Device) event.subject());
}
} else if (event.type() == DeviceEvent.Type.PORT_REMOVED) {
processPortRemoved((Device) event.subject(),
((DeviceEvent) event).port());
} else {
log.warn("Unhandled event type: {}", event.type());
}
}
} else if (event.type() == DeviceEvent.Type.PORT_REMOVED) {
processPortRemoved((Device) event.subject(),
((DeviceEvent) event).port());
} else {
log.warn("Unhandled event type: {}", event.type());
}
log.debug("numOfHandlerExecution {} numOfEventHanlderScheduled {} numOfEvents {}",
numOfHandlerExecution, numOfHandlerScheduled, numOfEvents);
} catch (Exception e) {
log.error("SegmentRouting event handler "
+ "thread thrown an exception: {}", e);
}
log.debug("numOfHandlerExecution {} numOfEventHanlderScheduled {} numOfEvents {}",
numOfHandlerExecution, numOfHandlerScheduled, numOfEvents);
}
}
private void processLinkAdded(Link link) {
log.debug("A new link {} was added", link.toString());
if (mastershipService.getLocalRole(link.src().deviceId()) == MastershipRole.MASTER) {
DefaultGroupHandler groupHandler = groupHandlerMap.get(link.src()
.deviceId());
if (groupHandler != null) {
//Irrespective whether the local is a MASTER or not for this device,
//create group handler instance and push default TTP flow rules.
//Because in a multi-instance setup, instances can initiate
//groups for any devices. Also the default TTP rules are needed
//to be pushed before inserting any IP table entries for any device
DefaultGroupHandler groupHandler = groupHandlerMap.get(link.src()
.deviceId());
if (groupHandler != null) {
groupHandler.linkUp(link);
} else {
Device device = deviceService.getDevice(link.src().deviceId());
if (device != null) {
log.warn("processLinkAdded: Link Added "
+ "Notification without Device Added "
+ "event, still handling it");
processDeviceAdded(device);
groupHandler = groupHandlerMap.get(link.src()
.deviceId());
groupHandler.linkUp(link);
}
}
defaultRoutingHandler.populateRoutingRulesForLinkStatusChange(null);
//defaultRoutingHandler.populateRoutingRulesForLinkStatusChange(null);
log.trace("processLinkAdded: re-starting route population process");
defaultRoutingHandler.startPopulationProcess();
}
private void processLinkRemoved(Link link) {
......@@ -308,20 +373,27 @@ public class SegmentRoutingManager {
if (groupHandler != null) {
groupHandler.portDown(link.src().port());
}
defaultRoutingHandler.populateRoutingRulesForLinkStatusChange(link);
}
private void processGroupAdded(Group group) {
log.debug("A new group with ID {} was added", group.id());
defaultRoutingHandler.resumePopulationProcess();
//defaultRoutingHandler.populateRoutingRulesForLinkStatusChange(link);
log.trace("processLinkRemoved: re-starting route population process");
defaultRoutingHandler.startPopulationProcess();
}
private void processDeviceAdded(Device device) {
log.debug("A new device with ID {} was added", device.id());
defaultRoutingHandler.populateTtpRules(device.id());
DefaultGroupHandler dgh = DefaultGroupHandler.createGroupHandler(device
.id(), appId, deviceConfiguration, linkService, flowObjectiveService);
//Irrespective whether the local is a MASTER or not for this device,
//create group handler instance and push default TTP flow rules.
//Because in a multi-instance setup, instances can initiate
//groups for any devices. Also the default TTP rules are needed
//to be pushed before inserting any IP table entries for any device
DefaultGroupHandler dgh = DefaultGroupHandler.
createGroupHandler(device.id(),
appId,
deviceConfiguration,
linkService,
flowObjectiveService,
nsNextObjStore);
groupHandlerMap.put(device.id(), dgh);
defaultRoutingHandler.populateTtpRules(device.id());
}
private void processPortRemoved(Device device, Port port) {
......
......@@ -19,16 +19,12 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.onlab.packet.MplsLabel;
import org.onosproject.core.ApplicationId;
import org.onosproject.net.DeviceId;
import org.onosproject.net.Link;
import org.onosproject.net.flow.DefaultTrafficTreatment;
import org.onosproject.net.flow.TrafficTreatment;
import org.onosproject.net.flowobjective.DefaultNextObjective;
import org.onosproject.net.flowobjective.FlowObjectiveService;
import org.onosproject.net.flowobjective.NextObjective;
import org.onosproject.net.link.LinkService;
import org.onosproject.store.service.EventuallyConsistentMap;
/**
* Default ECMP group handler creation module for an edge device.
......@@ -53,8 +49,11 @@ public class DefaultEdgeGroupHandler extends DefaultGroupHandler {
ApplicationId appId,
DeviceProperties config,
LinkService linkService,
FlowObjectiveService flowObjService) {
super(deviceId, appId, config, linkService, flowObjService);
FlowObjectiveService flowObjService,
EventuallyConsistentMap<
NeighborSetNextObjectiveStoreKey,
Integer> nsNextObjStore) {
super(deviceId, appId, config, linkService, flowObjService, nsNextObjStore);
}
@Override
......@@ -108,7 +107,7 @@ public class DefaultEdgeGroupHandler extends DefaultGroupHandler {
@Override
protected void newPortToExistingNeighbor(Link newNeighborLink) {
log.debug("New port to existing neighbor: Updating "
/*log.debug("New port to existing neighbor: Updating "
+ "groups for edge device {}", deviceId);
addNeighborAtPort(newNeighborLink.dst().deviceId(),
newNeighborLink.src().port());
......@@ -129,7 +128,7 @@ public class DefaultEdgeGroupHandler extends DefaultGroupHandler {
mplsLabel(ns.getEdgeLabel()));
}
Integer nextId = deviceNextObjectiveIds.get(getGroupKey(ns));
Integer nextId = deviceNextObjectiveIds.get(ns);
if (nextId != null) {
NextObjective.Builder nextObjBuilder = DefaultNextObjective
.builder().withId(nextId)
......@@ -140,7 +139,7 @@ public class DefaultEdgeGroupHandler extends DefaultGroupHandler {
NextObjective nextObjective = nextObjBuilder.add();
flowObjectiveService.next(deviceId, nextObjective);
}
}
}*/
}
@Override
......
......@@ -26,6 +26,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;
import org.onlab.packet.MacAddress;
import org.onlab.packet.MplsLabel;
......@@ -42,6 +43,7 @@ import org.onosproject.net.flowobjective.NextObjective;
import org.onosproject.net.group.DefaultGroupKey;
import org.onosproject.net.group.GroupKey;
import org.onosproject.net.link.LinkService;
import org.onosproject.store.service.EventuallyConsistentMap;
import org.slf4j.Logger;
/**
......@@ -66,8 +68,10 @@ public class DefaultGroupHandler {
new HashMap<DeviceId, Set<PortNumber>>();
protected HashMap<PortNumber, DeviceId> portDeviceMap =
new HashMap<PortNumber, DeviceId>();
protected HashMap<GroupKey, Integer> deviceNextObjectiveIds =
new HashMap<GroupKey, Integer>();
//protected HashMap<NeighborSet, Integer> deviceNextObjectiveIds =
// new HashMap<NeighborSet, Integer>();
protected EventuallyConsistentMap<
NeighborSetNextObjectiveStoreKey, Integer> nsNextObjStore = null;
protected Random rand = new Random();
protected KryoNamespace.Builder kryo = new KryoNamespace.Builder()
......@@ -81,7 +85,10 @@ public class DefaultGroupHandler {
protected DefaultGroupHandler(DeviceId deviceId, ApplicationId appId,
DeviceProperties config,
LinkService linkService,
FlowObjectiveService flowObjService) {
FlowObjectiveService flowObjService,
EventuallyConsistentMap<
NeighborSetNextObjectiveStoreKey,
Integer> nsNextObjStore) {
this.deviceId = checkNotNull(deviceId);
this.appId = checkNotNull(appId);
this.deviceConfig = checkNotNull(config);
......@@ -91,6 +98,7 @@ public class DefaultGroupHandler {
isEdgeRouter = config.isEdgeDevice(deviceId);
nodeMacAddr = checkNotNull(config.getDeviceMac(deviceId));
this.flowObjectiveService = flowObjService;
this.nsNextObjStore = nsNextObjStore;
populateNeighborMaps();
}
......@@ -111,13 +119,20 @@ public class DefaultGroupHandler {
ApplicationId appId,
DeviceProperties config,
LinkService linkService,
FlowObjectiveService flowObjService) {
FlowObjectiveService flowObjService,
EventuallyConsistentMap<
NeighborSetNextObjectiveStoreKey,
Integer> nsNextObjStore) {
if (config.isEdgeDevice(deviceId)) {
return new DefaultEdgeGroupHandler(deviceId, appId, config,
linkService, flowObjService);
linkService,
flowObjService,
nsNextObjStore);
} else {
return new DefaultTransitGroupHandler(deviceId, appId, config,
linkService, flowObjService);
linkService,
flowObjService,
nsNextObjStore);
}
}
......@@ -150,12 +165,56 @@ public class DefaultGroupHandler {
log.debug("Device {} linkUp at local port {} to neighbor {}", deviceId,
newLink.src().port(), newLink.dst().deviceId());
if (devicePortMap.get(newLink.dst().deviceId()) == null) {
addNeighborAtPort(newLink.dst().deviceId(),
newLink.src().port());
/*if (devicePortMap.get(newLink.dst().deviceId()) == null) {
// New Neighbor
newNeighbor(newLink);
} else {
// Old Neighbor
newPortToExistingNeighbor(newLink);
}*/
Set<NeighborSet> nsSet = nsNextObjStore.keySet()
.stream()
.filter((nsStoreEntry) -> (nsStoreEntry.deviceId().equals(deviceId)))
.map((nsStoreEntry) -> (nsStoreEntry.neighborSet()))
.filter((ns) -> (ns.getDeviceIds()
.contains(newLink.dst().deviceId())))
.collect(Collectors.toSet());
log.trace("linkUp: nsNextObjStore contents for device {}:",
deviceId,
nsSet);
for (NeighborSet ns : nsSet) {
// Create the new bucket to be updated
TrafficTreatment.Builder tBuilder =
DefaultTrafficTreatment.builder();
tBuilder.setOutput(newLink.src().port())
.setEthDst(deviceConfig.getDeviceMac(
newLink.dst().deviceId()))
.setEthSrc(nodeMacAddr);
if (ns.getEdgeLabel() != NeighborSet.NO_EDGE_LABEL) {
tBuilder.pushMpls()
.setMpls(MplsLabel.
mplsLabel(ns.getEdgeLabel()));
}
Integer nextId = nsNextObjStore.
get(new NeighborSetNextObjectiveStoreKey(deviceId, ns));
if (nextId != null) {
NextObjective.Builder nextObjBuilder = DefaultNextObjective
.builder().withId(nextId)
.withType(NextObjective.Type.HASHED).fromApp(appId);
nextObjBuilder.addTreatment(tBuilder.build());
log.debug("linkUp in device {}: Adding Bucket "
+ "with Port {} to next object id {}",
deviceId,
newLink.src().port(),
nextId);
NextObjective nextObjective = nextObjBuilder.add();
flowObjectiveService.next(deviceId, nextObjective);
}
}
}
......@@ -171,10 +230,20 @@ public class DefaultGroupHandler {
}
log.debug("Device {} portDown {} to neighbor {}", deviceId, port,
portDeviceMap.get(port));
Set<NeighborSet> nsSet = computeImpactedNeighborsetForPortEvent(portDeviceMap
/*Set<NeighborSet> nsSet = computeImpactedNeighborsetForPortEvent(portDeviceMap
.get(port),
devicePortMap
.keySet());
.keySet());*/
Set<NeighborSet> nsSet = nsNextObjStore.keySet()
.stream()
.filter((nsStoreEntry) -> (nsStoreEntry.deviceId().equals(deviceId)))
.map((nsStoreEntry) -> (nsStoreEntry.neighborSet()))
.filter((ns) -> (ns.getDeviceIds()
.contains(portDeviceMap.get(port))))
.collect(Collectors.toSet());
log.trace("portDown: nsNextObjStore contents for device {}:",
deviceId,
nsSet);
for (NeighborSet ns : nsSet) {
// Create the bucket to be removed
TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment
......@@ -187,13 +256,19 @@ public class DefaultGroupHandler {
.getEdgeLabel()));
}
Integer nextId = deviceNextObjectiveIds.get(getGroupKey(ns));
Integer nextId = nsNextObjStore.
get(new NeighborSetNextObjectiveStoreKey(deviceId, ns));
if (nextId != null) {
NextObjective.Builder nextObjBuilder = DefaultNextObjective
.builder().withType(NextObjective.Type.SIMPLE).withId(nextId).fromApp(appId);
nextObjBuilder.addTreatment(tBuilder.build());
log.debug("portDown in device {}: Removing Bucket "
+ "with Port {} to next object id {}",
deviceId,
port,
nextId);
NextObjective nextObjective = nextObjBuilder.remove();
flowObjectiveService.next(deviceId, nextObjective);
......@@ -214,14 +289,31 @@ public class DefaultGroupHandler {
* @return int if found or -1
*/
public int getNextObjectiveId(NeighborSet ns) {
Integer nextId = deviceNextObjectiveIds.get(getGroupKey(ns));
Integer nextId = nsNextObjStore.
get(new NeighborSetNextObjectiveStoreKey(deviceId, ns));
if (nextId == null) {
log.trace("getNextObjectiveId in device{}: Next objective id "
+ "not found for {} and creating", deviceId, ns);
log.trace("getNextObjectiveId: nsNextObjStore contents for device {}: {}",
deviceId,
nsNextObjStore.entrySet()
.stream()
.filter((nsStoreEntry) ->
(nsStoreEntry.getKey().deviceId().equals(deviceId)))
.collect(Collectors.toList()));
createGroupsFromNeighborsets(Collections.singleton(ns));
nextId = deviceNextObjectiveIds.get(getGroupKey(ns));
nextId = nsNextObjStore.
get(new NeighborSetNextObjectiveStoreKey(deviceId, ns));
if (nextId == null) {
log.warn("getNextObjectiveId: unable to create next objective");
return -1;
} else {
log.debug("getNextObjectiveId in device{}: Next objective id {} "
+ "created for {}", deviceId, nextId.intValue(), ns);
}
} else {
log.trace("getNextObjectiveId in device{}: Next objective id {} "
+ "found for {}", deviceId, nextId.intValue(), ns);
}
return nextId.intValue();
}
......@@ -338,6 +430,10 @@ public class DefaultGroupHandler {
if (devicePortMap.get(d) == null) {
log.warn("Device {} is not in the port map yet", d);
return;
} else if (devicePortMap.get(d).size() == 0) {
log.warn("There are no ports for "
+ "the Device {} in the port map yet", d);
return;
}
for (PortNumber sp : devicePortMap.get(d)) {
......@@ -356,7 +452,11 @@ public class DefaultGroupHandler {
NextObjective nextObj = nextObjBuilder.add();
flowObjectiveService.next(deviceId, nextObj);
deviceNextObjectiveIds.put(getGroupKey(ns), nextId);
log.debug("createGroupsFromNeighborsets: Submited "
+ "next objective {} in device {}",
nextId, deviceId);
nsNextObjStore.put(new NeighborSetNextObjectiveStoreKey(deviceId, ns),
nextId);
}
}
......
......@@ -18,16 +18,12 @@ package org.onosproject.segmentrouting.grouphandler;
import java.util.HashSet;
import java.util.Set;
import org.onlab.packet.MplsLabel;
import org.onosproject.core.ApplicationId;
import org.onosproject.net.DeviceId;
import org.onosproject.net.Link;
import org.onosproject.net.flow.DefaultTrafficTreatment;
import org.onosproject.net.flow.TrafficTreatment;
import org.onosproject.net.flowobjective.DefaultNextObjective;
import org.onosproject.net.flowobjective.FlowObjectiveService;
import org.onosproject.net.flowobjective.NextObjective;
import org.onosproject.net.link.LinkService;
import org.onosproject.store.service.EventuallyConsistentMap;
/**
* Default ECMP group handler creation module for a transit device.
......@@ -47,8 +43,11 @@ public class DefaultTransitGroupHandler extends DefaultGroupHandler {
ApplicationId appId,
DeviceProperties config,
LinkService linkService,
FlowObjectiveService flowObjService) {
super(deviceId, appId, config, linkService, flowObjService);
FlowObjectiveService flowObjService,
EventuallyConsistentMap<
NeighborSetNextObjectiveStoreKey,
Integer> nsNextObjStore) {
super(deviceId, appId, config, linkService, flowObjService, nsNextObjStore);
}
@Override
......@@ -96,7 +95,7 @@ public class DefaultTransitGroupHandler extends DefaultGroupHandler {
@Override
protected void newPortToExistingNeighbor(Link newNeighborLink) {
log.debug("New port to existing neighbor: Updating "
/*log.debug("New port to existing neighbor: Updating "
+ "groups for transit device {}", deviceId);
addNeighborAtPort(newNeighborLink.dst().deviceId(),
newNeighborLink.src().port());
......@@ -118,7 +117,7 @@ public class DefaultTransitGroupHandler extends DefaultGroupHandler {
}
Integer nextId = deviceNextObjectiveIds.get(getGroupKey(ns));
Integer nextId = deviceNextObjectiveIds.get(ns);
if (nextId != null) {
NextObjective.Builder nextObjBuilder = DefaultNextObjective
.builder().withId(nextId)
......@@ -129,7 +128,7 @@ public class DefaultTransitGroupHandler extends DefaultGroupHandler {
NextObjective nextObjective = nextObjBuilder.add();
flowObjectiveService.next(deviceId, nextObjective);
}
}
}*/
}
@Override
......
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.segmentrouting.grouphandler;
import java.util.Objects;
import org.onosproject.net.DeviceId;
/**
* Class definition of Key for Neighborset to NextObjective store.
*/
public class NeighborSetNextObjectiveStoreKey {
private final DeviceId deviceId;
private final NeighborSet ns;
public NeighborSetNextObjectiveStoreKey(DeviceId deviceId,
NeighborSet ns) {
this.deviceId = deviceId;
this.ns = ns;
}
public DeviceId deviceId() {
return this.deviceId;
}
public NeighborSet neighborSet() {
return this.ns;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof NeighborSetNextObjectiveStoreKey)) {
return false;
}
NeighborSetNextObjectiveStoreKey that =
(NeighborSetNextObjectiveStoreKey) o;
return (Objects.equals(this.deviceId, that.deviceId) &&
Objects.equals(this.ns, that.ns));
}
// The list of neighbor ids and label are used for comparison.
@Override
public int hashCode() {
int result = 17;
result = 31 * result + Objects.hashCode(this.deviceId)
+ Objects.hashCode(this.ns);
return result;
}
@Override
public String toString() {
return "Device: " + deviceId + " Neighborset: " + ns;
}
}
......@@ -27,6 +27,7 @@ import java.util.List;
import org.onlab.packet.MplsLabel;
import org.onosproject.core.ApplicationId;
import org.onosproject.segmentrouting.grouphandler.GroupBucketIdentifier.BucketOutputType;
import org.onosproject.store.service.EventuallyConsistentMap;
import org.onosproject.net.DeviceId;
import org.onosproject.net.PortNumber;
import org.onosproject.net.flow.DefaultTrafficTreatment;
......@@ -58,8 +59,11 @@ public class PolicyGroupHandler extends DefaultGroupHandler {
ApplicationId appId,
DeviceProperties config,
LinkService linkService,
FlowObjectiveService flowObjService) {
super(deviceId, appId, config, linkService, flowObjService);
FlowObjectiveService flowObjService,
EventuallyConsistentMap<
NeighborSetNextObjectiveStoreKey,
Integer> nsNextObjStore) {
super(deviceId, appId, config, linkService, flowObjService, nsNextObjStore);
}
public PolicyGroupIdentifier createPolicyGroupChain(String id,
......
......@@ -15,6 +15,8 @@
*/
package org.onosproject.net.group;
import java.util.Collection;
import org.onosproject.core.GroupId;
import org.onosproject.net.DeviceId;
import org.onosproject.store.Store;
......@@ -162,4 +164,12 @@ public interface GroupStore extends Store<GroupEvent, GroupStoreDelegate> {
* @param operation the group operation failed
*/
void groupOperationFailed(DeviceId deviceId, GroupOperation operation);
/**
* Submits the group metrics to store for a given device ID.
*
* @param deviceId the device ID
* @param groupEntries the group entries as received from southbound
*/
void pushGroupMetrics(DeviceId deviceId, Collection<Group> groupEntries);
}
......
......@@ -15,7 +15,11 @@
*/
package org.onosproject.net.group.impl;
import com.google.common.collect.Sets;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.Collection;
import java.util.Collections;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
......@@ -48,13 +52,6 @@ import org.onosproject.net.provider.AbstractProviderRegistry;
import org.onosproject.net.provider.AbstractProviderService;
import org.slf4j.Logger;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Set;
import static org.slf4j.LoggerFactory.getLogger;
/**
* Provides implementation of the group service APIs.
*/
......@@ -316,131 +313,13 @@ public class GroupManager
store.groupOperationFailed(deviceId, operation);
}
private void groupMissing(Group group) {
checkValidity();
GroupProvider gp = getProvider(group.deviceId());
switch (group.state()) {
case PENDING_DELETE:
log.debug("Group {} delete confirmation from device {}",
group, group.deviceId());
store.removeGroupEntry(group);
break;
case ADDED:
case PENDING_ADD:
log.debug("Group {} is in store but not on device {}",
group, group.deviceId());
GroupOperation groupAddOp = GroupOperation.
createAddGroupOperation(group.id(),
group.type(),
group.buckets());
GroupOperations groupOps = new GroupOperations(
Collections.singletonList(groupAddOp));
gp.performGroupOperation(group.deviceId(), groupOps);
break;
default:
log.debug("Group {} has not been installed.", group);
break;
}
}
private void extraneousGroup(Group group) {
log.debug("Group {} is on device {} but not in store.",
group, group.deviceId());
checkValidity();
store.addOrUpdateExtraneousGroupEntry(group);
}
private void groupAdded(Group group) {
checkValidity();
log.trace("Group {} Added or Updated in device {}",
group, group.deviceId());
store.addOrUpdateGroupEntry(group);
}
@Override
public void pushGroupMetrics(DeviceId deviceId,
Collection<Group> groupEntries) {
log.trace("Received group metrics from device {}",
deviceId);
boolean deviceInitialAuditStatus =
store.deviceInitialAuditStatus(deviceId);
Set<Group> southboundGroupEntries =
Sets.newHashSet(groupEntries);
Set<Group> storedGroupEntries =
Sets.newHashSet(store.getGroups(deviceId));
Set<Group> extraneousStoredEntries =
Sets.newHashSet(store.getExtraneousGroups(deviceId));
log.trace("Displaying all ({}) southboundGroupEntries for device {}",
southboundGroupEntries.size(),
deviceId);
for (Iterator<Group> it = southboundGroupEntries.iterator(); it.hasNext();) {
Group group = it.next();
log.trace("Group {} in device {}", group, deviceId);
}
log.trace("Displaying all ({}) stored group entries for device {}",
storedGroupEntries.size(),
deviceId);
for (Iterator<Group> it1 = storedGroupEntries.iterator(); it1.hasNext();) {
Group group = it1.next();
log.trace("Stored Group {} for device {}", group, deviceId);
}
for (Iterator<Group> it2 = southboundGroupEntries.iterator(); it2.hasNext();) {
Group group = it2.next();
if (storedGroupEntries.remove(group)) {
// we both have the group, let's update some info then.
log.trace("Group AUDIT: group {} exists "
+ "in both planes for device {}",
group.id(), deviceId);
groupAdded(group);
it2.remove();
}
}
for (Group group : southboundGroupEntries) {
if (store.getGroup(group.deviceId(), group.id()) != null) {
// There is a group existing with the same id
// It is possible that group update is
// in progress while we got a stale info from switch
if (!storedGroupEntries.remove(store.getGroup(
group.deviceId(), group.id()))) {
log.warn("Group AUDIT: Inconsistent state:"
+ "Group exists in ID based table while "
+ "not present in key based table");
}
} else {
// there are groups in the switch that aren't in the store
log.trace("Group AUDIT: extraneous group {} exists "
+ "in data plane for device {}",
group.id(), deviceId);
extraneousStoredEntries.remove(group);
extraneousGroup(group);
}
}
for (Group group : storedGroupEntries) {
// there are groups in the store that aren't in the switch
log.trace("Group AUDIT: group {} missing "
+ "in data plane for device {}",
group.id(), deviceId);
groupMissing(group);
}
for (Group group : extraneousStoredEntries) {
// there are groups in the extraneous store that
// aren't in the switch
log.trace("Group AUDIT: clearing extransoeus group {} "
+ "from store for device {}",
group.id(), deviceId);
store.removeExtraneousGroupEntry(group);
}
if (!deviceInitialAuditStatus) {
log.debug("Group AUDIT: Setting device {} initial "
+ "AUDIT completed", deviceId);
store.deviceInitialAuditCompleted(deviceId, true);
}
checkValidity();
store.pushGroupMetrics(deviceId, groupEntries);
}
}
......@@ -450,10 +329,16 @@ public class GroupManager
public void event(DeviceEvent event) {
switch (event.type()) {
case DEVICE_REMOVED:
log.debug("Clearing device {} initial "
+ "AUDIT completed status as device is going down",
event.subject().id());
store.deviceInitialAuditCompleted(event.subject().id(), false);
case DEVICE_AVAILABILITY_CHANGED:
if (!deviceService.isAvailable(event.subject().id())) {
log.debug("GroupService DeviceListener: Received event {}."
+ "Device is no more available."
+ "Clearing device {} initial "
+ "AUDIT completed status",
event.type(),
event.subject().id());
store.deviceInitialAuditCompleted(event.subject().id(), false);
}
break;
default:
......
......@@ -17,6 +17,7 @@ package org.onosproject.store.group.impl;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
......@@ -63,7 +64,9 @@ import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.service.MultiValuedTimestamp;
import org.onosproject.store.serializers.DeviceIdSerializer;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.URISerializer;
import org.onosproject.store.service.ClockService;
import org.onosproject.store.service.EventuallyConsistentMap;
import org.onosproject.store.service.EventuallyConsistentMapBuilder;
......@@ -74,10 +77,13 @@ import org.slf4j.Logger;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
......@@ -156,8 +162,8 @@ public class DistributedGroupStore
GroupStoreIdMapKey.class,
GroupStoreMapKey.class
)
.register(URI.class)
.register(DeviceId.class)
.register(new URISerializer(), URI.class)
.register(new DeviceIdSerializer(), DeviceId.class)
.register(PortNumber.class)
.register(DefaultApplicationId.class)
.register(DefaultTrafficTreatment.class,
......@@ -207,7 +213,8 @@ public class DistributedGroupStore
.withClockService(new GroupStoreLogicalClockManager<>())
.build();
groupStoreEntriesByKey.addListener(new GroupStoreKeyMapListener());
log.trace("Current size {}", groupStoreEntriesByKey.size());
log.debug("Current size of groupstorekeymap:{}",
groupStoreEntriesByKey.size());
log.debug("Creating EC map pendinggroupkeymap");
EventuallyConsistentMapBuilder<GroupStoreKeyMapKey, StoredGroupEntry>
......@@ -218,7 +225,8 @@ public class DistributedGroupStore
.withSerializer(kryoBuilder)
.withClockService(new GroupStoreLogicalClockManager<>())
.build();
log.trace("Current size {}", auditPendingReqQueue.size());
log.debug("Current size of pendinggroupkeymap:{}",
auditPendingReqQueue.size());
log.info("Started");
}
......@@ -305,13 +313,21 @@ public class DistributedGroupStore
@Override
public Iterable<Group> getGroups(DeviceId deviceId) {
// flatten and make iterator unmodifiable
log.trace("getGroups: for device {} total number of groups {}",
log.debug("getGroups: for device {} total number of groups {}",
deviceId, getGroupStoreKeyMap().values().size());
return FluentIterable.from(getGroupStoreKeyMap().values())
.filter(input -> input.deviceId().equals(deviceId))
.transform(input -> input);
}
private Iterable<StoredGroupEntry> getStoredGroups(DeviceId deviceId) {
// flatten and make iterator unmodifiable
log.debug("getGroups: for device {} total number of groups {}",
deviceId, getGroupStoreKeyMap().values().size());
return FluentIterable.from(getGroupStoreKeyMap().values())
.filter(input -> input.deviceId().equals(deviceId));
}
/**
* Returns the stored group entry.
*
......@@ -359,6 +375,7 @@ public class DistributedGroupStore
break;
}
}
log.debug("getFreeGroupIdValue: Next Free ID is {}", freeId);
return freeId;
}
......@@ -369,7 +386,7 @@ public class DistributedGroupStore
*/
@Override
public void storeGroupDescription(GroupDescription groupDesc) {
log.trace("In storeGroupDescription");
log.debug("In storeGroupDescription");
// Check if a group is existing with the same key
if (getGroup(groupDesc.deviceId(), groupDesc.appCookie()) != null) {
log.warn("Group already exists with the same key {}",
......@@ -380,8 +397,15 @@ public class DistributedGroupStore
// Check if group to be created by a remote instance
if (mastershipService.getLocalRole(
groupDesc.deviceId()) != MastershipRole.MASTER) {
log.debug("Device {} local role is not MASTER",
log.debug("storeGroupDescription: Device {} local role is not MASTER",
groupDesc.deviceId());
if (mastershipService.getMasterFor(groupDesc.deviceId()) == null) {
log.error("No Master for device {}..."
+ "Can not perform add group operation",
groupDesc.deviceId());
//TODO: Send Group operation failure event
return;
}
GroupStoreMessage groupOp = GroupStoreMessage.
createGroupAddRequestMsg(groupDesc.deviceId(),
groupDesc);
......@@ -394,9 +418,9 @@ public class DistributedGroupStore
groupOp,
mastershipService.getMasterFor(groupDesc.deviceId()));
//TODO: Send Group operation failure event
return;
}
log.debug("Sent Group operation request for device {} "
+ "to remote MASTER {}",
log.debug("Sent Group operation request for device {} to remote MASTER {}",
groupDesc.deviceId(),
mastershipService.getMasterFor(groupDesc.deviceId()));
return;
......@@ -417,8 +441,7 @@ public class DistributedGroupStore
// Device group audit has not completed yet
// Add this group description to pending group key table
// Create a group entry object with Dummy Group ID
log.debug("storeGroupDescriptionInternal: Device {} AUDIT "
+ "pending...Queuing Group ADD request",
log.debug("storeGroupDescriptionInternal: Device {} AUDIT pending...Queuing Group ADD request",
groupDesc.deviceId());
StoredGroupEntry group = new DefaultGroup(dummyGroupId, groupDesc);
group.setState(GroupState.WAITING_AUDIT_COMPLETE);
......@@ -447,6 +470,9 @@ public class DistributedGroupStore
// avoid any chances of duplication in group id generation
getGroupIdTable(groupDesc.deviceId()).
put(id, group);
log.debug("storeGroupDescriptionInternal: Processing Group ADD request for Id {} in device {}",
id,
groupDesc.deviceId());
notifyDelegate(new GroupEvent(GroupEvent.Type.GROUP_ADD_REQUESTED,
group));
}
......@@ -470,6 +496,15 @@ public class DistributedGroupStore
// Check if group update to be done by a remote instance
if (mastershipService.getMasterFor(deviceId) != null &&
mastershipService.getLocalRole(deviceId) != MastershipRole.MASTER) {
log.debug("updateGroupDescription: Device {} local role is not MASTER",
deviceId);
if (mastershipService.getMasterFor(deviceId) == null) {
log.error("No Master for device {}..."
+ "Can not perform update group operation",
deviceId);
//TODO: Send Group operation failure event
return;
}
GroupStoreMessage groupOp = GroupStoreMessage.
createGroupUpdateRequestMsg(deviceId,
oldAppCookie,
......@@ -488,6 +523,8 @@ public class DistributedGroupStore
}
return;
}
log.debug("updateGroupDescription for device {} is getting handled locally",
deviceId);
updateGroupDescriptionInternal(deviceId,
oldAppCookie,
type,
......@@ -503,6 +540,7 @@ public class DistributedGroupStore
// Check if a group is existing with the provided key
Group oldGroup = getGroup(deviceId, oldAppCookie);
if (oldGroup == null) {
log.warn("updateGroupDescriptionInternal: Group not found...strange");
return;
}
......@@ -522,6 +560,10 @@ public class DistributedGroupStore
oldGroup.appId());
StoredGroupEntry newGroup = new DefaultGroup(oldGroup.id(),
updatedGroupDesc);
log.debug("updateGroupDescriptionInternal: group entry {} in device {} moving from {} to PENDING_UPDATE",
oldGroup.id(),
oldGroup.deviceId(),
oldGroup.state());
newGroup.setState(GroupState.PENDING_UPDATE);
newGroup.setLife(oldGroup.life());
newGroup.setPackets(oldGroup.packets());
......@@ -529,10 +571,15 @@ public class DistributedGroupStore
//Update the group entry in groupkey based map.
//Update to groupid based map will happen in the
//groupkey based map update listener
log.debug("updateGroupDescriptionInternal with type {}: Group updated with buckets",
type);
getGroupStoreKeyMap().
put(new GroupStoreKeyMapKey(newGroup.deviceId(),
newGroup.appCookie()), newGroup);
notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_REQUESTED, newGroup));
} else {
log.warn("updateGroupDescriptionInternal with type {}: No "
+ "change in the buckets in update", type);
}
}
......@@ -583,6 +630,15 @@ public class DistributedGroupStore
// Check if group to be deleted by a remote instance
if (mastershipService.
getLocalRole(deviceId) != MastershipRole.MASTER) {
log.debug("deleteGroupDescription: Device {} local role is not MASTER",
deviceId);
if (mastershipService.getMasterFor(deviceId) == null) {
log.error("No Master for device {}..."
+ "Can not perform delete group operation",
deviceId);
//TODO: Send Group operation failure event
return;
}
GroupStoreMessage groupOp = GroupStoreMessage.
createGroupDeleteRequestMsg(deviceId,
appCookie);
......@@ -598,6 +654,8 @@ public class DistributedGroupStore
}
return;
}
log.debug("deleteGroupDescription in device {} is getting handled locally",
deviceId);
deleteGroupDescriptionInternal(deviceId, appCookie);
}
......@@ -609,9 +667,15 @@ public class DistributedGroupStore
return;
}
log.debug("deleteGroupDescriptionInternal: group entry {} in device {} moving from {} to PENDING_DELETE",
existing.id(),
existing.deviceId(),
existing.state());
synchronized (existing) {
existing.setState(GroupState.PENDING_DELETE);
}
log.debug("deleteGroupDescriptionInternal: in device {} issuing GROUP_REMOVE_REQUESTED",
deviceId);
notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_REQUESTED, existing));
}
......@@ -628,8 +692,7 @@ public class DistributedGroupStore
GroupEvent event = null;
if (existing != null) {
log.trace("addOrUpdateGroupEntry: updating group "
+ "entry {} in device {}",
log.debug("addOrUpdateGroupEntry: updating group entry {} in device {}",
group.id(),
group.deviceId());
synchronized (existing) {
......@@ -653,10 +716,18 @@ public class DistributedGroupStore
existing.setPackets(group.packets());
existing.setBytes(group.bytes());
if (existing.state() == GroupState.PENDING_ADD) {
log.debug("addOrUpdateGroupEntry: group entry {} in device {} moving from {} to ADDED",
existing.id(),
existing.deviceId(),
GroupState.PENDING_ADD);
existing.setState(GroupState.ADDED);
existing.setIsGroupStateAddedFirstTime(true);
event = new GroupEvent(Type.GROUP_ADDED, existing);
} else {
log.debug("addOrUpdateGroupEntry: group entry {} in device {} moving from {} to ADDED",
existing.id(),
existing.deviceId(),
GroupState.PENDING_UPDATE);
existing.setState(GroupState.ADDED);
existing.setIsGroupStateAddedFirstTime(false);
event = new GroupEvent(Type.GROUP_UPDATED, existing);
......@@ -687,8 +758,7 @@ public class DistributedGroupStore
group.id());
if (existing != null) {
log.trace("removeGroupEntry: removing group "
+ "entry {} in device {}",
log.debug("removeGroupEntry: removing group entry {} in device {}",
group.id(),
group.deviceId());
//Removal from groupid based map will happen in the
......@@ -696,6 +766,11 @@ public class DistributedGroupStore
getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(existing.deviceId(),
existing.appCookie()));
notifyDelegate(new GroupEvent(Type.GROUP_REMOVED, existing));
} else {
log.warn("removeGroupEntry for {} in device{} is "
+ "not existing in our maps",
group.id(),
group.deviceId());
}
}
......@@ -704,8 +779,8 @@ public class DistributedGroupStore
boolean completed) {
synchronized (deviceAuditStatus) {
if (completed) {
log.debug("deviceInitialAuditCompleted: AUDIT "
+ "completed for device {}", deviceId);
log.debug("AUDIT completed for device {}",
deviceId);
deviceAuditStatus.put(deviceId, true);
// Execute all pending group requests
List<StoredGroupEntry> pendingGroupRequests =
......@@ -713,9 +788,7 @@ public class DistributedGroupStore
.stream()
.filter(g-> g.deviceId().equals(deviceId))
.collect(Collectors.toList());
log.trace("deviceInitialAuditCompleted: processing "
+ "pending group add requests for device {} and "
+ "number of pending requests {}",
log.debug("processing pending group add requests for device {} and number of pending requests {}",
deviceId,
pendingGroupRequests.size());
for (Group group:pendingGroupRequests) {
......@@ -733,8 +806,7 @@ public class DistributedGroupStore
} else {
Boolean audited = deviceAuditStatus.get(deviceId);
if (audited != null && audited) {
log.debug("deviceInitialAuditCompleted: Clearing AUDIT "
+ "status for device {}", deviceId);
log.debug("Clearing AUDIT status for device {}", deviceId);
deviceAuditStatus.put(deviceId, false);
}
}
......@@ -760,9 +832,22 @@ public class DistributedGroupStore
return;
}
log.warn("groupOperationFailed: group operation {} failed"
+ "for group {} in device {}",
operation.opType(),
existing.id(),
existing.deviceId());
switch (operation.opType()) {
case ADD:
notifyDelegate(new GroupEvent(Type.GROUP_ADD_FAILED, existing));
log.warn("groupOperationFailed: cleaningup "
+ "group {} from store in device {}....",
existing.id(),
existing.deviceId());
//Removal from groupid based map will happen in the
//map update listener
getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(existing.deviceId(),
existing.appCookie()));
break;
case MODIFY:
notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_FAILED, existing));
......@@ -773,17 +858,11 @@ public class DistributedGroupStore
default:
log.warn("Unknown group operation type {}", operation.opType());
}
//Removal from groupid based map will happen in the
//map update listener
getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(existing.deviceId(),
existing.appCookie()));
}
@Override
public void addOrUpdateExtraneousGroupEntry(Group group) {
log.trace("addOrUpdateExtraneousGroupEntry: add/update extraneous "
+ "group entry {} in device {}",
log.debug("add/update extraneous group entry {} in device {}",
group.id(),
group.deviceId());
ConcurrentMap<GroupId, Group> extraneousIdTable =
......@@ -791,8 +870,7 @@ public class DistributedGroupStore
extraneousIdTable.put(group.id(), group);
// Check the reference counter
if (group.referenceCount() == 0) {
log.trace("addOrUpdateExtraneousGroupEntry: Flow reference "
+ "counter is zero and triggering remove",
log.debug("Flow reference counter is zero and triggering remove",
group.id(),
group.deviceId());
notifyDelegate(new GroupEvent(Type.GROUP_REMOVE_REQUESTED, group));
......@@ -801,8 +879,7 @@ public class DistributedGroupStore
@Override
public void removeExtraneousGroupEntry(Group group) {
log.trace("removeExtraneousGroupEntry: remove extraneous "
+ "group entry {} of device {} from store",
log.debug("remove extraneous group entry {} of device {} from store",
group.id(),
group.deviceId());
ConcurrentMap<GroupId, Group> extraneousIdTable =
......@@ -842,29 +919,47 @@ public class DistributedGroupStore
public void event(EventuallyConsistentMapEvent<GroupStoreKeyMapKey,
StoredGroupEntry> mapEvent) {
GroupEvent groupEvent = null;
GroupStoreKeyMapKey key = mapEvent.key();
StoredGroupEntry group = mapEvent.value();
log.trace("GroupStoreKeyMapListener: received groupid map event {}",
mapEvent.type());
if ((key == null) && (group == null)) {
log.error("GroupStoreKeyMapListener: Received "
+ "event {} with null entry", mapEvent.type());
return;
} else if (group == null) {
group = getGroupIdTable(key.deviceId()).values()
.stream()
.filter((storedGroup) -> (storedGroup.appCookie().equals(key.appCookie)))
.findFirst().get();
if (group == null) {
log.error("GroupStoreKeyMapListener: Received "
+ "event {} with null entry... can not process", mapEvent.type());
return;
}
}
log.trace("received groupid map event {} for id {} in device {}",
mapEvent.type(),
group.id(),
key.deviceId());
if (mapEvent.type() == EventuallyConsistentMapEvent.Type.PUT) {
log.trace("GroupStoreKeyMapListener: Received PUT event");
// Update the group ID table
getGroupIdTable(group.deviceId()).put(group.id(), group);
if (mapEvent.value().state() == Group.GroupState.ADDED) {
if (mapEvent.value().isGroupStateAddedFirstTime()) {
groupEvent = new GroupEvent(Type.GROUP_ADDED,
mapEvent.value());
log.trace("GroupStoreKeyMapListener: Received first time "
+ "GROUP_ADDED state update");
log.trace("Received first time GROUP_ADDED state update for id {} in device {}",
group.id(),
group.deviceId());
} else {
groupEvent = new GroupEvent(Type.GROUP_UPDATED,
mapEvent.value());
log.trace("GroupStoreKeyMapListener: Received following "
+ "GROUP_ADDED state update");
log.trace("Received following GROUP_ADDED state update for id {} in device {}",
group.id(),
group.deviceId());
}
}
} else if (mapEvent.type() == EventuallyConsistentMapEvent.Type.REMOVE) {
log.trace("GroupStoreKeyMapListener: Received REMOVE event");
groupEvent = new GroupEvent(Type.GROUP_REMOVED, mapEvent.value());
groupEvent = new GroupEvent(Type.GROUP_REMOVED, group);
// Remove the entry from the group ID table
getGroupIdTable(group.deviceId()).remove(group.id(), group);
}
......@@ -882,37 +977,35 @@ public class DistributedGroupStore
implements ClusterMessageHandler {
@Override
public void handle(ClusterMessage message) {
log.trace("ClusterGroupMsgHandler: received remote group message");
if (message.subject() ==
GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST) {
if (message.subject().equals(
GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST)) {
GroupStoreMessage groupOp = kryoBuilder.
build().deserialize(message.payload());
log.trace("received remote group operation request");
if (!(mastershipService.
log.debug("received remote group operation {} request for device {}",
groupOp.type(),
groupOp.deviceId());
if (mastershipService.
getLocalRole(groupOp.deviceId()) !=
MastershipRole.MASTER)) {
MastershipRole.MASTER) {
log.warn("ClusterGroupMsgHandler: This node is not "
+ "MASTER for device {}", groupOp.deviceId());
return;
}
if (groupOp.type() == GroupStoreMessage.Type.ADD) {
log.trace("processing remote group "
+ "add operation request");
storeGroupDescriptionInternal(groupOp.groupDesc());
} else if (groupOp.type() == GroupStoreMessage.Type.UPDATE) {
log.trace("processing remote group "
+ "update operation request");
updateGroupDescriptionInternal(groupOp.deviceId(),
groupOp.appCookie(),
groupOp.updateType(),
groupOp.updateBuckets(),
groupOp.newAppCookie());
} else if (groupOp.type() == GroupStoreMessage.Type.DELETE) {
log.trace("processing remote group "
+ "delete operation request");
deleteGroupDescriptionInternal(groupOp.deviceId(),
groupOp.appCookie());
}
} else {
log.warn("ClusterGroupMsgHandler: Unknown remote message type {}",
message.subject());
}
}
}
......@@ -927,6 +1020,10 @@ public class DistributedGroupStore
this.deviceId = deviceId;
}
public DeviceId deviceId() {
return deviceId;
}
@Override
public boolean equals(Object o) {
if (this == o) {
......@@ -1010,4 +1107,127 @@ public class DistributedGroupStore
return result;
}
}
@Override
public void pushGroupMetrics(DeviceId deviceId,
Collection<Group> groupEntries) {
boolean deviceInitialAuditStatus =
deviceInitialAuditStatus(deviceId);
Set<Group> southboundGroupEntries =
Sets.newHashSet(groupEntries);
Set<StoredGroupEntry> storedGroupEntries =
Sets.newHashSet(getStoredGroups(deviceId));
Set<Group> extraneousStoredEntries =
Sets.newHashSet(getExtraneousGroups(deviceId));
log.trace("pushGroupMetrics: Displaying all ({}) southboundGroupEntries for device {}",
southboundGroupEntries.size(),
deviceId);
for (Iterator<Group> it = southboundGroupEntries.iterator(); it.hasNext();) {
Group group = it.next();
log.trace("Group {} in device {}", group, deviceId);
}
log.trace("Displaying all ({}) stored group entries for device {}",
storedGroupEntries.size(),
deviceId);
for (Iterator<StoredGroupEntry> it1 = storedGroupEntries.iterator();
it1.hasNext();) {
Group group = it1.next();
log.trace("Stored Group {} for device {}", group, deviceId);
}
for (Iterator<Group> it2 = southboundGroupEntries.iterator(); it2.hasNext();) {
Group group = it2.next();
if (storedGroupEntries.remove(group)) {
// we both have the group, let's update some info then.
log.trace("Group AUDIT: group {} exists in both planes for device {}",
group.id(), deviceId);
groupAdded(group);
it2.remove();
}
}
for (Group group : southboundGroupEntries) {
if (getGroup(group.deviceId(), group.id()) != null) {
// There is a group existing with the same id
// It is possible that group update is
// in progress while we got a stale info from switch
if (!storedGroupEntries.remove(getGroup(
group.deviceId(), group.id()))) {
log.warn("Group AUDIT: Inconsistent state:"
+ "Group exists in ID based table while "
+ "not present in key based table");
}
} else {
// there are groups in the switch that aren't in the store
log.debug("Group AUDIT: extraneous group {} exists in data plane for device {}",
group.id(), deviceId);
extraneousStoredEntries.remove(group);
extraneousGroup(group);
}
}
for (Group group : storedGroupEntries) {
// there are groups in the store that aren't in the switch
log.debug("Group AUDIT: group {} missing in data plane for device {}",
group.id(), deviceId);
groupMissing(group);
}
for (Group group : extraneousStoredEntries) {
// there are groups in the extraneous store that
// aren't in the switch
log.debug("Group AUDIT: clearing extransoeus group {} from store for device {}",
group.id(), deviceId);
removeExtraneousGroupEntry(group);
}
if (!deviceInitialAuditStatus) {
log.debug("Group AUDIT: Setting device {} initial AUDIT completed",
deviceId);
deviceInitialAuditCompleted(deviceId, true);
}
}
private void groupMissing(Group group) {
switch (group.state()) {
case PENDING_DELETE:
log.debug("Group {} delete confirmation from device {}",
group, group.deviceId());
removeGroupEntry(group);
break;
case ADDED:
case PENDING_ADD:
case PENDING_UPDATE:
log.debug("Group {} is in store but not on device {}",
group, group.deviceId());
StoredGroupEntry existing =
getStoredGroupEntry(group.deviceId(), group.id());
log.debug("groupMissing: group entry {} in device {} moving from {} to PENDING_ADD",
existing.id(),
existing.deviceId(),
existing.state());
existing.setState(Group.GroupState.PENDING_ADD);
//Re-PUT map entries to trigger map update events
getGroupStoreKeyMap().
put(new GroupStoreKeyMapKey(existing.deviceId(),
existing.appCookie()), existing);
notifyDelegate(new GroupEvent(GroupEvent.Type.GROUP_ADD_REQUESTED,
group));
break;
default:
log.debug("Group {} has not been installed.", group);
break;
}
}
private void extraneousGroup(Group group) {
log.debug("Group {} is on device {} but not in store.",
group, group.deviceId());
addOrUpdateExtraneousGroupEntry(group);
}
private void groupAdded(Group group) {
log.trace("Group {} Added or Updated in device {}",
group, group.deviceId());
addOrUpdateGroupEntry(group);
}
}
......
......@@ -315,6 +315,7 @@ public final class KryoNamespaces {
Instructions.DropInstruction.class,
Instructions.OutputInstruction.class,
Instructions.GroupInstruction.class,
Instructions.TableTypeTransition.class,
L0ModificationInstruction.class,
L0ModificationInstruction.L0SubType.class,
L0ModificationInstruction.ModLambdaInstruction.class,
......
......@@ -19,9 +19,12 @@ import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsent
import static org.slf4j.LoggerFactory.getLogger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
......@@ -54,6 +57,7 @@ import org.slf4j.Logger;
import com.google.common.base.Function;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.Sets;
/**
* Manages inventory of group entries using trivial in-memory implementation.
......@@ -583,4 +587,131 @@ public class SimpleGroupStore
getExtraneousGroupIdTable(deviceId).values());
}
@Override
public void pushGroupMetrics(DeviceId deviceId,
Collection<Group> groupEntries) {
boolean deviceInitialAuditStatus =
deviceInitialAuditStatus(deviceId);
Set<Group> southboundGroupEntries =
Sets.newHashSet(groupEntries);
Set<Group> storedGroupEntries =
Sets.newHashSet(getGroups(deviceId));
Set<Group> extraneousStoredEntries =
Sets.newHashSet(getExtraneousGroups(deviceId));
log.trace("pushGroupMetrics: Displaying all ({}) "
+ "southboundGroupEntries for device {}",
southboundGroupEntries.size(),
deviceId);
for (Iterator<Group> it = southboundGroupEntries.iterator(); it.hasNext();) {
Group group = it.next();
log.trace("Group {} in device {}", group, deviceId);
}
log.trace("Displaying all ({}) stored group entries for device {}",
storedGroupEntries.size(),
deviceId);
for (Iterator<Group> it1 = storedGroupEntries.iterator();
it1.hasNext();) {
Group group = it1.next();
log.trace("Stored Group {} for device {}", group, deviceId);
}
for (Iterator<Group> it2 = southboundGroupEntries.iterator(); it2.hasNext();) {
Group group = it2.next();
if (storedGroupEntries.remove(group)) {
// we both have the group, let's update some info then.
log.trace("Group AUDIT: group {} exists "
+ "in both planes for device {}",
group.id(), deviceId);
groupAdded(group);
it2.remove();
}
}
for (Group group : southboundGroupEntries) {
if (getGroup(group.deviceId(), group.id()) != null) {
// There is a group existing with the same id
// It is possible that group update is
// in progress while we got a stale info from switch
if (!storedGroupEntries.remove(getGroup(
group.deviceId(), group.id()))) {
log.warn("Group AUDIT: Inconsistent state:"
+ "Group exists in ID based table while "
+ "not present in key based table");
}
} else {
// there are groups in the switch that aren't in the store
log.trace("Group AUDIT: extraneous group {} exists "
+ "in data plane for device {}",
group.id(), deviceId);
extraneousStoredEntries.remove(group);
extraneousGroup(group);
}
}
for (Group group : storedGroupEntries) {
// there are groups in the store that aren't in the switch
log.trace("Group AUDIT: group {} missing "
+ "in data plane for device {}",
group.id(), deviceId);
groupMissing(group);
}
for (Group group : extraneousStoredEntries) {
// there are groups in the extraneous store that
// aren't in the switch
log.trace("Group AUDIT: clearing extransoeus group {} "
+ "from store for device {}",
group.id(), deviceId);
removeExtraneousGroupEntry(group);
}
if (!deviceInitialAuditStatus) {
log.debug("Group AUDIT: Setting device {} initial "
+ "AUDIT completed", deviceId);
deviceInitialAuditCompleted(deviceId, true);
}
}
private void groupMissing(Group group) {
switch (group.state()) {
case PENDING_DELETE:
log.debug("Group {} delete confirmation from device {}",
group, group.deviceId());
removeGroupEntry(group);
break;
case ADDED:
case PENDING_ADD:
case PENDING_UPDATE:
log.debug("Group {} is in store but not on device {}",
group, group.deviceId());
StoredGroupEntry existing = (groupEntriesById.get(
group.deviceId()) != null) ?
groupEntriesById.get(group.deviceId()).get(group.id()) :
null;
log.trace("groupMissing: group "
+ "entry {} in device {} moving "
+ "from {} to PENDING_ADD",
existing.id(),
existing.deviceId(),
existing.state());
existing.setState(Group.GroupState.PENDING_ADD);
notifyDelegate(new GroupEvent(GroupEvent.Type.GROUP_ADD_REQUESTED,
group));
break;
default:
log.debug("Group {} has not been installed.", group);
break;
}
}
private void extraneousGroup(Group group) {
log.debug("Group {} is on device {} but not in store.",
group, group.deviceId());
addOrUpdateExtraneousGroupEntry(group);
}
private void groupAdded(Group group) {
log.trace("Group {} Added or Updated in device {}",
group, group.deviceId());
addOrUpdateGroupEntry(group);
}
}
......
......@@ -195,11 +195,17 @@ public class SpringOpenTTP extends AbstractHandlerBehaviour
@Override
public void onSuccess(FlowRuleOperations ops) {
pass(fwd);
log.debug("Provisioned tables in {} with "
+ "forwarding rules for segment "
+ "router", deviceId);
}
@Override
public void onError(FlowRuleOperations ops) {
fail(fwd, ObjectiveError.FLOWINSTALLATIONFAILED);
log.warn("Failed to provision tables in {} with "
+ "forwarding rules for segment router",
deviceId);
}
}));
......@@ -228,6 +234,8 @@ public class SpringOpenTTP extends AbstractHandlerBehaviour
}
private void removeGroup(NextObjective nextObjective) {
log.debug("removeGroup in {}: for next objective id {}",
deviceId, nextObjective.id());
final GroupKey key = new DefaultGroupKey(
appKryo.serialize(nextObjective.id()));
groupService.removeGroup(deviceId, key, appId);
......@@ -293,6 +301,8 @@ public class SpringOpenTTP extends AbstractHandlerBehaviour
}
private void addBucketToGroup(NextObjective nextObjective) {
log.debug("addBucketToGroup in {}: for next objective id {}",
deviceId, nextObjective.id());
Collection<TrafficTreatment> treatments = nextObjective.next();
TrafficTreatment treatment = treatments.iterator().next();
final GroupKey key = new DefaultGroupKey(
......@@ -317,6 +327,8 @@ public class SpringOpenTTP extends AbstractHandlerBehaviour
}
private void removeBucketFromGroup(NextObjective nextObjective) {
log.debug("removeBucketFromGroup in {}: for next objective id {}",
deviceId, nextObjective.id());
NextGroup nextGroup = flowObjectiveStore.getNextGroup(nextObjective.id());
if (nextGroup != null) {
Collection<TrafficTreatment> treatments = nextObjective.next();
......@@ -369,7 +381,7 @@ public class SpringOpenTTP extends AbstractHandlerBehaviour
if ((ethType == null) ||
((((short) ethType.ethType()) != Ethernet.TYPE_IPV4) &&
(((short) ethType.ethType()) != Ethernet.MPLS_UNICAST))) {
log.debug("processSpecific: Unsupported "
log.warn("processSpecific: Unsupported "
+ "forwarding objective criteraia");
fail(fwd, ObjectiveError.UNSUPPORTED);
return Collections.emptySet();
......@@ -424,6 +436,10 @@ public class SpringOpenTTP extends AbstractHandlerBehaviour
}
treatmentBuilder.group(group.id());
log.debug("Adding OUTGROUP action");
} else {
log.warn("processSpecific: No associated next objective object");
fail(fwd, ObjectiveError.GROUPMISSING);
return Collections.emptySet();
}
}
......@@ -485,15 +501,39 @@ public class SpringOpenTTP extends AbstractHandlerBehaviour
return rules;
}
protected List<FlowRule> processVlanIdFilter(Criterion c,
FilteringObjective filt,
ApplicationId applicationId) {
List<FlowRule> rules = new ArrayList<FlowRule>();
VlanIdCriterion v = (VlanIdCriterion) c;
log.debug("adding rule for VLAN: {}", v.vlanId());
TrafficSelector.Builder selector = DefaultTrafficSelector
.builder();
TrafficTreatment.Builder treatment = DefaultTrafficTreatment
.builder();
PortCriterion p = (PortCriterion) filt.key();
if (v.vlanId() != VlanId.NONE) {
selector.matchVlanId(v.vlanId());
selector.matchInPort(p.port());
treatment.deferred().popVlan();
}
treatment.transition(tmacTableId);
FlowRule rule = DefaultFlowRule.builder().forDevice(deviceId)
.withSelector(selector.build())
.withTreatment(treatment.build())
.withPriority(filt.priority()).fromApp(applicationId)
.makePermanent().forTable(vlanTableId).build();
rules.add(rule);
return rules;
}
private void processFilter(FilteringObjective filt, boolean install,
ApplicationId applicationId) {
// This driver only processes filtering criteria defined with switch
// ports as the key
PortCriterion p;
if (!filt.key().equals(Criteria.dummy())
&& filt.key().type() == Criterion.Type.IN_PORT) {
p = (PortCriterion) filt.key();
} else {
if (filt.key().equals(Criteria.dummy())
|| filt.key().type() != Criterion.Type.IN_PORT) {
log.warn("No key defined in filtering objective from app: {}. Not"
+ "processing filtering objective", applicationId);
fail(filt, ObjectiveError.UNKNOWN);
......@@ -509,24 +549,11 @@ public class SpringOpenTTP extends AbstractHandlerBehaviour
ops = install ? ops.add(rule) : ops.remove(rule);
}
} else if (c.type() == Criterion.Type.VLAN_VID) {
VlanIdCriterion v = (VlanIdCriterion) c;
log.debug("adding rule for VLAN: {}", v.vlanId());
TrafficSelector.Builder selector = DefaultTrafficSelector
.builder();
TrafficTreatment.Builder treatment = DefaultTrafficTreatment
.builder();
if (v.vlanId() != VlanId.NONE) {
selector.matchVlanId(v.vlanId());
selector.matchInPort(p.port());
treatment.deferred().popVlan();
for (FlowRule rule : processVlanIdFilter(c,
filt,
applicationId)) {
ops = install ? ops.add(rule) : ops.remove(rule);
}
treatment.transition(tmacTableId);
FlowRule rule = DefaultFlowRule.builder().forDevice(deviceId)
.withSelector(selector.build())
.withTreatment(treatment.build())
.withPriority(filt.priority()).fromApp(applicationId)
.makePermanent().forTable(vlanTableId).build();
ops = install ? ops.add(rule) : ops.remove(rule);
} else if (c.type() == Criterion.Type.IPV4_DST) {
IPCriterion ip = (IPCriterion) c;
log.debug("adding rule for IP: {}", ip.ip());
......@@ -554,13 +581,15 @@ public class SpringOpenTTP extends AbstractHandlerBehaviour
@Override
public void onSuccess(FlowRuleOperations ops) {
pass(filt);
log.info("Provisioned tables for segment router");
log.debug("Provisioned tables in {} with fitering "
+ "rules for segment router", deviceId);
}
@Override
public void onError(FlowRuleOperations ops) {
fail(filt, ObjectiveError.FLOWINSTALLATIONFAILED);
log.info("Failed to provision tables for segment router");
log.warn("Failed to provision tables in {} with "
+ "fitering rules for segment router", deviceId);
}
}));
}
......@@ -618,6 +647,8 @@ public class SpringOpenTTP extends AbstractHandlerBehaviour
@Override
public void event(GroupEvent event) {
if (event.type() == GroupEvent.Type.GROUP_ADDED) {
log.debug("InnerGroupListener: Group ADDED "
+ "event received in device {}", deviceId);
GroupKey key = event.subject().appCookie();
NextObjective obj = pendingGroups.getIfPresent(key);
......@@ -628,6 +659,9 @@ public class SpringOpenTTP extends AbstractHandlerBehaviour
pass(obj);
pendingGroups.invalidate(key);
}
} else if (event.type() == GroupEvent.Type.GROUP_ADD_FAILED) {
log.warn("InnerGroupListener: Group ADD "
+ "failed event received in device {}", deviceId);
}
}
}
......
......@@ -15,7 +15,6 @@
*/
package org.onosproject.driver.pipeline;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
......@@ -145,6 +144,10 @@ public class SpringOpenTTPDell extends SpringOpenTTP {
}
treatmentBuilder.group(group.id());
log.debug("Adding OUTGROUP action");
} else {
log.warn("processSpecific: No associated next objective object");
fail(fwd, ObjectiveError.GROUPMISSING);
return Collections.emptySet();
}
}
......@@ -175,43 +178,23 @@ public class SpringOpenTTPDell extends SpringOpenTTP {
protected List<FlowRule> processEthDstFilter(Criterion c,
FilteringObjective filt,
ApplicationId applicationId) {
List<FlowRule> rules = new ArrayList<FlowRule>();
EthCriterion e = (EthCriterion) c;
TrafficSelector.Builder selectorIp = DefaultTrafficSelector
.builder();
TrafficTreatment.Builder treatmentIp = DefaultTrafficTreatment
.builder();
// Store device termination Mac to be used in IP flow entries
EthCriterion e = (EthCriterion) c;
deviceTMac = e.mac();
selectorIp.matchEthDst(e.mac());
selectorIp.matchEthType(Ethernet.TYPE_IPV4);
treatmentIp.transition(ipv4UnicastTableId);
FlowRule ruleIp = DefaultFlowRule.builder().forDevice(deviceId)
.withSelector(selectorIp.build())
.withTreatment(treatmentIp.build())
.withPriority(filt.priority()).fromApp(applicationId)
.makePermanent().forTable(tmacTableId).build();
log.debug("adding IP ETH rule for MAC: {}", e.mac());
rules.add(ruleIp);
TrafficSelector.Builder selectorMpls = DefaultTrafficSelector
.builder();
TrafficTreatment.Builder treatmentMpls = DefaultTrafficTreatment
.builder();
selectorMpls.matchEthDst(e.mac());
selectorMpls.matchEthType(Ethernet.MPLS_UNICAST);
treatmentMpls.transition(mplsTableId);
FlowRule ruleMpls = DefaultFlowRule.builder()
.forDevice(deviceId).withSelector(selectorMpls.build())
.withTreatment(treatmentMpls.build())
.withPriority(filt.priority()).fromApp(applicationId)
.makePermanent().forTable(tmacTableId).build();
log.debug("adding MPLS ETH rule for MAC: {}", e.mac());
rules.add(ruleMpls);
return rules;
log.debug("For now not adding any TMAC rules "
+ "into Dell switches as it is ignoring");
return Collections.emptyList();
}
@Override
protected List<FlowRule> processVlanIdFilter(Criterion c,
FilteringObjective filt,
ApplicationId applicationId) {
log.debug("For now not adding any VLAN rules "
+ "into Dell switches as it is ignoring");
return Collections.emptyList();
}
}
\ No newline at end of file
......
......@@ -37,13 +37,6 @@
<behaviour api="org.onosproject.net.behaviour.Pipeliner"
impl="org.onosproject.driver.pipeline.SpringOpenTTPDell"/>
</driver>
<driver name="cpqd" manufacturer="Stanford University, Ericsson Research and CPqD Research"
hwVersion="OpenFlow 1.3 Reference Userspace Switch" swVersion=".*">
<behaviour api="org.onosproject.net.behaviour.Pipeliner"
impl="org.onosproject.driver.pipeline.SpringOpenTTP"/>
<behaviour api="org.onosproject.openflow.controller.driver.OpenFlowSwitchDriver"
impl="org.onosproject.driver.handshaker.OFSwitchImplSpringOpenTTP"/>
</driver>
<driver name="linc-oe" extends="default"
manufacturer="FlowForwarding.org" hwVersion="Unknown" swVersion="LINC-OE OpenFlow Software Switch 1.1">
<behaviour api="org.onosproject.openflow.controller.driver.OpenFlowSwitchDriver"
......
......@@ -295,7 +295,7 @@ class OFChannelHandler extends IdleStateAwareChannelHandler {
return;
}
if (m.getFlags().contains(OFStatsReplyFlags.REPLY_MORE)) {
log.warn("Stats reply indicates more stats from sw {} for "
log.debug("Stats reply indicates more stats from sw {} for "
+ "port description",
h.getSwitchInfoString());
h.portDescReplies.add((OFPortDescStatsReply)m);
......