ssyoon90
Committed by Gerrit Code Review

[Emu] openTAM: FlowStatisticManager, DistributedFlowStatisticStore, get-flow-sta…

…ts CLI Implementation and NewAdaptiveFlowStatsCollector update and typo

 - GetFlowStatistics.java
   .Fixed function name typo: immediateLoad()
 - SummaryFlowEntryWithLoad.java
   .Added javadoc
 - TypedFlowEntryWithLoad.java
   .Added javadoc,
   .and replace checknotnull and throw NullPointerException in typedPollInterval() at line 104

Change-Id: I23d2eaf234d0affeb5f927275148d9165c66c774
......@@ -25,6 +25,8 @@ import org.onosproject.net.ElementId;
import org.onosproject.net.Port;
import org.onosproject.net.flow.FlowRule;
import org.onosproject.net.group.Group;
import org.onosproject.net.statistic.TypedFlowEntryWithLoad;
import org.onosproject.net.topology.TopologyCluster;
import java.util.Comparator;
......@@ -115,4 +117,12 @@ public final class Comparators {
public static final Comparator<Interface> INTERFACES_COMPARATOR = (intf1, intf2) ->
CONNECT_POINT_COMPARATOR.compare(intf1.connectPoint(), intf2.connectPoint());
public static final Comparator<TypedFlowEntryWithLoad> TYPEFLOWENTRY_WITHLOAD_COMPARATOR =
new Comparator<TypedFlowEntryWithLoad>() {
@Override
public int compare(TypedFlowEntryWithLoad fe1, TypedFlowEntryWithLoad fe2) {
long delta = fe1.load().rate() - fe2.load().rate();
return delta == 0 ? 0 : (delta > 0 ? -1 : +1);
}
};
}
......
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.cli.net;
import org.apache.karaf.shell.commands.Argument;
import org.apache.karaf.shell.commands.Command;
import org.apache.karaf.shell.commands.Option;
import org.onosproject.cli.AbstractShellCommand;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.Device;
import org.onosproject.net.DeviceId;
import org.onosproject.net.Port;
import org.onosproject.net.PortNumber;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.flow.TypedStoredFlowEntry;
import org.onosproject.net.flow.instructions.Instruction;
import org.onosproject.net.statistic.FlowStatisticService;
import org.onosproject.net.statistic.SummaryFlowEntryWithLoad;
import org.onosproject.net.statistic.TypedFlowEntryWithLoad;
import java.util.List;
import java.util.Map;
import static org.onosproject.net.DeviceId.deviceId;
import static org.onosproject.net.PortNumber.portNumber;
/**
* Fetches flow statistics with a flow type and instruction type.
*/
@Command(scope = "onos", name = "get-flow-stats",
description = "Fetches flow stats for a connection point with given flow type and instruction type")
public class GetFlowStatistics extends AbstractShellCommand {
@Argument(index = 0, name = "devicePort",
description = "Device[/Port] connectPoint Description",
required = true, multiValued = false)
String devicePort = null;
@Option(name = "-s", aliases = "--summary",
description = "Show flow stats summary",
required = false, multiValued = false)
boolean showSummary = true; // default summary
@Option(name = "-a", aliases = "--all",
description = "Show flow stats all",
required = false, multiValued = false)
boolean showAll = false;
@Option(name = "-t", aliases = "--topn",
description = "Show flow stats topn",
required = false, multiValued = false)
String showTopn = null;
@Option(name = "-f", aliases = "--flowType",
description = "Flow live type, It includes IMMEDIATE, SHORT, MID, LONG, UNKNOWN"
+ ", and is valid with -a or -t option only",
required = false, multiValued = false)
String flowLiveType = null;
@Option(name = "-i", aliases = "--instructionType",
description = "Flow instruction type, It includes DROP, OUTPUT, GROUP, L0MODIFICATION, L2MODIFICATION,"
+ " TABLE, L3MODIFICATION, METADATA"
+ ", and is valid with -a or -t option only",
required = false, multiValued = false)
String instructionType = null;
@Override
protected void execute() {
DeviceService deviceService = get(DeviceService.class);
FlowStatisticService flowStatsService = get(FlowStatisticService.class);
String deviceURI = getDeviceId(devicePort);
String portURI = getPortNumber(devicePort);
DeviceId ingressDeviceId = deviceId(deviceURI);
PortNumber ingressPortNumber;
if (portURI.length() == 0) {
ingressPortNumber = null;
} else {
ingressPortNumber = portNumber(portURI);
}
Device device = deviceService.getDevice(ingressDeviceId);
if (device == null) {
error("No such device %s", ingressDeviceId.uri());
return;
}
if (ingressPortNumber != null) {
Port port = deviceService.getPort(ingressDeviceId, ingressPortNumber);
if (port == null) {
error("No such port %s on device %s", portURI, ingressDeviceId.uri());
return;
}
}
if (flowLiveType != null) {
flowLiveType = flowLiveType.toUpperCase();
}
if (instructionType != null) {
instructionType = instructionType.toUpperCase();
}
// convert String to FlowLiveType and check validity
TypedStoredFlowEntry.FlowLiveType inLiveType;
if (flowLiveType == null) {
inLiveType = null;
} else {
inLiveType = getFlowLiveType(flowLiveType);
if (inLiveType == null) {
error("Invalid flow live type [%s] error", flowLiveType);
return;
}
}
// convert String to InstructionType and check validity
Instruction.Type inInstructionType;
if (instructionType == null) {
inInstructionType = null;
} else {
inInstructionType = getInstructionType(instructionType);
if (inInstructionType == null) {
error("Invalid instruction type [%s] error", instructionType);
return;
}
}
if (showTopn != null) {
int topn = Integer.parseInt(showTopn);
if (topn <= 0) {
topn = 100; //default value
} else if (topn > 1000) {
topn = 1000; //max value
}
// print show topn head line with type
print("deviceId=%s, show TOPN=%s flows, live type=%s, instruction type=%s",
deviceURI,
Integer.toString(topn),
flowLiveType == null ? "ALL" : flowLiveType,
instructionType == null ? "ALL" : instructionType);
if (ingressPortNumber == null) {
Map<ConnectPoint, List<TypedFlowEntryWithLoad>> typedFlowLoadMap =
flowStatsService.loadTopnByType(device, inLiveType, inInstructionType, topn);
// print all ports topn flows load for a given device
for (ConnectPoint cp : typedFlowLoadMap.keySet()) {
printPortFlowsLoad(cp, typedFlowLoadMap.get(cp));
}
} else {
List<TypedFlowEntryWithLoad> typedFlowLoad =
flowStatsService.loadTopnByType(device, ingressPortNumber, inLiveType, inInstructionType, topn);
// print device/port topn flows load
ConnectPoint cp = new ConnectPoint(ingressDeviceId, ingressPortNumber);
printPortFlowsLoad(cp, typedFlowLoad);
}
} else if (showAll) { // is true?
// print show all head line with type
print("deviceId=%s, show ALL flows, live type=%s, instruction type=%s",
deviceURI,
flowLiveType == null ? "ALL" : flowLiveType,
instructionType == null ? "ALL" : instructionType);
if (ingressPortNumber == null) {
Map<ConnectPoint, List<TypedFlowEntryWithLoad>> typedFlowLoadMap =
flowStatsService.loadAllByType(device, inLiveType, inInstructionType);
// print all ports all flows load for a given device
for (ConnectPoint cp : typedFlowLoadMap.keySet()) {
printPortFlowsLoad(cp, typedFlowLoadMap.get(cp));
}
} else {
List<TypedFlowEntryWithLoad> typedFlowLoad =
flowStatsService.loadAllByType(device, ingressPortNumber, inLiveType, inInstructionType);
// print device/port all flows load
ConnectPoint cp = new ConnectPoint(ingressDeviceId, ingressPortNumber);
printPortFlowsLoad(cp, typedFlowLoad);
}
} else { // if (showSummary == true) //always is true
// print show summary head line
print("deviceId=%s, show SUMMARY flows", deviceURI);
if (ingressPortNumber == null) {
Map<ConnectPoint, SummaryFlowEntryWithLoad> summaryFlowLoadMap =
flowStatsService.loadSummary(device);
// print all ports flow load summary for a given device
for (ConnectPoint cp : summaryFlowLoadMap.keySet()) {
printPortSummaryLoad(cp, summaryFlowLoadMap.get(cp));
}
} else {
SummaryFlowEntryWithLoad summaryFlowLoad =
flowStatsService.loadSummary(device, ingressPortNumber);
// print device/port flow load summary
ConnectPoint cp = new ConnectPoint(ingressDeviceId, ingressPortNumber);
printPortSummaryLoad(cp, summaryFlowLoad);
}
}
}
/**
* Extracts the port number portion of the ConnectPoint.
*
* @param deviceString string representing the device/port
* @return port number as a string, empty string if the port is not found
*/
private String getPortNumber(String deviceString) {
if (deviceString == null) {
return "";
}
int slash = deviceString.indexOf('/');
if (slash <= 0) {
return ""; // return when no port number
}
return deviceString.substring(slash + 1, deviceString.length());
}
/**
* Extracts the device ID portion of the ConnectPoint.
*
* @param deviceString string representing the device/port
* @return device ID string
*/
private String getDeviceId(String deviceString) {
if (deviceString == null) {
return "";
}
int slash = deviceString.indexOf('/');
if (slash <= 0) {
return deviceString; // return only included device ID
}
return deviceString.substring(0, slash);
}
/**
* converts string of flow live type to FloeLiveType enum.
*
* @param liveType string representing the flow live type
* @return TypedStoredFlowEntry.FlowLiveType
*/
private TypedStoredFlowEntry.FlowLiveType getFlowLiveType(String liveType) {
String liveTypeUC = liveType.toUpperCase();
if (liveTypeUC.equals("IMMEDIATE")) {
return TypedStoredFlowEntry.FlowLiveType.IMMEDIATE_FLOW;
} else if (liveTypeUC.equals("SHORT")) {
return TypedStoredFlowEntry.FlowLiveType.SHORT_FLOW;
} else if (liveTypeUC.equals("MID")) {
return TypedStoredFlowEntry.FlowLiveType.MID_FLOW;
} else if (liveTypeUC.equals("LONG")) {
return TypedStoredFlowEntry.FlowLiveType.LONG_FLOW;
} else if (liveTypeUC.equals("UNKNOWN")) {
return TypedStoredFlowEntry.FlowLiveType.UNKNOWN_FLOW;
} else {
return null; // flow live type error
}
}
/**
* converts string of instruction type to Instruction type enum.
*
* @param instType string representing the instruction type
* @return Instruction.Type
*/
private Instruction.Type getInstructionType(String instType) {
String instTypeUC = instType.toUpperCase();
if (instTypeUC.equals("DROP")) {
return Instruction.Type.DROP;
} else if (instTypeUC.equals("OUTPUT")) {
return Instruction.Type.OUTPUT;
} else if (instTypeUC.equals("GROUP")) {
return Instruction.Type.GROUP;
} else if (instTypeUC.equals("L0MODIFICATION")) {
return Instruction.Type.L0MODIFICATION;
} else if (instTypeUC.equals("L2MODIFICATION")) {
return Instruction.Type.L2MODIFICATION;
} else if (instTypeUC.equals("TABLE")) {
return Instruction.Type.TABLE;
} else if (instTypeUC.equals("L3MODIFICATION")) {
return Instruction.Type.L3MODIFICATION;
} else if (instTypeUC.equals("METADATA")) {
return Instruction.Type.METADATA;
} else {
return null; // instruction type error
}
}
private void printPortFlowsLoad(ConnectPoint cp, List<TypedFlowEntryWithLoad> typedFlowLoad) {
print(" deviceId/Port=%s/%s, %s flows", cp.elementId(), cp.port(), typedFlowLoad.size());
for (TypedFlowEntryWithLoad tfel: typedFlowLoad) {
TypedStoredFlowEntry tfe = tfel.typedStoredFlowEntry();
print(" flowId=%s, state=%s, liveType=%s, life=%s -> %s",
Long.toHexString(tfe.id().value()),
tfe.state(),
tfe.flowLiveType(),
tfe.life(),
tfel.load().isValid() ? tfel.load() : "Load{rate=0, NOT VALID}");
}
}
private void printPortSummaryLoad(ConnectPoint cp, SummaryFlowEntryWithLoad summaryFlowLoad) {
print(" deviceId/Port=%s/%s, Total=%s, Immediate=%s, Short=%s, Mid=%s, Long=%s, Unknown=%s",
cp.elementId(),
cp.port(),
summaryFlowLoad.totalLoad().isValid() ? summaryFlowLoad.totalLoad() : "Load{rate=0, NOT VALID}",
summaryFlowLoad.immediateLoad().isValid() ? summaryFlowLoad.immediateLoad() : "Load{rate=0, NOT VALID}",
summaryFlowLoad.shortLoad().isValid() ? summaryFlowLoad.shortLoad() : "Load{rate=0, NOT VALID}",
summaryFlowLoad.midLoad().isValid() ? summaryFlowLoad.midLoad() : "Load{rate=0, NOT VALID}",
summaryFlowLoad.longLoad().isValid() ? summaryFlowLoad.longLoad() : "Load{rate=0, NOT VALID}",
summaryFlowLoad.unknownLoad().isValid() ? summaryFlowLoad.unknownLoad() : "Load{rate=0, NOT VALID}");
}
}
......@@ -222,6 +222,12 @@
</completers>
</command>
<command>
<action class="org.onosproject.cli.net.GetFlowStatistics"/>
<completers>
<ref component-id="deviceIdCompleter"/>
</completers>
</command>
<command>
<action class="org.onosproject.cli.net.AddMultiPointToSinglePointIntentCommand"/>
<completers>
<ref component-id="connectPointCompleter"/>
......@@ -333,7 +339,6 @@
<command>
<action class="org.onosproject.cli.net.InterfacesListCommand"/>
</command>
<command>
<action class="org.onosproject.cli.net.GroupsListCommand"/>
</command>
......
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.net.statistic;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.Device;
import org.onosproject.net.PortNumber;
import org.onosproject.net.flow.TypedStoredFlowEntry;
import org.onosproject.net.flow.instructions.Instruction;
import java.util.List;
import java.util.Map;
/**
* Service for obtaining individual flow statistic information about device and link in the system.
* Basic statistics are obtained from the StatisticService
*/
public interface FlowStatisticService {
/**
* Obtain the summary load list for the device with the given link.
*
* @param device the Device to query.
* @return map of summary flow entry load
*/
Map<ConnectPoint, SummaryFlowEntryWithLoad> loadSummary(Device device);
/**
* Obtain the summary load for the device with the given link or port.
*
* @param device the Device to query.
* @param pNumber the port number to query.
* @return summary flow entry load
*/
SummaryFlowEntryWithLoad loadSummary(Device device, PortNumber pNumber);
/**
* Obtain the set of the flow type and load list for the device with the given link.
*
* @param device the Device to query.
* @param liveType the FlowLiveType to filter, null means no filtering .
* @param instType the InstructionType to filter, null means no filtering.
* @return map of flow entry load
*/
Map<ConnectPoint, List<TypedFlowEntryWithLoad>> loadAllByType(Device device,
TypedStoredFlowEntry.FlowLiveType liveType,
Instruction.Type instType);
/**
* Obtain the flow type and load list for the device with the given link or port.
*
* @param device the Device to query.
* @param pNumber the port number of the Device to query
* @param liveType the FlowLiveType to filter, null means no filtering .
* @param instType the InstructionType to filter, null means no filtering.
* @return list of flow entry load
*/
List<TypedFlowEntryWithLoad> loadAllByType(Device device, PortNumber pNumber,
TypedStoredFlowEntry.FlowLiveType liveType,
Instruction.Type instType);
/**
* Obtain the set of the flow type and load topn list for the device with the given link.
*
* @param device the Device to query.
* @param liveType the FlowLiveType to filter, null means no filtering .
* @param instType the InstructionType to filter, null means no filtering.
* @param topn the top number to filter, null means no filtering.
* @return map of flow entry load
*/
Map<ConnectPoint, List<TypedFlowEntryWithLoad>> loadTopnByType(Device device,
TypedStoredFlowEntry.FlowLiveType liveType,
Instruction.Type instType,
int topn);
/**
* Obtain the flow type and load topn list for the device with the given link or port.
*
* @param device the Device to query.
* @param pNumber the port number of the Device to query
* @param liveType the FlowLiveType to filter, null means no filtering .
* @param instType the InstructionType to filter, null means no filtering.
* @return list of flow entry load
*/
List<TypedFlowEntryWithLoad> loadTopnByType(Device device, PortNumber pNumber,
TypedStoredFlowEntry.FlowLiveType liveType,
Instruction.Type instType,
int topn);
}
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.net.statistic;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.flow.FlowEntry;
import org.onosproject.net.flow.FlowRule;
import java.util.Set;
/**
* Flow Store to house the computed statistics.
*/
public interface FlowStatisticStore {
/**
* Remove entries associated with this rule.
*
* @param rule {@link org.onosproject.net.flow.FlowRule}
*/
void removeFlowStatistic(FlowRule rule);
/**
* Adds a flow stats observation for a flow rule. The previous flow will be removed.
*
* @param rule a {@link org.onosproject.net.flow.FlowEntry}
*/
void addFlowStatistic(FlowEntry rule);
/**
* Updates a stats observation for a flow rule. The old flow stats will be moved to previous stats.
*
* @param rule a {@link org.onosproject.net.flow.FlowEntry}
*/
void updateFlowStatistic(FlowEntry rule);
/**
* Fetches the current observed flow stats values.
*
* @param connectPoint the port to fetch information for
* @return set of current flow rules
*/
Set<FlowEntry> getCurrentFlowStatistic(ConnectPoint connectPoint);
/**
* Fetches the current observed flow stats values.
*
* @param connectPoint the port to fetch information for
* @return set of current values
*/
Set<FlowEntry> getPreviousFlowStatistic(ConnectPoint connectPoint);
}
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.net.statistic;
import org.onosproject.net.ConnectPoint;
/**
* Summary Load classified by flow live type.
*/
public class SummaryFlowEntryWithLoad {
private ConnectPoint cp;
private Load totalLoad;
private Load immediateLoad;
private Load shortLoad;
private Load midLoad;
private Load longLoad;
private Load unknownLoad;
/**
* Creates a new summary flow entry having load for the given connect point and total load.
*
* @param cp connect point
* @param totalLoad total load
*/
public SummaryFlowEntryWithLoad(ConnectPoint cp, Load totalLoad) {
this.cp = cp;
this.totalLoad = totalLoad;
this.immediateLoad = new DefaultLoad();
this.shortLoad = new DefaultLoad();
this.midLoad = new DefaultLoad();
this.longLoad = new DefaultLoad();
this.unknownLoad = new DefaultLoad();
}
/**
* Creates a new summary flow entry having load for the given connect point
* and total, immediate, short, mid, and long load.
*
* @param cp connect point
* @param totalLoad total load
* @param immediateLoad immediate load
* @param shortLoad short load
* @param midLoad mid load
* @param longLoad long load
*/
public SummaryFlowEntryWithLoad(ConnectPoint cp,
Load totalLoad, Load immediateLoad, Load shortLoad, Load midLoad, Load longLoad) {
this.cp = cp;
this.totalLoad = totalLoad;
this.immediateLoad = immediateLoad;
this.shortLoad = shortLoad;
this.midLoad = midLoad;
this.longLoad = longLoad;
this.unknownLoad = new DefaultLoad();
}
/**
* Creates a new summary flow entry having load for the given connect point
* and total, immediate, short, mid, long, and unknown load.
*
* @param cp connect point
* @param totalLoad total load
* @param immediateLoad immediate load
* @param shortLoad short load
* @param midLoad mid load
* @param longLoad long load
* @param unknownLoad long load
*/
public SummaryFlowEntryWithLoad(ConnectPoint cp,
Load totalLoad, Load immediateLoad,
Load shortLoad, Load midLoad, Load longLoad, Load unknownLoad) {
this.cp = cp;
this.totalLoad = totalLoad;
this.immediateLoad = immediateLoad;
this.shortLoad = shortLoad;
this.midLoad = midLoad;
this.longLoad = longLoad;
this.unknownLoad = unknownLoad;
}
/**
* Returns connect point.
*/
public ConnectPoint connectPoint() {
return cp;
}
/**
* Returns total load of connect point.
*/
public Load totalLoad() {
return totalLoad;
}
/**
* Returns immediate load of connect point.
*/
public Load immediateLoad() {
return immediateLoad;
}
/**
* Returns short load of connect point.
*/
public Load shortLoad() {
return shortLoad;
}
/**
* Returns mid load of connect point.
*/
public Load midLoad() {
return midLoad;
}
/**
* Returns long load of connect point.
*/
public Load longLoad() {
return longLoad;
}
/**
* Returns unknown load of connect point.
*/
public Load unknownLoad() {
return unknownLoad;
}
}
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.net.statistic;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.flow.FlowEntry;
import org.onosproject.net.flow.TypedStoredFlowEntry;
import org.onosproject.net.flow.DefaultTypedFlowEntry;
import static com.google.common.base.Preconditions.checkNotNull;
/**
* Load of flow entry of flow live type.
*/
public class TypedFlowEntryWithLoad {
private ConnectPoint cp;
private TypedStoredFlowEntry tfe;
private Load load;
//TODO: make this variables class, and share with NewAdaptivceFlowStatsCollector class
private static final int CAL_AND_POLL_INTERVAL = 5; // means SHORT_POLL_INTERVAL
private static final int MID_POLL_INTERVAL = 10;
private static final int LONG_POLL_INTERVAL = 15;
public TypedFlowEntryWithLoad(ConnectPoint cp, TypedStoredFlowEntry tfe, Load load) {
this.cp = cp;
this.tfe = tfe;
this.load = load;
}
public TypedFlowEntryWithLoad(ConnectPoint cp, TypedStoredFlowEntry tfe) {
this.cp = cp;
this.tfe = tfe;
this.load = new DefaultLoad(tfe.bytes(), 0, typedPollInterval(tfe));
}
public TypedFlowEntryWithLoad(ConnectPoint cp, FlowEntry fe) {
this.cp = cp;
this.tfe = newTypedStoredFlowEntry(fe);
this.load = new DefaultLoad(fe.bytes(), 0, typedPollInterval(this.tfe));
}
public ConnectPoint connectPoint() {
return cp;
}
public TypedStoredFlowEntry typedStoredFlowEntry() {
return tfe;
}
public Load load() {
return load;
}
public void setLoad(Load load) {
this.load = load;
}
/**
* Returns short polling interval.
*/
public static int shortPollInterval() {
return CAL_AND_POLL_INTERVAL;
}
/**
* Returns mid polling interval.
*/
public static int midPollInterval() {
return MID_POLL_INTERVAL;
}
/**
* Returns long polling interval.
*/
public static int longPollInterval() {
return LONG_POLL_INTERVAL;
}
/**
* Returns average polling interval.
*/
public static int avgPollInterval() {
return (CAL_AND_POLL_INTERVAL + MID_POLL_INTERVAL + LONG_POLL_INTERVAL) / 3;
}
/**
* Returns current typed flow entry's polling interval.
*
* @param tfe typed flow entry
*/
public static long typedPollInterval(TypedStoredFlowEntry tfe) {
checkNotNull(tfe, "TypedStoredFlowEntry cannot be null");
switch (tfe.flowLiveType()) {
case LONG_FLOW:
return LONG_POLL_INTERVAL;
case MID_FLOW:
return MID_POLL_INTERVAL;
case SHORT_FLOW:
case IMMEDIATE_FLOW:
default:
return CAL_AND_POLL_INTERVAL;
}
}
/**
* Creates a new typed flow entry with the given flow entry fe.
*
* @param fe flow entry
*/
public static TypedStoredFlowEntry newTypedStoredFlowEntry(FlowEntry fe) {
if (fe == null) {
return null;
}
long life = fe.life();
if (life >= LONG_POLL_INTERVAL) {
return new DefaultTypedFlowEntry(fe, TypedStoredFlowEntry.FlowLiveType.LONG_FLOW);
} else if (life >= MID_POLL_INTERVAL) {
return new DefaultTypedFlowEntry(fe, TypedStoredFlowEntry.FlowLiveType.MID_FLOW);
} else if (life >= CAL_AND_POLL_INTERVAL) {
return new DefaultTypedFlowEntry(fe, TypedStoredFlowEntry.FlowLiveType.SHORT_FLOW);
} else if (life >= 0) {
return new DefaultTypedFlowEntry(fe, TypedStoredFlowEntry.FlowLiveType.IMMEDIATE_FLOW);
} else { // life < 0
return new DefaultTypedFlowEntry(fe, TypedStoredFlowEntry.FlowLiveType.UNKNOWN_FLOW);
}
}
}
......@@ -52,6 +52,20 @@
<dependency>
<groupId>org.onosproject</groupId>
<version>${project.version}</version>
<artifactId>onos-cli</artifactId>
</dependency>
<dependency>
<groupId>org.onosproject</groupId>
<artifactId>onos-cli</artifactId>
<version>${project.version}</version>
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.onosproject</groupId>
<artifactId>onos-core-common</artifactId>
<version>${project.version}</version>
<classifier>tests</classifier>
......
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.net.statistic.impl;
import com.google.common.base.MoreObjects;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableSet;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onosproject.cli.Comparators;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.Device;
import org.onosproject.net.Port;
import org.onosproject.net.PortNumber;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.flow.DefaultTypedFlowEntry;
import org.onosproject.net.flow.FlowEntry;
import org.onosproject.net.flow.FlowRule;
import org.onosproject.net.flow.FlowRuleEvent;
import org.onosproject.net.flow.FlowRuleListener;
import org.onosproject.net.flow.FlowRuleService;
import org.onosproject.net.flow.TypedStoredFlowEntry;
import org.onosproject.net.flow.instructions.Instruction;
import org.onosproject.net.statistic.DefaultLoad;
import org.onosproject.net.statistic.FlowStatisticService;
import org.onosproject.net.statistic.Load;
import org.onosproject.net.statistic.FlowStatisticStore;
import org.onosproject.net.statistic.SummaryFlowEntryWithLoad;
import org.onosproject.net.statistic.TypedFlowEntryWithLoad;
import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.onosproject.security.AppGuard.checkPermission;
import static org.slf4j.LoggerFactory.getLogger;
import static org.onosproject.security.AppPermission.Type.*;
/**
* Provides an implementation of the Flow Statistic Service.
*/
@Component(immediate = true, enabled = true)
@Service
public class FlowStatisticManager implements FlowStatisticService {
private final Logger log = getLogger(getClass());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected FlowRuleService flowRuleService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected FlowStatisticStore flowStatisticStore;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DeviceService deviceService;
private final InternalFlowRuleStatsListener frListener = new InternalFlowRuleStatsListener();
@Activate
public void activate() {
flowRuleService.addListener(frListener);
log.info("Started");
}
@Deactivate
public void deactivate() {
flowRuleService.removeListener(frListener);
log.info("Stopped");
}
@Override
public Map<ConnectPoint, SummaryFlowEntryWithLoad> loadSummary(Device device) {
checkPermission(STATISTIC_READ);
Map<ConnectPoint, SummaryFlowEntryWithLoad> summaryLoad = new TreeMap<>(Comparators.CONNECT_POINT_COMPARATOR);
if (device == null) {
return summaryLoad;
}
List<Port> ports = new ArrayList<>(deviceService.getPorts(device.id()));
for (Port port : ports) {
ConnectPoint cp = new ConnectPoint(device.id(), port.number());
SummaryFlowEntryWithLoad sfe = loadSummaryPortInternal(cp);
summaryLoad.put(cp, sfe);
}
return summaryLoad;
}
@Override
public SummaryFlowEntryWithLoad loadSummary(Device device, PortNumber pNumber) {
checkPermission(STATISTIC_READ);
ConnectPoint cp = new ConnectPoint(device.id(), pNumber);
return loadSummaryPortInternal(cp);
}
@Override
public Map<ConnectPoint, List<TypedFlowEntryWithLoad>> loadAllByType(Device device,
TypedStoredFlowEntry.FlowLiveType liveType,
Instruction.Type instType) {
checkPermission(STATISTIC_READ);
Map<ConnectPoint, List<TypedFlowEntryWithLoad>> allLoad = new TreeMap<>(Comparators.CONNECT_POINT_COMPARATOR);
if (device == null) {
return allLoad;
}
List<Port> ports = new ArrayList<>(deviceService.getPorts(device.id()));
for (Port port : ports) {
ConnectPoint cp = new ConnectPoint(device.id(), port.number());
List<TypedFlowEntryWithLoad> tfel = loadAllPortInternal(cp, liveType, instType);
allLoad.put(cp, tfel);
}
return allLoad;
}
@Override
public List<TypedFlowEntryWithLoad> loadAllByType(Device device, PortNumber pNumber,
TypedStoredFlowEntry.FlowLiveType liveType,
Instruction.Type instType) {
checkPermission(STATISTIC_READ);
ConnectPoint cp = new ConnectPoint(device.id(), pNumber);
return loadAllPortInternal(cp, liveType, instType);
}
@Override
public Map<ConnectPoint, List<TypedFlowEntryWithLoad>> loadTopnByType(Device device,
TypedStoredFlowEntry.FlowLiveType liveType,
Instruction.Type instType,
int topn) {
checkPermission(STATISTIC_READ);
Map<ConnectPoint, List<TypedFlowEntryWithLoad>> allLoad = new TreeMap<>(Comparators.CONNECT_POINT_COMPARATOR);
if (device == null) {
return allLoad;
}
List<Port> ports = new ArrayList<>(deviceService.getPorts(device.id()));
for (Port port : ports) {
ConnectPoint cp = new ConnectPoint(device.id(), port.number());
List<TypedFlowEntryWithLoad> tfel = loadTopnPortInternal(cp, liveType, instType, topn);
allLoad.put(cp, tfel);
}
return allLoad;
}
@Override
public List<TypedFlowEntryWithLoad> loadTopnByType(Device device, PortNumber pNumber,
TypedStoredFlowEntry.FlowLiveType liveType,
Instruction.Type instType,
int topn) {
checkPermission(STATISTIC_READ);
ConnectPoint cp = new ConnectPoint(device.id(), pNumber);
return loadTopnPortInternal(cp, liveType, instType, topn);
}
private SummaryFlowEntryWithLoad loadSummaryPortInternal(ConnectPoint cp) {
checkPermission(STATISTIC_READ);
Set<FlowEntry> currentStats;
Set<FlowEntry> previousStats;
TypedStatistics typedStatistics;
synchronized (flowStatisticStore) {
currentStats = flowStatisticStore.getCurrentFlowStatistic(cp);
if (currentStats == null) {
return new SummaryFlowEntryWithLoad(cp, new DefaultLoad());
}
previousStats = flowStatisticStore.getPreviousFlowStatistic(cp);
if (previousStats == null) {
return new SummaryFlowEntryWithLoad(cp, new DefaultLoad());
}
// copy to local flow entry
typedStatistics = new TypedStatistics(currentStats, previousStats);
// Check for validity of this stats data
checkLoadValidity(currentStats, previousStats);
}
// current and previous set is not empty!
Set<FlowEntry> currentSet = typedStatistics.current();
Set<FlowEntry> previousSet = typedStatistics.previous();
Load totalLoad = new DefaultLoad(aggregateBytesSet(currentSet), aggregateBytesSet(previousSet),
TypedFlowEntryWithLoad.avgPollInterval());
Map<FlowRule, TypedStoredFlowEntry> currentMap;
Map<FlowRule, TypedStoredFlowEntry> previousMap;
currentMap = typedStatistics.currentImmediate();
previousMap = typedStatistics.previousImmediate();
Load immediateLoad = new DefaultLoad(aggregateBytesMap(currentMap), aggregateBytesMap(previousMap),
TypedFlowEntryWithLoad.shortPollInterval());
currentMap = typedStatistics.currentShort();
previousMap = typedStatistics.previousShort();
Load shortLoad = new DefaultLoad(aggregateBytesMap(currentMap), aggregateBytesMap(previousMap),
TypedFlowEntryWithLoad.shortPollInterval());
currentMap = typedStatistics.currentMid();
previousMap = typedStatistics.previousMid();
Load midLoad = new DefaultLoad(aggregateBytesMap(currentMap), aggregateBytesMap(previousMap),
TypedFlowEntryWithLoad.midPollInterval());
currentMap = typedStatistics.currentLong();
previousMap = typedStatistics.previousLong();
Load longLoad = new DefaultLoad(aggregateBytesMap(currentMap), aggregateBytesMap(previousMap),
TypedFlowEntryWithLoad.longPollInterval());
currentMap = typedStatistics.currentUnknown();
previousMap = typedStatistics.previousUnknown();
Load unknownLoad = new DefaultLoad(aggregateBytesMap(currentMap), aggregateBytesMap(previousMap),
TypedFlowEntryWithLoad.avgPollInterval());
return new SummaryFlowEntryWithLoad(cp, totalLoad, immediateLoad, shortLoad, midLoad, longLoad, unknownLoad);
}
private List<TypedFlowEntryWithLoad> loadAllPortInternal(ConnectPoint cp,
TypedStoredFlowEntry.FlowLiveType liveType,
Instruction.Type instType) {
checkPermission(STATISTIC_READ);
List<TypedFlowEntryWithLoad> retTFEL = new ArrayList<>();
Set<FlowEntry> currentStats;
Set<FlowEntry> previousStats;
TypedStatistics typedStatistics;
synchronized (flowStatisticStore) {
currentStats = flowStatisticStore.getCurrentFlowStatistic(cp);
if (currentStats == null) {
return retTFEL;
}
previousStats = flowStatisticStore.getPreviousFlowStatistic(cp);
if (previousStats == null) {
return retTFEL;
}
// copy to local flow entry set
typedStatistics = new TypedStatistics(currentStats, previousStats);
// Check for validity of this stats data
checkLoadValidity(currentStats, previousStats);
}
// current and previous set is not empty!
boolean isAllLiveType = (liveType == null ? true : false); // null is all live type
boolean isAllInstType = (instType == null ? true : false); // null is all inst type
Map<FlowRule, TypedStoredFlowEntry> currentMap;
Map<FlowRule, TypedStoredFlowEntry> previousMap;
if (isAllLiveType || liveType == TypedStoredFlowEntry.FlowLiveType.IMMEDIATE_FLOW) {
currentMap = typedStatistics.currentImmediate();
previousMap = typedStatistics.previousImmediate();
List<TypedFlowEntryWithLoad> fel = typedFlowEntryLoadByInstInternal(cp, currentMap, previousMap,
isAllInstType, instType, TypedFlowEntryWithLoad.shortPollInterval());
if (fel.size() > 0) {
retTFEL.addAll(fel);
}
}
if (isAllLiveType || liveType == TypedStoredFlowEntry.FlowLiveType.SHORT_FLOW) {
currentMap = typedStatistics.currentShort();
previousMap = typedStatistics.previousShort();
List<TypedFlowEntryWithLoad> fel = typedFlowEntryLoadByInstInternal(cp, currentMap, previousMap,
isAllInstType, instType, TypedFlowEntryWithLoad.shortPollInterval());
if (fel.size() > 0) {
retTFEL.addAll(fel);
}
}
if (isAllLiveType || liveType == TypedStoredFlowEntry.FlowLiveType.MID_FLOW) {
currentMap = typedStatistics.currentMid();
previousMap = typedStatistics.previousMid();
List<TypedFlowEntryWithLoad> fel = typedFlowEntryLoadByInstInternal(cp, currentMap, previousMap,
isAllInstType, instType, TypedFlowEntryWithLoad.midPollInterval());
if (fel.size() > 0) {
retTFEL.addAll(fel);
}
}
if (isAllLiveType || liveType == TypedStoredFlowEntry.FlowLiveType.LONG_FLOW) {
currentMap = typedStatistics.currentLong();
previousMap = typedStatistics.previousLong();
List<TypedFlowEntryWithLoad> fel = typedFlowEntryLoadByInstInternal(cp, currentMap, previousMap,
isAllInstType, instType, TypedFlowEntryWithLoad.longPollInterval());
if (fel.size() > 0) {
retTFEL.addAll(fel);
}
}
if (isAllLiveType || liveType == TypedStoredFlowEntry.FlowLiveType.UNKNOWN_FLOW) {
currentMap = typedStatistics.currentUnknown();
previousMap = typedStatistics.previousUnknown();
List<TypedFlowEntryWithLoad> fel = typedFlowEntryLoadByInstInternal(cp, currentMap, previousMap,
isAllInstType, instType, TypedFlowEntryWithLoad.avgPollInterval());
if (fel.size() > 0) {
retTFEL.addAll(fel);
}
}
return retTFEL;
}
private List<TypedFlowEntryWithLoad> typedFlowEntryLoadByInstInternal(ConnectPoint cp,
Map<FlowRule, TypedStoredFlowEntry> currentMap,
Map<FlowRule, TypedStoredFlowEntry> previousMap,
boolean isAllInstType,
Instruction.Type instType,
int liveTypePollInterval) {
List<TypedFlowEntryWithLoad> fel = new ArrayList<>();
for (TypedStoredFlowEntry tfe : currentMap.values()) {
if (isAllInstType ||
tfe.treatment().allInstructions().stream().
filter(i -> i.type() == instType).
findAny().isPresent()) {
long currentBytes = tfe.bytes();
long previousBytes = previousMap.getOrDefault(tfe, new DefaultTypedFlowEntry((FlowRule) tfe)).bytes();
Load fLoad = new DefaultLoad(currentBytes, previousBytes, liveTypePollInterval);
fel.add(new TypedFlowEntryWithLoad(cp, tfe, fLoad));
}
}
return fel;
}
private List<TypedFlowEntryWithLoad> loadTopnPortInternal(ConnectPoint cp,
TypedStoredFlowEntry.FlowLiveType liveType,
Instruction.Type instType,
int topn) {
List<TypedFlowEntryWithLoad> fel = loadAllPortInternal(cp, liveType, instType);
// Sort with descending order of load
List<TypedFlowEntryWithLoad> tfel =
fel.stream().sorted(Comparators.TYPEFLOWENTRY_WITHLOAD_COMPARATOR).
limit(topn).collect(Collectors.toList());
return tfel;
}
private long aggregateBytesSet(Set<FlowEntry> setFE) {
return setFE.stream().mapToLong(FlowEntry::bytes).sum();
}
private long aggregateBytesMap(Map<FlowRule, TypedStoredFlowEntry> mapFE) {
return mapFE.values().stream().mapToLong(FlowEntry::bytes).sum();
}
/**
* Internal data class holding two set of typed flow entries.
*/
private static class TypedStatistics {
private final ImmutableSet<FlowEntry> currentAll;
private final ImmutableSet<FlowEntry> previousAll;
private final Map<FlowRule, TypedStoredFlowEntry> currentImmediate = new HashMap<>();
private final Map<FlowRule, TypedStoredFlowEntry> previousImmediate = new HashMap<>();
private final Map<FlowRule, TypedStoredFlowEntry> currentShort = new HashMap<>();
private final Map<FlowRule, TypedStoredFlowEntry> previousShort = new HashMap<>();
private final Map<FlowRule, TypedStoredFlowEntry> currentMid = new HashMap<>();
private final Map<FlowRule, TypedStoredFlowEntry> previousMid = new HashMap<>();
private final Map<FlowRule, TypedStoredFlowEntry> currentLong = new HashMap<>();
private final Map<FlowRule, TypedStoredFlowEntry> previousLong = new HashMap<>();
private final Map<FlowRule, TypedStoredFlowEntry> currentUnknown = new HashMap<>();
private final Map<FlowRule, TypedStoredFlowEntry> previousUnknown = new HashMap<>();
public TypedStatistics(Set<FlowEntry> current, Set<FlowEntry> previous) {
this.currentAll = ImmutableSet.copyOf(checkNotNull(current));
this.previousAll = ImmutableSet.copyOf(checkNotNull(previous));
currentAll.forEach(fe -> {
TypedStoredFlowEntry tfe = TypedFlowEntryWithLoad.newTypedStoredFlowEntry(fe);
switch (tfe.flowLiveType()) {
case IMMEDIATE_FLOW:
currentImmediate.put(fe, tfe);
break;
case SHORT_FLOW:
currentShort.put(fe, tfe);
break;
case MID_FLOW:
currentMid.put(fe, tfe);
break;
case LONG_FLOW:
currentLong.put(fe, tfe);
break;
default:
currentUnknown.put(fe, tfe);
break;
}
});
previousAll.forEach(fe -> {
TypedStoredFlowEntry tfe = TypedFlowEntryWithLoad.newTypedStoredFlowEntry(fe);
switch (tfe.flowLiveType()) {
case IMMEDIATE_FLOW:
if (currentImmediate.containsKey(fe)) {
previousImmediate.put(fe, tfe);
} else if (currentShort.containsKey(fe)) {
previousShort.put(fe, tfe);
} else if (currentMid.containsKey(fe)) {
previousMid.put(fe, tfe);
} else if (currentLong.containsKey(fe)) {
previousLong.put(fe, tfe);
} else {
previousUnknown.put(fe, tfe);
}
break;
case SHORT_FLOW:
if (currentShort.containsKey(fe)) {
previousShort.put(fe, tfe);
} else if (currentMid.containsKey(fe)) {
previousMid.put(fe, tfe);
} else if (currentLong.containsKey(fe)) {
previousLong.put(fe, tfe);
} else {
previousUnknown.put(fe, tfe);
}
break;
case MID_FLOW:
if (currentMid.containsKey(fe)) {
previousMid.put(fe, tfe);
} else if (currentLong.containsKey(fe)) {
previousLong.put(fe, tfe);
} else {
previousUnknown.put(fe, tfe);
}
break;
case LONG_FLOW:
if (currentLong.containsKey(fe)) {
previousLong.put(fe, tfe);
} else {
previousUnknown.put(fe, tfe);
}
break;
default:
previousUnknown.put(fe, tfe);
break;
}
});
}
/**
* Returns flow entries as the current value.
*
* @return flow entries as the current value
*/
public ImmutableSet<FlowEntry> current() {
return currentAll;
}
/**
* Returns flow entries as the previous value.
*
* @return flow entries as the previous value
*/
public ImmutableSet<FlowEntry> previous() {
return previousAll;
}
public Map<FlowRule, TypedStoredFlowEntry> currentImmediate() {
return currentImmediate;
}
public Map<FlowRule, TypedStoredFlowEntry> previousImmediate() {
return previousImmediate;
}
public Map<FlowRule, TypedStoredFlowEntry> currentShort() {
return currentShort;
}
public Map<FlowRule, TypedStoredFlowEntry> previousShort() {
return previousShort;
}
public Map<FlowRule, TypedStoredFlowEntry> currentMid() {
return currentMid;
}
public Map<FlowRule, TypedStoredFlowEntry> previousMid() {
return previousMid;
}
public Map<FlowRule, TypedStoredFlowEntry> currentLong() {
return currentLong;
}
public Map<FlowRule, TypedStoredFlowEntry> previousLong() {
return previousLong;
}
public Map<FlowRule, TypedStoredFlowEntry> currentUnknown() {
return currentUnknown;
}
public Map<FlowRule, TypedStoredFlowEntry> previousUnknown() {
return previousUnknown;
}
/**
* Validates values are not empty.
*
* @return false if either of the sets is empty. Otherwise, true.
*/
public boolean isValid() {
return !(currentAll.isEmpty() || previousAll.isEmpty());
}
@Override
public int hashCode() {
return Objects.hash(currentAll, previousAll);
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (!(obj instanceof TypedStatistics)) {
return false;
}
final TypedStatistics other = (TypedStatistics) obj;
return Objects.equals(this.currentAll, other.currentAll) &&
Objects.equals(this.previousAll, other.previousAll);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("current", currentAll)
.add("previous", previousAll)
.toString();
}
}
private void checkLoadValidity(Set<FlowEntry> current, Set<FlowEntry> previous) {
current.stream().forEach(c -> {
FlowEntry f = previous.stream().filter(p -> c.equals(p)).
findAny().orElse(null);
if (f != null && c.bytes() < f.bytes()) {
log.debug("FlowStatisticManager:checkLoadValidity():" +
"Error: " + c + " :Previous bytes=" + f.bytes() +
" is larger than current bytes=" + c.bytes() + " !!!");
}
});
}
/**
* Creates a predicate that checks the instruction type of a flow entry is the same as
* the specified instruction type.
*
* @param instType instruction type to be checked
* @return predicate
*/
private static Predicate<FlowEntry> hasInstructionType(Instruction.Type instType) {
return new Predicate<FlowEntry>() {
@Override
public boolean apply(FlowEntry flowEntry) {
List<Instruction> allInstructions = flowEntry.treatment().allInstructions();
return allInstructions.stream().filter(i -> i.type() == instType).findAny().isPresent();
}
};
}
/**
* Internal flow rule event listener for FlowStatisticManager.
*/
private class InternalFlowRuleStatsListener implements FlowRuleListener {
@Override
public void event(FlowRuleEvent event) {
FlowRule rule = event.subject();
switch (event.type()) {
case RULE_ADDED:
if (rule instanceof FlowEntry) {
flowStatisticStore.addFlowStatistic((FlowEntry) rule);
}
break;
case RULE_UPDATED:
flowStatisticStore.updateFlowStatistic((FlowEntry) rule);
break;
case RULE_ADD_REQUESTED:
break;
case RULE_REMOVE_REQUESTED:
break;
case RULE_REMOVED:
flowStatisticStore.removeFlowStatistic(rule);
break;
default:
log.warn("Unknown flow rule event {}", event);
}
}
}
}
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.statistic.impl;
import com.google.common.base.Objects;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.KryoNamespace;
import org.onlab.util.Tools;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
import org.onosproject.mastership.MastershipService;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.DeviceId;
import org.onosproject.net.PortNumber;
import org.onosproject.net.flow.FlowEntry;
import org.onosproject.net.flow.FlowRule;
import org.onosproject.net.flow.instructions.Instruction;
import org.onosproject.net.flow.instructions.Instructions;
import org.onosproject.net.statistic.FlowStatisticStore;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
import org.slf4j.Logger;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.store.statistic.impl.StatisticStoreMessageSubjects.GET_CURRENT;
import static org.onosproject.store.statistic.impl.StatisticStoreMessageSubjects.GET_PREVIOUS;
import static org.slf4j.LoggerFactory.getLogger;
/**
* Maintains flow statistics using RPC calls to collect stats from remote instances
* on demand.
*/
@Component(immediate = true)
@Service
public class DistributedFlowStatisticStore implements FlowStatisticStore {
private final Logger log = getLogger(getClass());
// TODO: Make configurable.
private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected MastershipService mastershipService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterCommunicationService clusterCommunicator;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
private Map<ConnectPoint, Set<FlowEntry>> previous =
new ConcurrentHashMap<>();
private Map<ConnectPoint, Set<FlowEntry>> current =
new ConcurrentHashMap<>();
protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
@Override
protected void setupKryoPool() {
serializerPool = KryoNamespace.newBuilder()
.register(KryoNamespaces.API)
.nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
// register this store specific classes here
.build();
}
};
private NodeId local;
private ExecutorService messageHandlingExecutor;
private static final long STATISTIC_STORE_TIMEOUT_MILLIS = 3000;
@Activate
public void activate() {
local = clusterService.getLocalNode().id();
messageHandlingExecutor = Executors.newFixedThreadPool(
MESSAGE_HANDLER_THREAD_POOL_SIZE,
groupedThreads("onos/store/statistic", "message-handlers"));
clusterCommunicator.addSubscriber(
GET_CURRENT, SERIALIZER::decode, this::getCurrentStatisticInternal, SERIALIZER::encode,
messageHandlingExecutor);
clusterCommunicator.addSubscriber(
GET_CURRENT, SERIALIZER::decode, this::getPreviousStatisticInternal, SERIALIZER::encode,
messageHandlingExecutor);
log.info("Started");
}
@Deactivate
public void deactivate() {
clusterCommunicator.removeSubscriber(GET_PREVIOUS);
clusterCommunicator.removeSubscriber(GET_CURRENT);
messageHandlingExecutor.shutdown();
log.info("Stopped");
}
@Override
public synchronized void removeFlowStatistic(FlowRule rule) {
ConnectPoint cp = buildConnectPoint(rule);
if (cp == null) {
return;
}
// remove this rule if present from current map
current.computeIfPresent(cp, (c, e) -> { e.remove(rule); return e; });
// remove this on if present from previous map
previous.computeIfPresent(cp, (c, e) -> { e.remove(rule); return e; });
}
@Override
public synchronized void addFlowStatistic(FlowEntry rule) {
ConnectPoint cp = buildConnectPoint(rule);
if (cp == null) {
return;
}
// create one if absent and add this rule
current.putIfAbsent(cp, new HashSet<>());
current.computeIfPresent(cp, (c, e) -> { e.add(rule); return e; });
// remove previous one if present
previous.computeIfPresent(cp, (c, e) -> { e.remove(rule); return e; });
}
public synchronized void updateFlowStatistic(FlowEntry rule) {
ConnectPoint cp = buildConnectPoint(rule);
if (cp == null) {
return;
}
Set<FlowEntry> curr = current.get(cp);
if (curr == null) {
addFlowStatistic(rule);
} else {
FlowEntry f = curr.stream().filter(c -> rule.equals(c)).
findAny().orElse(null);
if (rule.bytes() < f.bytes()) {
log.debug("DistributedFlowStatisticStore:updateFlowStatistic():" +
" Invalid Flow Update! Will be removed!!" +
" curr flowId=" + Long.toHexString(rule.id().value()) +
", prev flowId=" + Long.toHexString(f.id().value()) +
", curr bytes=" + rule.bytes() + ", prev bytes=" + f.bytes() +
", curr life=" + rule.life() + ", prev life=" + f.life() +
", curr lastSeen=" + rule.lastSeen() + ", prev lastSeen=" + f.lastSeen());
// something is wrong! invalid flow entry, so delete it
removeFlowStatistic(rule);
return;
}
Set<FlowEntry> prev = previous.get(cp);
if (prev == null) {
prev = new HashSet<>();
previous.put(cp, prev);
}
// previous one is exist
if (f != null) {
// remove old one and add new one
prev.remove(rule);
if (!prev.add(f)) {
log.debug("DistributedFlowStatisticStore:updateFlowStatistic():" +
" flowId={}, add failed into previous.",
Long.toHexString(rule.id().value()));
}
}
// remove old one and add new one
curr.remove(rule);
if (!curr.add(rule)) {
log.debug("DistributedFlowStatisticStore:updateFlowStatistic():" +
" flowId={}, add failed into current.",
Long.toHexString(rule.id().value()));
}
}
}
@Override
public Set<FlowEntry> getCurrentFlowStatistic(ConnectPoint connectPoint) {
final DeviceId deviceId = connectPoint.deviceId();
NodeId master = mastershipService.getMasterFor(deviceId);
if (master == null) {
log.warn("No master for {}", deviceId);
return Collections.emptySet();
}
if (Objects.equal(local, master)) {
return getCurrentStatisticInternal(connectPoint);
} else {
return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
connectPoint,
GET_CURRENT,
SERIALIZER::encode,
SERIALIZER::decode,
master),
STATISTIC_STORE_TIMEOUT_MILLIS,
TimeUnit.MILLISECONDS,
Collections.emptySet());
}
}
private synchronized Set<FlowEntry> getCurrentStatisticInternal(ConnectPoint connectPoint) {
return current.get(connectPoint);
}
@Override
public Set<FlowEntry> getPreviousFlowStatistic(ConnectPoint connectPoint) {
final DeviceId deviceId = connectPoint.deviceId();
NodeId master = mastershipService.getMasterFor(deviceId);
if (master == null) {
log.warn("No master for {}", deviceId);
return Collections.emptySet();
}
if (Objects.equal(local, master)) {
return getPreviousStatisticInternal(connectPoint);
} else {
return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
connectPoint,
GET_PREVIOUS,
SERIALIZER::encode,
SERIALIZER::decode,
master),
STATISTIC_STORE_TIMEOUT_MILLIS,
TimeUnit.MILLISECONDS,
Collections.emptySet());
}
}
private synchronized Set<FlowEntry> getPreviousStatisticInternal(ConnectPoint connectPoint) {
return previous.get(connectPoint);
}
private ConnectPoint buildConnectPoint(FlowRule rule) {
PortNumber port = getOutput(rule);
if (port == null) {
return null;
}
ConnectPoint cp = new ConnectPoint(rule.deviceId(), port);
return cp;
}
private PortNumber getOutput(FlowRule rule) {
for (Instruction i : rule.treatment().allInstructions()) {
if (i.type() == Instruction.Type.OUTPUT) {
Instructions.OutputInstruction out = (Instructions.OutputInstruction) i;
return out.port();
}
if (i.type() == Instruction.Type.DROP) {
return PortNumber.P0;
}
}
return null;
}
}
\ No newline at end of file
......@@ -717,25 +717,9 @@ public class NewAdaptiveFlowStatsCollector {
long curTime = (cTime > 0 ? cTime : System.currentTimeMillis());
// For latency adjustment(default=500 millisecond) between FlowStatsRequest and Reply
long fromLastSeen = ((curTime - fe.lastSeen() + latencyFlowStatsRequestAndReplyMillis) / 1000);
// fe.life() unit is SECOND!
long liveTime = fe.life() + fromLastSeen;
// check flow timeout
if (fe.timeout() > calAndPollInterval && fromLastSeen > fe.timeout()) {
if (!fe.isPermanent()) {
log.debug("checkAndMoveLiveFlowInternal, FlowId=" + Long.toHexString(fe.id().value())
+ ", liveType=" + fe.flowLiveType()
+ ", liveTime=" + liveTime
+ ", life=" + fe.life()
+ ", fromLastSeen=" + fromLastSeen
+ ", timeout=" + fe.timeout()
+ ", isPermanent=" + fe.isPermanent()
+ " AdaptiveStats collection thread for {}",
sw.getStringId());
return false;
}
}
switch (fe.flowLiveType()) {
case IMMEDIATE_FLOW:
......