Jian Li
Committed by Gerrit Code Review

[ONOS-3504] Initial implementation of control message aggregation

This commit implements control message collection feature in
OpenFlow message provider.

Change-Id: I2a3ed2e5edbe1f39b503bb74a10259026b806513
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright 2016 Open Networking Laboratory
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<app name="org.onosproject.openflow-message" origin="ON.Lab" version="${project.version}"
category="default" url="http://onosproject.org"
featuresRepo="mvn:${project.groupId}/${project.artifactId}/${project.version}/xml/features"
features="${project.artifactId}">
<description>${project.description}</description>
<artifact>mvn:${project.groupId}/onos-of-provider-message/${project.version}</artifact>
</app>
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<!--
~ Copyright 2016 Open Networking Laboratory
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<features xmlns="http://karaf.apache.org/xmlns/features/v1.2.0" name="${project.artifactId}-${project.version}">
<feature name="${project.artifactId}" version="${project.version}"
description="${project.description}">
<feature>onos-api</feature>
<bundle>mvn:${project.groupId}/onos-of-provider-message/${project.version}</bundle>
</feature>
</features>
/*
* Copyright 2016 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.provider.of.message.impl;
import com.codahale.metrics.Meter;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import org.onlab.metrics.MetricsComponent;
import org.onlab.metrics.MetricsFeature;
import org.onlab.metrics.MetricsService;
import org.onosproject.cpman.ControlMessage;
import org.onosproject.cpman.DefaultControlMessage;
import org.onosproject.cpman.message.ControlMessageProviderService;
import org.onosproject.net.DeviceId;
import org.projectfloodlight.openflow.protocol.OFMessage;
import org.projectfloodlight.openflow.protocol.OFType;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import static org.onosproject.provider.of.message.impl.OpenFlowControlMessageMapper.lookupControlMessageType;
/**
* Collects the OpenFlow messages and aggregates using MetricsService.
*/
public class OpenFlowControlMessageAggregator implements Runnable {
private static final Set<OFType> OF_TYPE_SET =
ImmutableSet.of(OFType.PACKET_IN, OFType.PACKET_OUT, OFType.FLOW_MOD,
OFType.FLOW_REMOVED, OFType.STATS_REQUEST, OFType.STATS_REPLY);
private final Map<OFType, Meter> rateMeterMap = Maps.newHashMap();
private final Map<OFType, Meter> countMeterMap = Maps.newHashMap();
private final DeviceId deviceId;
private final ControlMessageProviderService providerService;
private static final String RATE_NAME = "rate";
private static final String COUNT_NAME = "count";
private Collection<ControlMessage> controlMessages = new ArrayList<>();
// TODO: this needs to be configurable
private static final int EXECUTE_PERIOD_IN_SECOND = 60;
/**
* Generates an OpenFlow message aggregator instance.
* The instance is for aggregating a specific OpenFlow message
* type of an OpenFlow switch.
*
* @param metricsService metrics service reference object
* @param providerService control message provider service reference object
* @param deviceId device identification
*/
public OpenFlowControlMessageAggregator(MetricsService metricsService,
ControlMessageProviderService providerService,
DeviceId deviceId) {
MetricsComponent mc = metricsService.registerComponent(deviceId.toString());
OF_TYPE_SET.forEach(type -> {
MetricsFeature metricsFeature = mc.registerFeature(type.toString());
Meter rateMeter = metricsService.createMeter(mc, metricsFeature, RATE_NAME);
Meter countMeter = metricsService.createMeter(mc, metricsFeature, COUNT_NAME);
rateMeterMap.put(type, rateMeter);
countMeterMap.put(type, countMeter);
});
this.deviceId = deviceId;
this.providerService = providerService;
}
/**
* Increments the meter rate by n, and the meter count by 1.
*
* @param msg OpenFlow message
*/
public void increment(OFMessage msg) {
rateMeterMap.get(msg.getType()).mark(msg.toString().length());
countMeterMap.get(msg.getType()).mark(1);
}
@Override
public void run() {
// update 1 minute statistic information of all control messages
OF_TYPE_SET.forEach(type -> controlMessages.add(
new DefaultControlMessage(lookupControlMessageType(type),
getLoad(type), getRate(type), getCount(type),
System.currentTimeMillis())));
providerService.updateStatsInfo(deviceId,
Collections.unmodifiableCollection(controlMessages));
}
/**
* Returns the average load value.
*
* @param type OpenFlow message type
* @return load value
*/
private long getLoad(OFType type) {
return (long) rateMeterMap.get(type).getOneMinuteRate() /
(long) countMeterMap.get(type).getOneMinuteRate();
}
/**
* Returns the average meter rate within recent 1 minute.
*
* @param type OpenFlow message type
* @return rate value
*/
private long getRate(OFType type) {
return (long) rateMeterMap.get(type).getOneMinuteRate();
}
/**
* Returns the average meter count within recent 1 minute.
*
* @param type OpenFlow message type
* @return count value
*/
private long getCount(OFType type) {
return (long) countMeterMap.get(type).getOneMinuteRate() *
EXECUTE_PERIOD_IN_SECOND;
}
}
/*
* Copyright 2016 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.provider.of.message.impl;
import com.google.common.collect.BiMap;
import com.google.common.collect.EnumHashBiMap;
import org.onosproject.cpman.ControlMessage;
import org.projectfloodlight.openflow.protocol.OFType;
import static org.projectfloodlight.openflow.protocol.OFType.*;
import static org.onosproject.cpman.ControlMessage.Type.*;
/**
* Collection of helper methods to convert protocol agnostic control message to
* messages used in OpenFlow specification.
*/
public final class OpenFlowControlMessageMapper {
// prohibit instantiation
private OpenFlowControlMessageMapper() {
}
private static final BiMap<OFType, ControlMessage.Type> MESSAGE_TYPE =
EnumHashBiMap.create(OFType.class);
static {
// key is OpenFlow specific OFType
// value is protocol agnostic ControlMessage.Type
MESSAGE_TYPE.put(PACKET_IN, INCOMING_PACKET);
MESSAGE_TYPE.put(PACKET_OUT, OUTGOING_PACKET);
MESSAGE_TYPE.put(FLOW_MOD, FLOW_MOD_PACKET);
MESSAGE_TYPE.put(FLOW_REMOVED, FLOW_REMOVED_PACKET);
MESSAGE_TYPE.put(STATS_REQUEST, REQUEST_PACKET);
MESSAGE_TYPE.put(STATS_REPLY, REPLY_PACKET);
}
/**
* Looks up the specified input value to the corresponding value with the specified map.
*
* @param map bidirectional mapping
* @param input input type
* @param cls class of output value
* @param <I> type of input value
* @param <O> type of output value
* @return the corresponding value stored in the specified map
*/
private static <I, O> O lookup(BiMap<I, O> map, I input, Class<O> cls) {
if (!map.containsKey(input)) {
throw new RuntimeException(
String.format("No mapping found for %s when converting to %s",
input, cls.getName()));
}
return map.get(input);
}
/**
* Looks up the corresponding {@link ControlMessage.Type} instance
* from the specified OFType value for OpenFlow message type.
*
* @param type OpenFlow message type
* @return protocol agnostic control message type
*/
public static ControlMessage.Type lookupControlMessageType(OFType type) {
return lookup(MESSAGE_TYPE, type, ControlMessage.Type.class);
}
/**
* Looks up the corresponding {@link OFType} instance from the specified
* ControlMetricType value.
*
* @param type control message type
* @return OpenFlow specific message type
*/
public static OFType lookupOFType(ControlMessage.Type type) {
return lookup(MESSAGE_TYPE.inverse(), type, OFType.class);
}
}
......@@ -15,19 +15,39 @@
*/
package org.onosproject.provider.of.message.impl;
import com.google.common.collect.Maps;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onlab.metrics.MetricsService;
import org.onosproject.cpman.message.ControlMessageProvider;
import org.onosproject.cpman.message.ControlMessageProviderRegistry;
import org.onosproject.cpman.message.ControlMessageProviderService;
import org.onosproject.net.DeviceId;
import org.onosproject.net.provider.AbstractProvider;
import org.onosproject.net.provider.ProviderId;
import org.onosproject.openflow.controller.Dpid;
import org.onosproject.openflow.controller.OpenFlowController;
import org.onosproject.openflow.controller.OpenFlowEventListener;
import org.onosproject.openflow.controller.OpenFlowSwitch;
import org.onosproject.openflow.controller.OpenFlowSwitchListener;
import org.onosproject.openflow.controller.RoleState;
import org.projectfloodlight.openflow.protocol.OFMessage;
import org.projectfloodlight.openflow.protocol.OFPortStatus;
import org.slf4j.Logger;
import java.util.HashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import static org.onosproject.net.DeviceId.deviceId;
import static org.onosproject.openflow.controller.Dpid.uri;
import static org.slf4j.LoggerFactory.getLogger;
import static org.onlab.util.Tools.groupedThreads;
/**
* Provider which uses an OpenFlow controller to collect control message.
......@@ -36,13 +56,34 @@ import static org.slf4j.LoggerFactory.getLogger;
public class OpenFlowControlMessageProvider extends AbstractProvider
implements ControlMessageProvider {
private static final Logger LOG = getLogger(OpenFlowControlMessageProvider.class);
private final Logger log = getLogger(getClass());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ControlMessageProviderRegistry providerRegistry;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected OpenFlowController controller;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected MetricsService metricsService;
private ControlMessageProviderService providerService;
private final InternalDeviceProvider listener = new InternalDeviceProvider();
private final InternalIncomingMessageProvider inMsgListener =
new InternalIncomingMessageProvider();
private final InternalOutgoingMessageProvider outMsgListener =
new InternalOutgoingMessageProvider();
private HashMap<Dpid, OpenFlowControlMessageAggregator> aggregators = Maps.newHashMap();
private ScheduledExecutorService executor;
private static final int AGGR_INIT_DELAY = 1;
private static final int AGGR_PERIOD = 1;
private static final TimeUnit AGGR_TIME_UNIT = TimeUnit.MINUTES;
private HashMap<Dpid, ScheduledFuture<?>> executorResults = Maps.newHashMap();
/**
* Creates a provider with the supplier identifier.
*/
......@@ -53,13 +94,131 @@ public class OpenFlowControlMessageProvider extends AbstractProvider
@Activate
protected void activate() {
providerService = providerRegistry.register(this);
LOG.info("Started");
// listens all OpenFlow device related events
controller.addListener(listener);
// listens all OpenFlow incoming message events
controller.addEventListener(inMsgListener);
controller.monitorAllEvents(true);
// listens all OpenFlow outgoing message events
controller.getSwitches().forEach(sw -> sw.addEventListener(outMsgListener));
executor = Executors.newSingleThreadScheduledExecutor(
groupedThreads("onos/provider", "aggregator"));
connectInitialDevices();
log.info("Started");
}
@Deactivate
protected void deactivate() {
controller.removeListener(listener);
providerRegistry.unregister(this);
providerService = null;
LOG.info("Stopped");
// stops listening all OpenFlow incoming message events
controller.monitorAllEvents(false);
controller.removeEventListener(inMsgListener);
// stops listening all OpenFlow outgoing message events
controller.getSwitches().forEach(sw -> sw.removeEventListener(outMsgListener));
log.info("Stopped");
}
private void connectInitialDevices() {
for (OpenFlowSwitch sw: controller.getSwitches()) {
try {
listener.switchAdded(new Dpid(sw.getId()));
} catch (Exception e) {
log.warn("Failed initially adding {} : {}", sw.getStringId(), e.getMessage());
log.debug("Error details:", e);
}
}
}
/**
* A listener for OpenFlow switch event.
*/
private class InternalDeviceProvider implements OpenFlowSwitchListener {
@Override
public void switchAdded(Dpid dpid) {
if (providerService == null) {
return;
}
OpenFlowSwitch sw = controller.getSwitch(dpid);
if (sw != null) {
// start to monitor the outgoing control messages
sw.addEventListener(outMsgListener);
}
DeviceId deviceId = deviceId(uri(dpid));
OpenFlowControlMessageAggregator ofcma =
new OpenFlowControlMessageAggregator(metricsService,
providerService, deviceId);
ScheduledFuture result = executor.scheduleAtFixedRate(ofcma,
AGGR_INIT_DELAY, AGGR_PERIOD, AGGR_TIME_UNIT);
aggregators.put(dpid, ofcma);
executorResults.put(dpid, result);
}
@Override
public void switchRemoved(Dpid dpid) {
if (providerService == null) {
return;
}
OpenFlowSwitch sw = controller.getSwitch(dpid);
if (sw != null) {
// stop monitoring the outgoing control messages
sw.removeEventListener(outMsgListener);
}
// removes the aggregator when switch is removed
// this also stops the aggregator from running
OpenFlowControlMessageAggregator aggregator = aggregators.remove(dpid);
if (aggregator != null) {
executorResults.get(dpid).cancel(true);
executorResults.remove(dpid);
}
}
@Override
public void switchChanged(Dpid dpid) {
}
@Override
public void portChanged(Dpid dpid, OFPortStatus status) {
}
@Override
public void receivedRoleReply(Dpid dpid, RoleState requested, RoleState response) {
}
}
/**
* A listener for incoming OpenFlow messages.
*/
private class InternalIncomingMessageProvider implements OpenFlowEventListener {
@Override
public void handleMessage(Dpid dpid, OFMessage msg) {
aggregators.get(dpid).increment(msg);
}
}
/**
* A listener for outgoing OpenFlow messages.
*/
private class InternalOutgoingMessageProvider implements OpenFlowEventListener {
@Override
public void handleMessage(Dpid dpid, OFMessage msg) {
aggregators.get(dpid).increment(msg);
}
}
}
......