Phaneendra Manda
Committed by Gerrit Code Review

[ONOS-3831,ONOS-3836] Load balance algorithm for sfc

Change-Id: I48a428587420ce6d782c128b835b5bb90e0cacfe
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.sfc.manager;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Unique NSH SPI Id generator for NSH header.
*/
public final class NshSpiIdGenerators {
private static final AtomicInteger NSH_SPI_ID_GEN = new AtomicInteger();
private static final int MAX_NSH_SPI_ID = 0x7FFFFFFF;
private static int nshSpiId;
/**
* Default constructor.
*/
private NshSpiIdGenerators() {
}
/**
* Get the next NSH SPI id.
*
* @return NSH SPI id
*/
public static int create() {
do {
if (nshSpiId >= MAX_NSH_SPI_ID) {
if (NSH_SPI_ID_GEN.get() >= MAX_NSH_SPI_ID) {
NSH_SPI_ID_GEN.set(0);
}
}
nshSpiId = NSH_SPI_ID_GEN.incrementAndGet();
} while (nshSpiId > MAX_NSH_SPI_ID);
return nshSpiId;
}
}
......@@ -18,9 +18,10 @@ package org.onosproject.sfc.manager.impl;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
......@@ -38,8 +39,14 @@ import org.onlab.util.ItemNotFoundException;
import org.onlab.util.KryoNamespace;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.core.IdGenerator;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.NshServicePathId;
import org.onosproject.net.PortNumber;
import org.onosproject.net.flow.DefaultTrafficTreatment;
import org.onosproject.net.flow.TrafficTreatment;
import org.onosproject.net.packet.DefaultOutboundPacket;
import org.onosproject.net.packet.OutboundPacket;
import org.onosproject.net.packet.PacketContext;
import org.onosproject.net.packet.PacketProcessor;
import org.onosproject.net.packet.PacketService;
......@@ -47,13 +54,19 @@ import org.onosproject.sfc.forwarder.ServiceFunctionForwarderService;
import org.onosproject.sfc.forwarder.impl.ServiceFunctionForwarderImpl;
import org.onosproject.sfc.installer.FlowClassifierInstallerService;
import org.onosproject.sfc.installer.impl.FlowClassifierInstallerImpl;
import org.onosproject.sfc.manager.NshSpiIdGenerators;
import org.onosproject.sfc.manager.SfcService;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.DistributedSet;
import org.onosproject.store.service.EventuallyConsistentMap;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.WallClockTimestamp;
import org.onosproject.vtnrsc.DefaultFiveTuple;
import org.onosproject.vtnrsc.FiveTuple;
import org.onosproject.vtnrsc.FixedIp;
import org.onosproject.vtnrsc.FlowClassifier;
import org.onosproject.vtnrsc.FlowClassifierId;
import org.onosproject.vtnrsc.LoadBalanceId;
import org.onosproject.vtnrsc.PortChain;
import org.onosproject.vtnrsc.PortChainId;
import org.onosproject.vtnrsc.PortPair;
......@@ -68,10 +81,13 @@ import org.onosproject.vtnrsc.event.VtnRscEventFeedback;
import org.onosproject.vtnrsc.event.VtnRscListener;
import org.onosproject.vtnrsc.flowclassifier.FlowClassifierService;
import org.onosproject.vtnrsc.portchain.PortChainService;
import org.onosproject.vtnrsc.portpairgroup.PortPairGroupService;
import org.onosproject.vtnrsc.service.VtnRscService;
import org.onosproject.vtnrsc.virtualport.VirtualPortService;
import org.slf4j.Logger;
import com.google.common.collect.Lists;
/**
* Provides implementation of SFC Service.
*/
......@@ -80,9 +96,13 @@ import org.slf4j.Logger;
public class SfcManager implements SfcService {
private final Logger log = getLogger(getClass());
private String nshSpiIdTopic = "nsh-spi-id";
private static final String APP_ID = "org.onosproject.app.vtn";
private static final int SFC_PRIORITY = 1000;
private static final int NULL_PORT = 0;
private static final int MAX_NSH_SPI_ID = 0x7FFFF;
private static final int MAX_LOAD_BALANCE_ID = 0x20;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected VtnRscService vtnRscService;
......@@ -97,26 +117,34 @@ public class SfcManager implements SfcService {
protected PortChainService portChainService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected PortPairGroupService portPairGroupService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected FlowClassifierService flowClassifierService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected VirtualPortService virtualPortService;
private SfcPacketProcessor processor = new SfcPacketProcessor();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected StorageService storageService;
protected SfcPacketProcessor processor = new SfcPacketProcessor();
protected ApplicationId appId;
private ServiceFunctionForwarderService serviceFunctionForwarderService;
private FlowClassifierInstallerService flowClassifierInstallerService;
protected ServiceFunctionForwarderService serviceFunctionForwarder;
protected FlowClassifierInstallerService flowClassifierInstaller;
protected IdGenerator nshSpiIdGenerator;
protected EventuallyConsistentMap<PortChainId, Integer> nshSpiPortChainMap;
protected DistributedSet<Integer> nshSpiIdFreeList;
private final VtnRscListener vtnRscListener = new InnerVtnRscListener();
private ConcurrentMap<PortChainId, NshServicePathId> nshSpiPortChainMap = new ConcurrentHashMap<>();
@Activate
public void activate() {
appId = coreService.registerApplication(APP_ID);
serviceFunctionForwarderService = new ServiceFunctionForwarderImpl(appId);
flowClassifierInstallerService = new FlowClassifierInstallerImpl(appId);
serviceFunctionForwarder = new ServiceFunctionForwarderImpl(appId);
flowClassifierInstaller = new FlowClassifierInstallerImpl(appId);
nshSpiIdGenerator = coreService.getIdGenerator(nshSpiIdTopic);
vtnRscService.addListener(vtnRscListener);
......@@ -127,6 +155,18 @@ public class SfcManager implements SfcService {
.register(FlowClassifierId.class)
.register(PortChainId.class);
nshSpiPortChainMap = storageService.<PortChainId, Integer>eventuallyConsistentMapBuilder()
.withName("nshSpiPortChainMap")
.withSerializer(serializer)
.withTimestampProvider((k, v) -> new WallClockTimestamp())
.build();
nshSpiIdFreeList = storageService.<Integer>setBuilder()
.withName("nshSpiIdDeletedList")
.withSerializer(Serializer.using(KryoNamespaces.API))
.build()
.asDistributedSet();
packetService.addProcessor(processor, PacketProcessor.director(SFC_PRIORITY));
log.info("Started");
}
......@@ -230,15 +270,20 @@ public class SfcManager implements SfcService {
NshServicePathId nshSpi;
log.info("onPortChainCreated");
if (nshSpiPortChainMap.containsKey(portChain.portChainId())) {
nshSpi = nshSpiPortChainMap.get(portChain.portChainId());
nshSpi = NshServicePathId.of(nshSpiPortChainMap.get(portChain.portChainId()));
} else {
nshSpi = NshServicePathId.of(NshSpiIdGenerators.create());
nshSpiPortChainMap.put(portChain.portChainId(), nshSpi);
int id = getNextNshSpi();
if (id > MAX_NSH_SPI_ID) {
log.error("Reached max limit of service path index."
+ "Failed to install SFC for port chain {}", portChain.portChainId().toString());
return;
}
nshSpi = NshServicePathId.of(id);
nshSpiPortChainMap.put(portChain.portChainId(), new Integer(id));
}
// install in OVS.
flowClassifierInstallerService.installFlowClassifier(portChain, nshSpi);
serviceFunctionForwarderService.installForwardingRule(portChain, nshSpi);
// Install classifier rule to send the packet to controller
flowClassifierInstaller.installFlowClassifier(portChain, nshSpi);
}
@Override
......@@ -248,13 +293,47 @@ public class SfcManager implements SfcService {
throw new ItemNotFoundException("Unable to find NSH SPI");
}
NshServicePathId nshSpi = nshSpiPortChainMap.get(portChain.portChainId());
// uninstall from OVS.
flowClassifierInstallerService.unInstallFlowClassifier(portChain, nshSpi);
serviceFunctionForwarderService.unInstallForwardingRule(portChain, nshSpi);
int nshSpiId = nshSpiPortChainMap.get(portChain.portChainId());
// Uninstall classifier rules
flowClassifierInstaller.unInstallFlowClassifier(portChain, NshServicePathId.of(nshSpiId));
// remove from nshSpiPortChainMap and add to nshSpiIdFreeList
nshSpiPortChainMap.remove(portChain.portChainId());
nshSpiIdFreeList.add(nshSpiId);
// Uninstall load balanced classifier and forwarding rules.
NshServicePathId nshSpi;
LoadBalanceId id;
List<LoadBalanceId> processedIdList = Lists.newArrayList();
Set<FiveTuple> fiveTupleSet = portChain.getLoadBalanceIdMapKeys();
for (FiveTuple fiveTuple : fiveTupleSet) {
id = portChain.getLoadBalanceId(fiveTuple);
if (processedIdList.contains(id)) {
// multiple five tuple can have single path.
continue;
} else {
processedIdList.add(id);
}
nshSpi = NshServicePathId.of(getNshServicePathId(id, nshSpiId));
flowClassifierInstaller.unInstallLoadBalancedFlowClassifier(portChain, fiveTuple, nshSpi);
serviceFunctionForwarder.unInstallLoadBalancedForwardingRule(portChain.getLoadBalancePath(fiveTuple),
nshSpi);
}
}
// remove SPI. No longer it will be used.
nshSpiPortChainMap.remove(nshSpi);
/**
* Get next nsh service path identifier.
*
* @return value of service path identifier
*/
int getNextNshSpi() {
// If there is any free id use it. Otherwise generate new id.
if (nshSpiIdFreeList.isEmpty()) {
return (int) nshSpiIdGenerator.getNewId();
}
Iterator<Integer> it = nshSpiIdFreeList.iterator();
Integer value = it.next();
nshSpiIdFreeList.remove(value);
return value;
}
private class SfcPacketProcessor implements PacketProcessor {
......@@ -296,6 +375,10 @@ public class SfcManager implements SfcService {
// Identify the port chain to which the packet belongs
for (final PortChain portChain : portChains) {
if (!portChain.tenantId().equals(fiveTuple.tenantId())) {
continue;
}
Iterable<FlowClassifierId> flowClassifiers = portChain.flowClassifiers();
// One port chain can have multiple flow classifiers.
......@@ -375,6 +458,67 @@ public class SfcManager implements SfcService {
return portChainId;
}
/**
* Find the load balanced path set it to port chain for the given five tuple.
*
* @param portChainId port chain id
* @param fiveTuple five tuple info
* @return load balance id
*/
private LoadBalanceId loadBalanceSfc(PortChainId portChainId, FiveTuple fiveTuple) {
// Get the port chain
PortChain portChain = portChainService.getPortChain(portChainId);
List<PortPairId> loadBalancePath = Lists.newArrayList();
LoadBalanceId id;
int paths = portChain.getLoadBalancePathSize();
if (paths >= MAX_LOAD_BALANCE_ID) {
log.info("Max limit reached for load balance paths. "
+ "Reusing the created path for port chain {} with five tuple {}",
portChainId, fiveTuple);
id = LoadBalanceId.of((byte) ((paths + 1) % MAX_LOAD_BALANCE_ID));
portChain.addLoadBalancePath(fiveTuple, id, portChain.getLoadBalancePath(id));
}
// Get the list of port pair groups from port chain
Iterable<PortPairGroupId> portPairGroups = portChain.portPairGroups();
for (final PortPairGroupId portPairGroupId : portPairGroups) {
PortPairGroup portPairGroup = portPairGroupService.getPortPairGroup(portPairGroupId);
// Get the list of port pair ids from port pair group.
Iterable<PortPairId> portPairs = portPairGroup.portPairs();
int minLoad = 0xFFF;
PortPairId minLoadPortPairId = null;
for (final PortPairId portPairId : portPairs) {
int load = portPairGroup.getLoad(portPairId);
if (load == 0) {
minLoadPortPairId = portPairId;
break;
} else {
// Check the port pair which has min load.
if (load < minLoad) {
minLoad = load;
minLoadPortPairId = portPairId;
}
}
}
if (minLoadPortPairId != null) {
loadBalancePath.add(minLoadPortPairId);
portPairGroup.addLoad(minLoadPortPairId);
}
}
// Check if the path already exists, if not create a new id
Optional<LoadBalanceId> output = portChain.matchPath(loadBalancePath);
if (output.isPresent()) {
id = output.get();
} else {
id = LoadBalanceId.of((byte) (paths + 1));
}
portChain.addLoadBalancePath(fiveTuple, id, loadBalancePath);
return id;
}
/**
* Get the tenant id for the given mac address.
......@@ -443,9 +587,59 @@ public class SfcManager implements SfcService {
return;
}
// TODO
// Once the 5 tuple and port chain are identified, give this input for load balancing
LoadBalanceId id = loadBalanceSfc(portChainId, fiveTuple);
// Get nsh service path index
NshServicePathId nshSpi;
PortChain portChain = portChainService.getPortChain(portChainId);
if (nshSpiPortChainMap.containsKey(portChain.portChainId())) {
int nshSpiId = nshSpiPortChainMap.get(portChain.portChainId());
nshSpi = NshServicePathId.of(getNshServicePathId(id, nshSpiId));
} else {
int nshSpiId = getNextNshSpi();
if (nshSpiId > MAX_NSH_SPI_ID) {
log.error("Reached max limit of service path index."
+ "Failed to install SFC for port chain {}", portChain.portChainId());
return;
}
nshSpi = NshServicePathId.of(getNshServicePathId(id, nshSpiId));
nshSpiPortChainMap.put(portChain.portChainId(), new Integer(nshSpiId));
}
// download the required flow rules for classifier and forwarding
// resend the packet back to classifier
// install in OVS.
ConnectPoint connectPoint = flowClassifierInstaller.installLoadBalancedFlowClassifier(portChain,
fiveTuple, nshSpi);
serviceFunctionForwarder.installLoadBalancedForwardingRule(portChain.getLoadBalancePath(fiveTuple),
nshSpi);
sendPacket(context, connectPoint);
}
/**
* Send packet back to classifier.
*
* @param context packet context
* @param connectPoint connect point of first service function
*/
private void sendPacket(PacketContext context, ConnectPoint connectPoint) {
TrafficTreatment treatment = DefaultTrafficTreatment.builder().setOutput(connectPoint.port()).build();
OutboundPacket packet = new DefaultOutboundPacket(connectPoint.deviceId(), treatment,
context.inPacket().unparsed());
packetService.emit(packet);
log.trace("Sending packet: {}", packet);
}
}
/**
* Encapsulate 5 bit load balance id to nsh spi.
*
* @param id load balance identifier
* @param nshSpiId nsh service path index
* @return updated service path index
*/
protected int getNshServicePathId(LoadBalanceId id, int nshSpiId) {
int nshSpiNew = nshSpiId << 5;
nshSpiNew = nshSpiNew | id.loadBalanceId();
return nshSpiNew;
}
}
......
......@@ -142,6 +142,14 @@ public final class DefaultPortChain implements PortChain {
}
@Override
public int getLoadBalancePathSize() {
if (sfcLoadBalanceIdMap.isEmpty()) {
return 0;
}
return sfcLoadBalanceIdMap.size();
}
@Override
public Optional<LoadBalanceId> matchPath(List<PortPairId> path) {
LoadBalanceId id = null;
......
......@@ -113,6 +113,13 @@ public interface PortChain {
List<PortPairId> getLoadBalancePath(FiveTuple fiveTuple);
/**
* Get the no of load balance paths created.
*
* @return size of load balanced paths
*/
int getLoadBalancePathSize();
/**
* Match the given path with existing load balanced paths.
*
* @param path load balanced path
......
......@@ -142,38 +142,37 @@ public class PortChainResourceTest extends VtnResourceTest {
@Override
public void addLoadBalancePath(FiveTuple fiveTuple, LoadBalanceId id, List<PortPairId> path) {
// TODO Auto-generated method stub
}
@Override
public LoadBalanceId getLoadBalanceId(FiveTuple fiveTuple) {
// TODO Auto-generated method stub
return null;
}
@Override
public Set<FiveTuple> getLoadBalanceIdMapKeys() {
// TODO Auto-generated method stub
return null;
}
@Override
public List<PortPairId> getLoadBalancePath(LoadBalanceId id) {
// TODO Auto-generated method stub
return null;
}
@Override
public List<PortPairId> getLoadBalancePath(FiveTuple fiveTuple) {
// TODO Auto-generated method stub
return null;
}
@Override
public Optional<LoadBalanceId> matchPath(List<PortPairId> path) {
// TODO Auto-generated method stub
return null;
}
@Override
public int getLoadBalancePathSize() {
return 0;
}
}
/**
......