Jian Li
Committed by Gerrit Code Review

[ONOS-3538] Implement control metrics distribution logic

- Revise getLoad method to getLocalLoad
- Add new getRemoteLoad methods
- Add the capability to query remote control load

CLI and REST will be implemented in a separated patch

Change-Id: I62d4f4ab891d6d3e95cedd5af9e4ab71949c05ec
......@@ -20,6 +20,7 @@ import org.onosproject.net.DeviceId;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import static org.onosproject.cpman.ControlResource.*;
......@@ -31,10 +32,10 @@ public interface ControlPlaneMonitorService {
/**
* Adds a new control metric value with a certain update interval.
*
* @param controlMetric control plane metric (e.g., control
* message rate, cpu, memory, etc.)
* @param updateIntervalInMinutes value update interval (in minute)
* @param deviceId device identifier
* @param controlMetric control plane metric (e.g., control
* message rate, cpu, memory, etc.)
* @param updateIntervalInMinutes value update interval (in minute)
* @param deviceId device identifier
*/
void updateMetric(ControlMetric controlMetric, int updateIntervalInMinutes,
Optional<DeviceId> deviceId);
......@@ -42,38 +43,59 @@ public interface ControlPlaneMonitorService {
/**
* Adds a new control metric value with a certain update interval.
*
* @param controlMetric control plane metric (e.g., disk and
* network metrics)
* @param updateIntervalInMinutes value update interval (in minute)
* @param resourceName resource name
* @param controlMetric control plane metric (e.g., disk and
* network metrics)
* @param updateIntervalInMinutes value update interval (in minute)
* @param resourceName resource name
*/
void updateMetric(ControlMetric controlMetric, int updateIntervalInMinutes,
String resourceName);
/**
* Obtains the control plane load of a specific device.
* Obtains local control plane load of a specific device.
* The metrics range from control messages and system metrics
* (e.g., CPU and memory info)
*
* @param nodeId node identifier
* @param type control metric type
* @param deviceId device identifier
* @param type control metric type
* @param deviceId device identifier
* @return control plane load
*/
ControlLoad getLoad(NodeId nodeId, ControlMetricType type,
Optional<DeviceId> deviceId);
ControlLoad getLocalLoad(ControlMetricType type, Optional<DeviceId> deviceId);
/**
* Obtains the control plane load of a specific device.
* Obtains local control plane load of a specific resource.
* The metrics range from I/O device metrics
* (e.g., disk and network interface)
*
* @param nodeId node identifier
* @param type control metric type
* @param resourceName resource name
* @param type control metric type
* @param resourceName resource name
* @return control plane load
*/
ControlLoad getLoad(NodeId nodeId, ControlMetricType type, String resourceName);
ControlLoad getLocalLoad(ControlMetricType type, String resourceName);
/**
* Obtains remote control plane load of a specific device.
*
* @param nodeId node identifier
* @param type control metric type
* @param deviceId device identifier
* @return completable future object of control load
*/
CompletableFuture<ControlLoad> getRemoteLoad(NodeId nodeId,
ControlMetricType type,
Optional<DeviceId> deviceId);
/**
* Obtains remote control plane load of a specific resource.
*
* @param nodeId node identifier
* @param type control metric type
* @param resourceName resource name
* @return completable future object of control load
*/
CompletableFuture<ControlLoad> getRemoteLoad(NodeId nodeId,
ControlMetricType type,
String resourceName);
/**
* Obtains a list of names of available resources.
......
......@@ -101,11 +101,11 @@ public class ControlMetricsStatsListCommand extends AbstractShellCommand {
private void printMetricsStats(ControlPlaneMonitorService service, NodeId nodeId,
Set<ControlMetricType> typeSet, String name, DeviceId did) {
if (name == null && did == null) {
typeSet.forEach(s -> print(s, service.getLoad(nodeId, s, Optional.ofNullable(null))));
typeSet.forEach(s -> print(s, service.getLocalLoad(s, Optional.ofNullable(null))));
} else if (name == null && did != null) {
typeSet.forEach(s -> print(s, service.getLoad(nodeId, s, Optional.of(did))));
typeSet.forEach(s -> print(s, service.getLocalLoad(s, Optional.of(did))));
} else if (name != null && did == null) {
typeSet.forEach(s -> print(s, service.getLoad(nodeId, s, name)));
typeSet.forEach(s -> print(s, service.getLocalLoad(s, name)));
}
}
......
/*
* Copyright 2016 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.cpman.impl;
import com.google.common.base.MoreObjects;
import org.onosproject.cpman.ControlMetricType;
import org.onosproject.net.DeviceId;
import java.util.Objects;
import java.util.Optional;
import static com.google.common.base.MoreObjects.toStringHelper;
/**
* A container class that is used to request control metric of remote node.
*/
public class ControlMetricsRequest {
private final ControlMetricType type;
private Optional<DeviceId> deviceId;
private String resourceName;
/**
* Instantiates a new control metric request with the given control metric
* type and device identifier.
*
* @param type control metric type
* @param deviceId device identifier
*/
public ControlMetricsRequest(ControlMetricType type, Optional<DeviceId> deviceId) {
this.type = type;
this.deviceId = deviceId;
}
/**
* Instantiates a new control metric request with the given control metric
* type and resource name.
*
* @param type control metric type
* @param resourceName resource name
*/
public ControlMetricsRequest(ControlMetricType type, String resourceName) {
this.type = type;
this.resourceName = resourceName;
}
/**
* Obtains control metric type.
*
* @return control metric type
*/
public ControlMetricType getType() {
return type;
}
/**
* Obtains resource name.
*
* @return resource name
*/
public String getResourceName() {
return resourceName;
}
/**
* Obtains device identifier.
*
* @return device identifier
*/
public Optional<DeviceId> getDeviceId() {
return deviceId;
}
@Override
public int hashCode() {
return Objects.hash(type, deviceId, resourceName);
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj instanceof ControlMetricsRequest) {
final ControlMetricsRequest other = (ControlMetricsRequest) obj;
return Objects.equals(this.type, other.type) &&
Objects.equals(this.deviceId, other.deviceId) &&
Objects.equals(this.resourceName, other.resourceName);
}
return false;
}
@Override
public String toString() {
MoreObjects.ToStringHelper helper;
helper = toStringHelper(this)
.add("type", type)
.add("resourceName", resourceName);
if (deviceId != null) {
helper.add("deviceId", deviceId.get());
}
return helper.toString();
}
}
......@@ -24,8 +24,8 @@ import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.KryoNamespace;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.cpman.ControlLoad;
import org.onosproject.cpman.ControlMetric;
......@@ -33,15 +33,29 @@ import org.onosproject.cpman.ControlMetricType;
import org.onosproject.cpman.ControlPlaneMonitorService;
import org.onosproject.cpman.MetricsDatabase;
import org.onosproject.net.DeviceId;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import static org.onosproject.cpman.ControlResource.*;
import static com.google.common.base.Preconditions.checkArgument;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.cpman.ControlResource.CONTROL_MESSAGE_METRICS;
import static org.onosproject.cpman.ControlResource.CPU_METRICS;
import static org.onosproject.cpman.ControlResource.DISK_METRICS;
import static org.onosproject.cpman.ControlResource.MEMORY_METRICS;
import static org.onosproject.cpman.ControlResource.NETWORK_METRICS;
import static org.onosproject.cpman.ControlResource.Type;
/**
* Control plane monitoring service class.
......@@ -60,9 +74,15 @@ public class ControlPlaneMonitor implements ControlPlaneMonitorService {
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterCommunicationService communicationService;
private static final Set RESOURCE_TYPE_SET =
ImmutableSet.of(Type.CONTROL_MESSAGE, Type.DISK, Type.NETWORK);
private static final MessageSubject CONTROL_STATS =
new MessageSubject("control-plane-stats");
private Map<ControlMetricType, Double> cpuBuf;
private Map<ControlMetricType, Double> memoryBuf;
private Map<String, Map<ControlMetricType, Double>> diskBuf;
......@@ -72,6 +92,19 @@ public class ControlPlaneMonitor implements ControlPlaneMonitorService {
private Map<Type, Set<String>> availableResourceMap;
private Set<DeviceId> availableDeviceIdSet;
private ExecutorService messageHandlingExecutor;
private static final String METRIC_TYPE_NULL = "Control metric type cannot be null";
private static final Serializer SERIALIZER = Serializer
.using(new KryoNamespace.Builder()
.register(KryoNamespaces.API)
.register(ControlMetricsRequest.class)
.register(DefaultControlLoad.class)
.register(DefaultMetricsDatabase.class)
.register(ControlMetricType.class)
.nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID).build());
@Activate
public void activate() {
cpuMetrics = genMDbBuilder(Type.CPU, CPU_METRICS);
......@@ -89,6 +122,12 @@ public class ControlPlaneMonitor implements ControlPlaneMonitorService {
availableResourceMap = Maps.newConcurrentMap();
availableDeviceIdSet = Sets.newConcurrentHashSet();
messageHandlingExecutor = Executors.newSingleThreadScheduledExecutor(
groupedThreads("onos/app/cpman", "message-handlers"));
communicationService.addSubscriber(CONTROL_STATS,
SERIALIZER::decode, this::handleRequest, messageHandlingExecutor);
log.info("Started");
}
......@@ -102,6 +141,8 @@ public class ControlPlaneMonitor implements ControlPlaneMonitorService {
networkBuf.clear();
ctrlMsgBuf.clear();
communicationService.removeSubscriber(CONTROL_STATS);
log.info("Stopped");
}
......@@ -197,55 +238,59 @@ public class ControlPlaneMonitor implements ControlPlaneMonitorService {
}
@Override
public ControlLoad getLoad(NodeId nodeId, ControlMetricType type,
Optional<DeviceId> deviceId) {
ControllerNode node = clusterService.getNode(nodeId);
if (clusterService.getLocalNode().equals(node)) {
if (deviceId.isPresent()) {
if (CONTROL_MESSAGE_METRICS.contains(type) &&
availableDeviceIdSet.contains(deviceId.get())) {
return new DefaultControlLoad(controlMessageMap.get(deviceId.get()), type);
}
} else {
// returns controlLoad of CPU metrics
if (CPU_METRICS.contains(type)) {
return new DefaultControlLoad(cpuMetrics, type);
}
// returns memoryLoad of memory metrics
if (MEMORY_METRICS.contains(type)) {
return new DefaultControlLoad(memoryMetrics, type);
}
public ControlLoad getLocalLoad(ControlMetricType type,
Optional<DeviceId> deviceId) {
if (deviceId.isPresent()) {
if (CONTROL_MESSAGE_METRICS.contains(type) &&
availableDeviceIdSet.contains(deviceId.get())) {
return new DefaultControlLoad(controlMessageMap.get(deviceId.get()), type);
}
} else {
// TODO: currently only query the metrics of local node
return null;
// returns controlLoad of CPU metrics
if (CPU_METRICS.contains(type)) {
return new DefaultControlLoad(cpuMetrics, type);
}
// returns memoryLoad of memory metrics
if (MEMORY_METRICS.contains(type)) {
return new DefaultControlLoad(memoryMetrics, type);
}
}
return null;
}
@Override
public ControlLoad getLoad(NodeId nodeId, ControlMetricType type,
String resourceName) {
if (clusterService.getLocalNode().id().equals(nodeId)) {
if (DISK_METRICS.contains(type) &&
availableResources(Type.DISK).contains(resourceName)) {
return new DefaultControlLoad(diskMetricsMap.get(resourceName), type);
}
public ControlLoad getLocalLoad(ControlMetricType type, String resourceName) {
if (DISK_METRICS.contains(type) &&
availableResources(Type.DISK).contains(resourceName)) {
return new DefaultControlLoad(diskMetricsMap.get(resourceName), type);
}
if (NETWORK_METRICS.contains(type) &&
availableResources(Type.NETWORK).contains(resourceName)) {
return new DefaultControlLoad(networkMetricsMap.get(resourceName), type);
}
} else {
// TODO: currently only query the metrics of local node
return null;
if (NETWORK_METRICS.contains(type) &&
availableResources(Type.NETWORK).contains(resourceName)) {
return new DefaultControlLoad(networkMetricsMap.get(resourceName), type);
}
return null;
}
@Override
public CompletableFuture<ControlLoad> getRemoteLoad(NodeId nodeId,
ControlMetricType type,
Optional<DeviceId> deviceId) {
return communicationService.sendAndReceive(createRequest(type, deviceId),
CONTROL_STATS, SERIALIZER::encode, SERIALIZER::decode, nodeId);
}
@Override
public CompletableFuture<ControlLoad> getRemoteLoad(NodeId nodeId,
ControlMetricType type,
String resourceName) {
return communicationService.sendAndReceive(createRequest(type, resourceName),
CONTROL_STATS, SERIALIZER::encode, SERIALIZER::decode, nodeId);
}
@Override
public Set<String> availableResources(Type resourceType) {
if (RESOURCE_TYPE_SET.contains(resourceType)) {
if (Type.CONTROL_MESSAGE.equals(resourceType)) {
......@@ -291,4 +336,27 @@ public class ControlPlaneMonitor implements ControlPlaneMonitorService {
map.forEach((k, v) -> newMap.putIfAbsent(k.toString(), v));
return newMap;
}
}
\ No newline at end of file
private CompletableFuture<ControlLoad> handleRequest(ControlMetricsRequest request) {
checkArgument(request.getType() != null, METRIC_TYPE_NULL);
ControlLoad load;
if (request.getResourceName() != null) {
load = getLocalLoad(request.getType(), request.getResourceName());
} else {
load = getLocalLoad(request.getType(), request.getDeviceId());
}
return CompletableFuture.completedFuture(load);
}
private ControlMetricsRequest createRequest(ControlMetricType type,
Optional<DeviceId> deviceId) {
return new ControlMetricsRequest(type, deviceId);
}
private ControlMetricsRequest createRequest(ControlMetricType type,
String resourceName) {
return new ControlMetricsRequest(type, resourceName);
}
}
......
......@@ -241,17 +241,17 @@ public class ControlMetricsWebResource extends AbstractWebResource {
if (name == null && did == null) {
typeSet.forEach(type -> {
ObjectNode metricNode = mapper().createObjectNode();
ControlLoad load = service.getLoad(nodeId, type, Optional.ofNullable(null));
ControlLoad load = service.getLocalLoad(type, Optional.ofNullable(null));
if (load != null) {
metricNode.set(type.toString().toLowerCase(), codec(ControlLoad.class)
.encode(service.getLoad(nodeId, type, Optional.ofNullable(null)), this));
.encode(service.getLocalLoad(type, Optional.ofNullable(null)), this));
metricsNode.add(metricNode);
}
});
} else if (name == null) {
typeSet.forEach(type -> {
ObjectNode metricNode = mapper().createObjectNode();
ControlLoad load = service.getLoad(nodeId, type, Optional.of(did));
ControlLoad load = service.getLocalLoad(type, Optional.of(did));
if (load != null) {
metricNode.set(type.toString().toLowerCase(),
codec(ControlLoad.class).encode(load, this));
......@@ -261,7 +261,7 @@ public class ControlMetricsWebResource extends AbstractWebResource {
} else if (did == null) {
typeSet.forEach(type -> {
ObjectNode metricNode = mapper().createObjectNode();
ControlLoad load = service.getLoad(nodeId, type, name);
ControlLoad load = service.getLocalLoad(type, name);
if (load != null) {
metricNode.set(type.toString().toLowerCase(),
codec(ControlLoad.class).encode(load, this));
......
......@@ -26,6 +26,8 @@ import org.onosproject.cpman.ControlMetric;
import org.onosproject.cpman.ControlMetricType;
import org.onosproject.cpman.MetricValue;
import org.onosproject.net.DeviceId;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterCommunicationServiceAdapter;
import java.util.Optional;
import java.util.Set;
......@@ -36,8 +38,12 @@ import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.replay;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
import static org.onosproject.cpman.ControlResource.*;
import static org.onosproject.cpman.ControlResource.CONTROL_MESSAGE_METRICS;
import static org.onosproject.cpman.ControlResource.CPU_METRICS;
import static org.onosproject.cpman.ControlResource.DISK_METRICS;
import static org.onosproject.cpman.ControlResource.MEMORY_METRICS;
import static org.onosproject.cpman.ControlResource.NETWORK_METRICS;
import static org.onosproject.cpman.ControlResource.Type;
/**
* Unit test of control plane monitoring service.
......@@ -48,6 +54,7 @@ public class ControlPlaneMonitorTest {
private static final Integer UPDATE_INTERVAL = 1;
private ClusterService mockClusterService;
private ControllerNode mockControllerNode;
private ClusterCommunicationService mockCommunicationService;
private NodeId nodeId;
/**
......@@ -56,7 +63,9 @@ public class ControlPlaneMonitorTest {
@Before
public void setup() {
monitor = new ControlPlaneMonitor();
monitor.activate();
mockCommunicationService = new ClusterCommunicationServiceAdapter();
monitor.communicationService = mockCommunicationService;
nodeId = new NodeId("1");
mockControllerNode = new MockControllerNode(nodeId);
......@@ -68,6 +77,8 @@ public class ControlPlaneMonitorTest {
expect(mockClusterService.getLocalNode())
.andReturn(mockControllerNode).anyTimes();
replay(mockClusterService);
monitor.activate();
}
/**
......@@ -102,7 +113,7 @@ public class ControlPlaneMonitorTest {
}
private void testLoadMetricWithoutId(ControlMetricType cmt, MetricValue mv) {
assertThat(monitor.getLoad(nodeId, cmt, Optional.ofNullable(null)).latest(), is(mv.getLoad()));
assertThat(monitor.getLocalLoad(cmt, Optional.ofNullable(null)).latest(), is(mv.getLoad()));
}
private void testUpdateMetricWithResource(ControlMetricType cmt, MetricValue mv, String resourceName) {
......@@ -111,7 +122,7 @@ public class ControlPlaneMonitorTest {
}
private void testLoadMetricWithResource(ControlMetricType cmt, MetricValue mv, String resourceName) {
assertThat(monitor.getLoad(nodeId, cmt, resourceName).latest(), is(mv.getLoad()));
assertThat(monitor.getLocalLoad(cmt, resourceName).latest(), is(mv.getLoad()));
}
private void testUpdateMetricWithId(ControlMetricType cmt, MetricValue mv, DeviceId did) {
......@@ -120,7 +131,7 @@ public class ControlPlaneMonitorTest {
}
private void testLoadMetricWithId(ControlMetricType cmt, MetricValue mv, DeviceId did) {
assertThat(monitor.getLoad(nodeId, cmt, Optional.of(did)).latest(), is(mv.getLoad()));
assertThat(monitor.getLocalLoad(cmt, Optional.of(did)).latest(), is(mv.getLoad()));
}
/**
......
......@@ -25,6 +25,7 @@ import org.onosproject.net.DeviceId;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
/**
* Test adapter control plane monitoring service.
......@@ -32,23 +33,38 @@ import java.util.Set;
public class ControlPlaneMonitorServiceAdaptor implements ControlPlaneMonitorService {
@Override
public void updateMetric(ControlMetric controlMetric,
int updateIntervalInMinutes, Optional<DeviceId> deviceId) {
int updateIntervalInMinutes,
Optional<DeviceId> deviceId) {
}
@Override
public void updateMetric(ControlMetric controlMetric,
int updateIntervalInMinutes, String resourceName) {
int updateIntervalInMinutes,
String resourceName) {
}
@Override
public ControlLoad getLoad(NodeId nodeId,
ControlMetricType type, Optional<DeviceId> deviceId) {
public ControlLoad getLocalLoad(ControlMetricType type,
Optional<DeviceId> deviceId) {
return null;
}
@Override
public ControlLoad getLoad(NodeId nodeId,
ControlMetricType type, String resourceName) {
public ControlLoad getLocalLoad(ControlMetricType type, String resourceName) {
return null;
}
@Override
public CompletableFuture<ControlLoad> getRemoteLoad(NodeId nodeId,
ControlMetricType type,
Optional<DeviceId> deviceId) {
return null;
}
@Override
public CompletableFuture<ControlLoad> getRemoteLoad(NodeId nodeId,
ControlMetricType type,
String resourceName) {
return null;
}
......
......@@ -182,8 +182,8 @@ public class ControlMetricsResourceTest extends JerseyTest {
public void testResourcePopulatedArray() {
expect(mockControlPlaneMonitorService.availableResources(anyObject()))
.andReturn(resourceSet).once();
expect(mockControlPlaneMonitorService.getLoad(anyObject(), anyObject(),
anyString())).andReturn(null).times(4);
expect(mockControlPlaneMonitorService.getLocalLoad(anyObject(),
anyString())).andReturn(null).times(4);
replay(mockControlPlaneMonitorService);
final WebTarget wt = target();
......