Srikanth Vavilapalli
Committed by Gerrit Code Review

ONOS-1438: Segment Routing rule population optimization fixes

Change-Id: I2cad2cd485282b904e035b209530005b93c90ffd
......@@ -184,21 +184,40 @@ public class DefaultRoutingHandler {
private boolean repopulateRoutingRulesForRoutes(Set<ArrayList<DeviceId>> routes) {
rulePopulator.resetCounter();
HashMap<DeviceId, ArrayList<ArrayList<DeviceId>>> routesBydevice =
new HashMap<>();
for (ArrayList<DeviceId> link: routes) {
// When only the source device is defined, reinstall routes to all other devices
if (link.size() == 1) {
log.trace("repopulateRoutingRulesForRoutes: running ECMP graph for device {}", link.get(0));
ECMPShortestPathGraph ecmpSpg = new ECMPShortestPathGraph(link.get(0), srManager);
if (populateEcmpRoutingRules(link.get(0), ecmpSpg)) {
log.debug("Populating flow rules from {} to all is successful",
link.get(0));
currentEcmpSpgMap.put(link.get(0), ecmpSpg);
} else {
log.warn("Failed to populate the flow rules from {} to all", link.get(0));
return false;
}
} else {
ArrayList<ArrayList<DeviceId>> deviceRoutes =
routesBydevice.get(link.get(1));
if (deviceRoutes == null) {
deviceRoutes = new ArrayList<>();
routesBydevice.put(link.get(1), deviceRoutes);
}
deviceRoutes.add(link);
}
}
for (DeviceId impactedDevice : routesBydevice.keySet()) {
ArrayList<ArrayList<DeviceId>> deviceRoutes =
routesBydevice.get(impactedDevice);
for (ArrayList<DeviceId> link: deviceRoutes) {
log.debug("repopulate RoutingRules For Routes {} -> {}",
link.get(0), link.get(1));
DeviceId src = link.get(0);
DeviceId dst = link.get(1);
log.trace("repopulateRoutingRulesForRoutes: running ECMP graph for device {}", dst);
ECMPShortestPathGraph ecmpSpg = updatedEcmpSpgMap.get(dst);
HashMap<Integer, HashMap<DeviceId, ArrayList<ArrayList<DeviceId>>>> switchVia =
ecmpSpg.getAllLearnedSwitchesAndVia();
......@@ -220,10 +239,18 @@ public class DefaultRoutingHandler {
if (!populateEcmpRoutingRulePartial(targetSw, dst, nextHops)) {
return false;
}
log.debug("Populating flow rules from {} to {} is successful",
targetSw, dst);
}
}
currentEcmpSpgMap.put(dst, ecmpSpg);
//currentEcmpSpgMap.put(dst, ecmpSpg);
}
//Only if all the flows for all impacted routes to a
//specific target are pushed successfully, update the
//ECMP graph for that target. (Or else the next event
//would not see any changes in the ECMP graphs)
currentEcmpSpgMap.put(impactedDevice,
updatedEcmpSpgMap.get(impactedDevice));
}
return true;
}
......@@ -233,13 +260,15 @@ public class DefaultRoutingHandler {
Set<ArrayList<DeviceId>> routes = new HashSet<>();
for (Device sw : srManager.deviceService.getDevices()) {
log.debug("Computing the impacted routes for device {} due to link fail",
sw.id());
if (srManager.mastershipService.
getLocalRole(sw.id()) != MastershipRole.MASTER) {
continue;
}
ECMPShortestPathGraph ecmpSpg = currentEcmpSpgMap.get(sw.id());
if (ecmpSpg == null) {
log.error("No existing ECMP path for switch {}", sw.id());
log.error("No existing ECMP graph for switch {}", sw.id());
continue;
}
HashMap<Integer, HashMap<DeviceId, ArrayList<ArrayList<DeviceId>>>> switchVia =
......@@ -252,8 +281,12 @@ public class DefaultRoutingHandler {
Set<ArrayList<DeviceId>> subLinks =
computeLinks(targetSw, destSw, swViaMap);
for (ArrayList<DeviceId> alink: subLinks) {
if (alink.get(0).equals(linkFail.src().deviceId()) &&
alink.get(1).equals(linkFail.dst().deviceId())) {
if ((alink.get(0).equals(linkFail.src().deviceId()) &&
alink.get(1).equals(linkFail.dst().deviceId()))
||
(alink.get(0).equals(linkFail.dst().deviceId()) &&
alink.get(1).equals(linkFail.src().deviceId()))) {
log.debug("Impacted route:{}->{}", targetSw, destSw);
ArrayList<DeviceId> aRoute = new ArrayList<>();
aRoute.add(targetSw);
aRoute.add(destSw);
......@@ -274,9 +307,12 @@ public class DefaultRoutingHandler {
Set<ArrayList<DeviceId>> routes = new HashSet<>();
for (Device sw : srManager.deviceService.getDevices()) {
log.debug("Computing the impacted routes for device {}",
sw.id());
if (srManager.mastershipService.
getLocalRole(sw.id()) != MastershipRole.MASTER) {
log.warn("No mastership for {} and skip route optimization");
log.debug("No mastership for {} and skip route optimization",
sw.id());
continue;
}
......@@ -295,7 +331,7 @@ public class DefaultRoutingHandler {
continue;
}
ECMPShortestPathGraph newEcmpSpg = updatedEcmpSpgMap.get(sw.id());
currentEcmpSpgMap.put(sw.id(), newEcmpSpg);
//currentEcmpSpgMap.put(sw.id(), newEcmpSpg);
HashMap<Integer, HashMap<DeviceId, ArrayList<ArrayList<DeviceId>>>> switchVia =
ecmpSpg.getAllLearnedSwitchesAndVia();
HashMap<Integer, HashMap<DeviceId, ArrayList<ArrayList<DeviceId>>>> switchViaUpdated =
......@@ -307,7 +343,8 @@ public class DefaultRoutingHandler {
for (DeviceId srcSw : swViaMapUpdated.keySet()) {
ArrayList<ArrayList<DeviceId>> viaUpdated = swViaMapUpdated.get(srcSw);
ArrayList<ArrayList<DeviceId>> via = getVia(switchVia, srcSw);
if (via.isEmpty() || !viaUpdated.equals(via)) {
if ((via == null) || !viaUpdated.equals(via)) {
log.debug("Impacted route:{}->{}", srcSw, sw.id());
ArrayList<DeviceId> route = new ArrayList<>();
route.add(srcSw);
route.add(sw.id());
......@@ -318,7 +355,7 @@ public class DefaultRoutingHandler {
}
for (ArrayList<DeviceId> link: routes) {
log.trace("Link changes - ");
log.trace("Route changes - ");
if (link.size() == 1) {
log.trace(" : {} - all", link.get(0));
} else {
......@@ -341,7 +378,7 @@ public class DefaultRoutingHandler {
}
}
return new ArrayList<>();
return null;
}
private Set<ArrayList<DeviceId>> computeLinks(DeviceId src,
......
......@@ -133,7 +133,9 @@ public class SegmentRoutingManager implements SegmentRoutingService {
private NetworkConfigManager networkConfigService = new NetworkConfigManager();;
private static int numOfEvents = 0;
private Object threadSchedulerLock = new Object();
private static int numOfEventsQueued = 0;
private static int numOfEventsExecuted = 0;
private static int numOfHandlerExecution = 0;
private static int numOfHandlerScheduled = 0;
......@@ -325,6 +327,7 @@ public class SegmentRoutingManager implements SegmentRoutingService {
public void event(LinkEvent event) {
if (event.type() == LinkEvent.Type.LINK_ADDED
|| event.type() == LinkEvent.Type.LINK_REMOVED) {
log.debug("Event {} received from Link Service", event.type());
scheduleEventHandlerIfNotScheduled(event);
}
}
......@@ -346,6 +349,7 @@ public class SegmentRoutingManager implements SegmentRoutingService {
case PORT_REMOVED:
case DEVICE_UPDATED:
case DEVICE_AVAILABILITY_CHANGED:
log.debug("Event {} received from Device Service", event.type());
scheduleEventHandlerIfNotScheduled(event);
break;
default:
......@@ -355,19 +359,20 @@ public class SegmentRoutingManager implements SegmentRoutingService {
private void scheduleEventHandlerIfNotScheduled(Event event) {
synchronized (eventQueue) {
synchronized (threadSchedulerLock) {
eventQueue.add(event);
numOfEvents++;
if (eventHandlerFuture == null || eventHandlerFuture.isDone()) {
numOfEventsQueued++;
if ((numOfHandlerScheduled - numOfHandlerExecution) == 0) {
//No pending scheduled event handling threads. So start a new one.
eventHandlerFuture = executorService
.schedule(eventHandler, 100, TimeUnit.MILLISECONDS);
numOfHandlerScheduled++;
}
log.trace("numOfEventsQueued {}, numOfEventHanlderScheduled {}",
numOfEventsQueued,
numOfHandlerScheduled);
}
log.trace("numOfEvents {}, numOfEventHanlderScheduled {}", numOfEvents,
numOfHandlerScheduled);
}
private class InternalEventHandler implements Runnable {
......@@ -375,32 +380,38 @@ public class SegmentRoutingManager implements SegmentRoutingService {
@Override
public void run() {
try {
synchronized (eventQueue) {
numOfHandlerExecution++;
while (!eventQueue.isEmpty()) {
Event event = eventQueue.poll();
if (event.type() == LinkEvent.Type.LINK_ADDED) {
processLinkAdded((Link) event.subject());
} else if (event.type() == LinkEvent.Type.LINK_REMOVED) {
processLinkRemoved((Link) event.subject());
//} else if (event.type() == GroupEvent.Type.GROUP_ADDED) {
// processGroupAdded((Group) event.subject());
} else if (event.type() == DeviceEvent.Type.DEVICE_ADDED ||
event.type() == DeviceEvent.Type.DEVICE_AVAILABILITY_CHANGED ||
event.type() == DeviceEvent.Type.DEVICE_UPDATED) {
if (deviceService.isAvailable(((Device) event.subject()).id())) {
processDeviceAdded((Device) event.subject());
}
} else if (event.type() == DeviceEvent.Type.PORT_REMOVED) {
processPortRemoved((Device) event.subject(),
((DeviceEvent) event).port());
while (true) {
Event event = null;
synchronized (threadSchedulerLock) {
if (!eventQueue.isEmpty()) {
event = eventQueue.poll();
numOfEventsExecuted++;
} else {
log.warn("Unhandled event type: {}", event.type());
numOfHandlerExecution++;
log.debug("numOfHandlerExecution {} numOfEventsExecuted {}",
numOfHandlerExecution, numOfEventsExecuted);
break;
}
}
if (event.type() == LinkEvent.Type.LINK_ADDED) {
processLinkAdded((Link) event.subject());
} else if (event.type() == LinkEvent.Type.LINK_REMOVED) {
processLinkRemoved((Link) event.subject());
//} else if (event.type() == GroupEvent.Type.GROUP_ADDED) {
// processGroupAdded((Group) event.subject());
} else if (event.type() == DeviceEvent.Type.DEVICE_ADDED ||
event.type() == DeviceEvent.Type.DEVICE_AVAILABILITY_CHANGED ||
event.type() == DeviceEvent.Type.DEVICE_UPDATED) {
if (deviceService.isAvailable(((Device) event.subject()).id())) {
processDeviceAdded((Device) event.subject());
}
} else if (event.type() == DeviceEvent.Type.PORT_REMOVED) {
processPortRemoved((Device) event.subject(),
((DeviceEvent) event).port());
} else {
log.warn("Unhandled event type: {}", event.type());
}
}
log.debug("numOfHandlerExecution {} numOfEventHanlderScheduled {} numOfEvents {}",
numOfHandlerExecution, numOfHandlerScheduled, numOfEvents);
} catch (Exception e) {
log.error("SegmentRouting event handler "
+ "thread thrown an exception: {}", e);
......@@ -433,9 +444,10 @@ public class SegmentRoutingManager implements SegmentRoutingService {
}
}
//defaultRoutingHandler.populateRoutingRulesForLinkStatusChange(null);
log.trace("processLinkAdded: re-starting route population process");
defaultRoutingHandler.startPopulationProcess();
log.trace("Starting optimized route population process");
defaultRoutingHandler.populateRoutingRulesForLinkStatusChange(null);
//log.trace("processLinkAdded: re-starting route population process");
//defaultRoutingHandler.startPopulationProcess();
}
private void processLinkRemoved(Link link) {
......@@ -444,9 +456,10 @@ public class SegmentRoutingManager implements SegmentRoutingService {
if (groupHandler != null) {
groupHandler.portDown(link.src().port());
}
//defaultRoutingHandler.populateRoutingRulesForLinkStatusChange(link);
log.trace("processLinkRemoved: re-starting route population process");
defaultRoutingHandler.startPopulationProcess();
log.trace("Starting optimized route population process");
defaultRoutingHandler.populateRoutingRulesForLinkStatusChange(link);
//log.trace("processLinkRemoved: re-starting route population process");
//defaultRoutingHandler.startPopulationProcess();
}
private void processDeviceAdded(Device device) {
......
......@@ -35,6 +35,10 @@ public interface Group extends GroupDescription {
*/
PENDING_ADD,
/**
* Group is missing in data plane and retrying GROUP ADD request.
*/
PENDING_ADD_RETRY,
/**
* Group is created in the data plane.
*/
ADDED,
......
......@@ -715,11 +715,12 @@ public class DistributedGroupStore
existing.setLife(group.life());
existing.setPackets(group.packets());
existing.setBytes(group.bytes());
if (existing.state() == GroupState.PENDING_ADD) {
if ((existing.state() == GroupState.PENDING_ADD) ||
(existing.state() == GroupState.PENDING_ADD_RETRY)) {
log.debug("addOrUpdateGroupEntry: group entry {} in device {} moving from {} to ADDED",
existing.id(),
existing.deviceId(),
GroupState.PENDING_ADD);
existing.state());
existing.setState(GroupState.ADDED);
existing.setIsGroupStateAddedFirstTime(true);
event = new GroupEvent(Type.GROUP_ADDED, existing);
......@@ -839,15 +840,22 @@ public class DistributedGroupStore
existing.deviceId());
switch (operation.opType()) {
case ADD:
notifyDelegate(new GroupEvent(Type.GROUP_ADD_FAILED, existing));
log.warn("groupOperationFailed: cleaningup "
+ "group {} from store in device {}....",
existing.id(),
existing.deviceId());
//Removal from groupid based map will happen in the
//map update listener
getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(existing.deviceId(),
existing.appCookie()));
if (existing.state() == GroupState.PENDING_ADD) {
//TODO: Need to add support for passing the group
//operation failure reason from group provider.
//If the error type is anything other than GROUP_EXISTS,
//then the GROUP_ADD_FAILED event should be raised even
//in PENDING_ADD_RETRY state also.
notifyDelegate(new GroupEvent(Type.GROUP_ADD_FAILED, existing));
log.warn("groupOperationFailed: cleaningup "
+ "group {} from store in device {}....",
existing.id(),
existing.deviceId());
//Removal from groupid based map will happen in the
//map update listener
getGroupStoreKeyMap().remove(new GroupStoreKeyMapKey(existing.deviceId(),
existing.appCookie()));
}
break;
case MODIFY:
notifyDelegate(new GroupEvent(Type.GROUP_UPDATE_FAILED, existing));
......@@ -1196,16 +1204,17 @@ public class DistributedGroupStore
break;
case ADDED:
case PENDING_ADD:
case PENDING_ADD_RETRY:
case PENDING_UPDATE:
log.debug("Group {} is in store but not on device {}",
group, group.deviceId());
StoredGroupEntry existing =
getStoredGroupEntry(group.deviceId(), group.id());
log.debug("groupMissing: group entry {} in device {} moving from {} to PENDING_ADD",
log.debug("groupMissing: group entry {} in device {} moving from {} to PENDING_ADD_RETRY",
existing.id(),
existing.deviceId(),
existing.state());
existing.setState(Group.GroupState.PENDING_ADD);
existing.setState(Group.GroupState.PENDING_ADD_RETRY);
//Re-PUT map entries to trigger map update events
getGroupStoreKeyMap().
put(new GroupStoreKeyMapKey(existing.deviceId(),
......
......@@ -215,6 +215,8 @@ public class SpringOpenTTP extends AbstractHandlerBehaviour
@Override
public void next(NextObjective nextObjective) {
log.debug("Processing NextObjective id{} op{}", nextObjective.id(),
nextObjective.op());
if (nextObjective.op() == Objective.Operation.REMOVE) {
if (nextObjective.next().isEmpty()) {
removeGroup(nextObjective);
......@@ -243,6 +245,8 @@ public class SpringOpenTTP extends AbstractHandlerBehaviour
}
private void addGroup(NextObjective nextObjective) {
log.debug("addGroup with type{} for nextObjective id {}",
nextObjective.type(), nextObjective.id());
switch (nextObjective.type()) {
case SIMPLE:
log.debug("processing SIMPLE next objective");
......@@ -262,6 +266,8 @@ public class SpringOpenTTP extends AbstractHandlerBehaviour
key,
null,
nextObjective.appId());
log.debug("Creating SIMPLE group for next objective id {}",
nextObjective.id());
groupService.addGroup(groupDescription);
pendingGroups.put(key, nextObjective);
}
......@@ -285,6 +291,8 @@ public class SpringOpenTTP extends AbstractHandlerBehaviour
key,
null,
nextObjective.appId());
log.debug("Creating HASHED group for next objective id {}",
nextObjective.id());
groupService.addGroup(groupDescription);
pendingGroups.put(key, nextObjective);
}
......@@ -324,6 +332,8 @@ public class SpringOpenTTP extends AbstractHandlerBehaviour
return;
}
GroupBuckets bucketsToAdd = new GroupBuckets(Collections.singletonList(bucket));
log.debug("Adding buckets to group id {} of next objective id {} in device {}",
group.id(), nextObjective.id(), deviceId);
groupService.addBucketsToGroup(deviceId, key, bucketsToAdd, key, appId);
}
......@@ -352,6 +362,8 @@ public class SpringOpenTTP extends AbstractHandlerBehaviour
return;
}
GroupBuckets removeBuckets = new GroupBuckets(Collections.singletonList(bucket));
log.debug("Removing buckets from group id {} of next objective id {} in device {}",
group.id(), nextObjective.id(), deviceId);
groupService.removeBucketsFromGroup(deviceId, key, removeBuckets, key, appId);
}
}
......
......@@ -40,9 +40,10 @@
"load": {
"alpha": 0.9,
"sprites": [
{ "id": "rack", "pos":[300,600], "class":"blue1" },
{ "id": "rack", "pos":[500,600], "class":"blue1" },
{ "id": "rack", "pos":[700,600], "class":"blue1" }
{ "id": "rack", "pos":[200,600], "class":"blue1" },
{ "id": "rack", "pos":[400,600], "class":"blue1" },
{ "id": "rack", "pos":[600,600], "class":"blue1" },
{ "id": "rack", "pos":[800,600], "class":"blue1" }
],
"labels": [
{ "pos":[550,80], "text":"Segment Routing Demo", "class":"blue1", "size":1.4 }
......