Jian Li
Committed by Gerrit Code Review

Support to query available resources from remote node

Change-Id: I465327143b5959b9e18daac9481ffea332f889c8
......@@ -15,6 +15,8 @@
*/
package org.onosproject.cpman;
import com.google.common.collect.ImmutableSet;
import org.onlab.util.Tools;
import org.onosproject.cluster.NodeId;
import org.onosproject.net.DeviceId;
......@@ -30,6 +32,8 @@ import static org.onosproject.cpman.ControlResource.Type;
*/
public interface ControlPlaneMonitorService {
long TIMEOUT_MILLIS = 2000;
/**
* Adds a new control metric value with a certain update interval.
*
......@@ -116,8 +120,22 @@ public interface ControlPlaneMonitorService {
/**
* Obtains a list of names of available resources.
*
* @param nodeId node identifier
* @param resourceType resource type
* @return completable future object of a collection of available resource names
*/
CompletableFuture<Set<String>> availableResources(NodeId nodeId, Type resourceType);
/**
* Synchronous version of availableResource.
* Obtains a list of names of available resources.
*
* @param nodeId node identifier
* @param resourceType resource type
* @return a collection of names of available resources
* @return a collection of available resource names
*/
Set<String> availableResources(Type resourceType);
default Set<String> availableResourcesSync(NodeId nodeId, Type resourceType) {
return Tools.futureGetOrElse(availableResources(nodeId, resourceType),
TIMEOUT_MILLIS, TimeUnit.MILLISECONDS, ImmutableSet.of());
}
}
\ No newline at end of file
......
/*
* Copyright 2016-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.cpman;
import com.google.common.base.MoreObjects;
import java.util.Objects;
import static com.google.common.base.MoreObjects.toStringHelper;
/**
* A container class that is used to request available resource of remote node.
*/
public class ControlResourceRequest {
private final ControlResource.Type type;
/**
* Instantiates a new control resource request of the control resource type.
*
* @param type control resource type
*/
public ControlResourceRequest(ControlResource.Type type) {
this.type = type;
}
/**
* Obtains control resource type.
*
* @return control resource type
*/
public ControlResource.Type getType() {
return type;
}
@Override
public int hashCode() {
return Objects.hash(type);
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj instanceof ControlResourceRequest) {
final ControlResourceRequest other = (ControlResourceRequest) obj;
return Objects.equals(this.type, other.type);
}
return false;
}
@Override
public String toString() {
MoreObjects.ToStringHelper helper;
helper = toStringHelper(this)
.add("type", type);
return helper.toString();
}
}
......@@ -21,6 +21,7 @@ import org.apache.karaf.shell.console.completer.ArgumentCompleter;
import org.apache.karaf.shell.console.completer.StringsCompleter;
import org.onosproject.cli.AbstractCompleter;
import org.onosproject.cli.AbstractShellCommand;
import org.onosproject.cluster.NodeId;
import org.onosproject.cpman.ControlPlaneMonitorService;
import org.onosproject.cpman.ControlResource;
import org.slf4j.Logger;
......@@ -40,7 +41,7 @@ public class ResourceNameCompleter extends AbstractCompleter {
private static final String NETWORK = "network";
private static final String DISK = "disk";
private static final String CONTROL_MESSAGE = "control_message";
Set<String> resourceTypes = ImmutableSet.of(NETWORK, DISK, CONTROL_MESSAGE);
private final Set<String> resourceTypes = ImmutableSet.of(NETWORK, DISK, CONTROL_MESSAGE);
private static final String INVALID_MSG = "Invalid type name";
......@@ -51,7 +52,8 @@ public class ResourceNameCompleter extends AbstractCompleter {
// Resource type is the second argument.
ArgumentCompleter.ArgumentList list = getArgumentList();
String type = list.getArguments()[1];
String nodeId = list.getArguments()[1];
String type = list.getArguments()[2];
if (resourceTypes.contains(type)) {
ControlPlaneMonitorService monitorService =
......@@ -60,13 +62,16 @@ public class ResourceNameCompleter extends AbstractCompleter {
Set<String> set = Sets.newHashSet();
switch (type) {
case NETWORK:
set = monitorService.availableResources(ControlResource.Type.NETWORK);
set = monitorService.availableResourcesSync(NodeId.nodeId(nodeId),
ControlResource.Type.NETWORK);
break;
case DISK:
set = monitorService.availableResources(ControlResource.Type.DISK);
set = monitorService.availableResourcesSync(NodeId.nodeId(nodeId),
ControlResource.Type.DISK);
break;
case CONTROL_MESSAGE:
set = monitorService.availableResources(ControlResource.Type.CONTROL_MESSAGE);
set = monitorService.availableResourcesSync(NodeId.nodeId(nodeId),
ControlResource.Type.CONTROL_MESSAGE);
break;
default:
log.warn(INVALID_MSG);
......@@ -76,7 +81,7 @@ public class ResourceNameCompleter extends AbstractCompleter {
SortedSet<String> strings = delegate.getStrings();
if (set.size() != 0) {
set.forEach(s -> strings.add(s));
set.forEach(strings::add);
}
}
......
......@@ -25,6 +25,7 @@ import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.joda.time.LocalDateTime;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
import org.onosproject.cpman.ControlLoadSnapshot;
import org.onosproject.cpman.ControlMetricType;
import org.onosproject.cpman.ControlPlaneMonitorService;
......@@ -96,10 +97,11 @@ public class CpmanViewMessageHandler extends UiMessageHandler {
ControlPlaneMonitorService cpms = get(ControlPlaneMonitorService.class);
ClusterService cs = get(ClusterService.class);
DeviceService ds = get(DeviceService.class);
NodeId localNodeId = cs.getLocalNode().id();
if (!Strings.isNullOrEmpty(uri)) {
DeviceId deviceId = DeviceId.deviceId(uri);
if (cpms.availableResources(CONTROL_MESSAGE).contains(deviceId.toString())) {
if (cpms.availableResourcesSync(localNodeId, CONTROL_MESSAGE).contains(deviceId.toString())) {
Map<ControlMetricType, Long[]> data = generateMatrix(cpms, cs, deviceId);
LocalDateTime ldt = new LocalDateTime(timestamp * MILLI_CONV_UNIT);
......@@ -110,7 +112,7 @@ public class CpmanViewMessageHandler extends UiMessageHandler {
attachDeviceList(cm, deviceIds);
}
} else {
Set<String> deviceIds = cpms.availableResources(CONTROL_MESSAGE);
Set<String> deviceIds = cpms.availableResourcesSync(localNodeId, CONTROL_MESSAGE);
for (String deviceId : deviceIds) {
Map<ControlMetricType, Long> data =
populateDeviceMetrics(cpms, cs, DeviceId.deviceId(deviceId));
......
......@@ -34,6 +34,8 @@ import org.onosproject.cpman.ControlMetric;
import org.onosproject.cpman.ControlMetricType;
import org.onosproject.cpman.ControlMetricsRequest;
import org.onosproject.cpman.ControlPlaneMonitorService;
import org.onosproject.cpman.ControlResource;
import org.onosproject.cpman.ControlResourceRequest;
import org.onosproject.cpman.MetricsDatabase;
import org.onosproject.net.DeviceId;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
......@@ -86,6 +88,9 @@ public class ControlPlaneMonitor implements ControlPlaneMonitorService {
private static final MessageSubject CONTROL_STATS =
new MessageSubject("control-plane-stats");
private static final MessageSubject CONTROL_RESOURCE =
new MessageSubject("control-plane-resources");
private Map<ControlMetricType, Double> cpuBuf;
private Map<ControlMetricType, Double> memoryBuf;
private Map<String, Map<ControlMetricType, Double>> diskBuf;
......@@ -96,14 +101,17 @@ public class ControlPlaneMonitor implements ControlPlaneMonitorService {
private Set<DeviceId> availableDeviceIdSet;
private static final String METRIC_TYPE_NULL = "Control metric type cannot be null";
private static final String RESOURCE_TYPE_NULL = "Control resource type cannot be null";
private static final Serializer SERIALIZER = Serializer
.using(new KryoNamespace.Builder()
.register(KryoNamespaces.API)
.register(KryoNamespaces.BASIC)
.register(ControlMetricsRequest.class)
.register(ControlResourceRequest.class)
.register(ControlLoadSnapshot.class)
.register(ControlMetricType.class)
.register(ControlResource.Type.class)
.register(TimeUnit.class)
.nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID).build());
......@@ -125,7 +133,10 @@ public class ControlPlaneMonitor implements ControlPlaneMonitorService {
availableDeviceIdSet = Sets.newConcurrentHashSet();
communicationService.<ControlMetricsRequest, ControlLoadSnapshot>addSubscriber(CONTROL_STATS,
SERIALIZER::decode, this::handleRequest, SERIALIZER::encode);
SERIALIZER::decode, this::handleMetricsRequest, SERIALIZER::encode);
communicationService.<ControlResourceRequest, Set<String>>addSubscriber(CONTROL_RESOURCE,
SERIALIZER::decode, this::handleResourceRequest, SERIALIZER::encode);
log.info("Started");
}
......@@ -141,6 +152,7 @@ public class ControlPlaneMonitor implements ControlPlaneMonitorService {
ctrlMsgBuf.clear();
communicationService.removeSubscriber(CONTROL_STATS);
communicationService.removeSubscriber(CONTROL_RESOURCE);
log.info("Stopped");
}
......@@ -243,7 +255,7 @@ public class ControlPlaneMonitor implements ControlPlaneMonitorService {
if (clusterService.getLocalNode().id().equals(nodeId)) {
return CompletableFuture.completedFuture(snapshot(getLocalLoad(type, deviceId)));
} else {
return communicationService.sendAndReceive(createRequest(type, deviceId),
return communicationService.sendAndReceive(createMetricsRequest(type, deviceId),
CONTROL_STATS, SERIALIZER::encode, SERIALIZER::decode, nodeId);
}
}
......@@ -255,7 +267,7 @@ public class ControlPlaneMonitor implements ControlPlaneMonitorService {
if (clusterService.getLocalNode().id().equals(nodeId)) {
return CompletableFuture.completedFuture(snapshot(getLocalLoad(type, resourceName)));
} else {
return communicationService.sendAndReceive(createRequest(type, resourceName),
return communicationService.sendAndReceive(createMetricsRequest(type, resourceName),
CONTROL_STATS, SERIALIZER::encode, SERIALIZER::decode, nodeId);
}
}
......@@ -268,7 +280,7 @@ public class ControlPlaneMonitor implements ControlPlaneMonitorService {
if (clusterService.getLocalNode().id().equals(nodeId)) {
return CompletableFuture.completedFuture(snapshot(getLocalLoad(type, deviceId), duration, unit));
} else {
return communicationService.sendAndReceive(createRequest(type, duration, unit, deviceId),
return communicationService.sendAndReceive(createMetricsRequest(type, duration, unit, deviceId),
CONTROL_STATS, SERIALIZER::encode, SERIALIZER::decode, nodeId);
}
}
......@@ -281,24 +293,22 @@ public class ControlPlaneMonitor implements ControlPlaneMonitorService {
if (clusterService.getLocalNode().id().equals(nodeId)) {
return CompletableFuture.completedFuture(snapshot(getLocalLoad(type, resourceName), duration, unit));
} else {
return communicationService.sendAndReceive(createRequest(type, duration, unit, resourceName),
return communicationService.sendAndReceive(createMetricsRequest(type, duration, unit, resourceName),
CONTROL_STATS, SERIALIZER::encode, SERIALIZER::decode, nodeId);
}
}
@Override
public Set<String> availableResources(Type resourceType) {
if (RESOURCE_TYPE_SET.contains(resourceType)) {
if (Type.CONTROL_MESSAGE.equals(resourceType)) {
return availableDeviceIdSet.stream().map(id ->
id.toString()).collect(Collectors.toSet());
public CompletableFuture<Set<String>> availableResources(NodeId nodeId,
Type resourceType) {
if (clusterService.getLocalNode().id().equals(nodeId)) {
Set<String> resources = getLocalAvailableResources(resourceType);
return CompletableFuture.completedFuture(resources);
} else {
Set<String> res = availableResourceMap.get(resourceType);
return res == null ? ImmutableSet.of() : res;
return communicationService.sendAndReceive(createResourceRequest(resourceType),
CONTROL_RESOURCE, SERIALIZER::encode, SERIALIZER::decode, nodeId);
}
}
return ImmutableSet.of();
}
/**
* Builds and returns metric database instance with given resource name,
......@@ -385,7 +395,8 @@ public class ControlPlaneMonitor implements ControlPlaneMonitorService {
* @param request control metric request
* @return completable future object of control load snapshot
*/
private CompletableFuture<ControlLoadSnapshot> handleRequest(ControlMetricsRequest request) {
private CompletableFuture<ControlLoadSnapshot>
handleMetricsRequest(ControlMetricsRequest request) {
checkArgument(request.getType() != null, METRIC_TYPE_NULL);
......@@ -408,13 +419,28 @@ public class ControlPlaneMonitor implements ControlPlaneMonitorService {
}
/**
* Handles control resource request from remote node.
*
* @param request control resource type
* @return completable future object of control resource set
*/
private CompletableFuture<Set<String>>
handleResourceRequest(ControlResourceRequest request) {
checkArgument(request.getType() != null, RESOURCE_TYPE_NULL);
Set<String> resources = getLocalAvailableResources(request.getType());
return CompletableFuture.completedFuture(resources);
}
/**
* Generates a control metric request.
*
* @param type control metric type
* @param deviceId device identifier
* @return control metric request instance
*/
private ControlMetricsRequest createRequest(ControlMetricType type,
private ControlMetricsRequest createMetricsRequest(ControlMetricType type,
Optional<DeviceId> deviceId) {
return new ControlMetricsRequest(type, deviceId);
}
......@@ -428,7 +454,7 @@ public class ControlPlaneMonitor implements ControlPlaneMonitorService {
* @param deviceId device identifier
* @return control metric request instance
*/
private ControlMetricsRequest createRequest(ControlMetricType type,
private ControlMetricsRequest createMetricsRequest(ControlMetricType type,
int duration, TimeUnit unit,
Optional<DeviceId> deviceId) {
return new ControlMetricsRequest(type, duration, unit, deviceId);
......@@ -441,7 +467,7 @@ public class ControlPlaneMonitor implements ControlPlaneMonitorService {
* @param resourceName resource name
* @return control metric request instance
*/
private ControlMetricsRequest createRequest(ControlMetricType type,
private ControlMetricsRequest createMetricsRequest(ControlMetricType type,
String resourceName) {
return new ControlMetricsRequest(type, resourceName);
}
......@@ -455,13 +481,23 @@ public class ControlPlaneMonitor implements ControlPlaneMonitorService {
* @param resourceName resource name
* @return control metric request instance
*/
private ControlMetricsRequest createRequest(ControlMetricType type,
private ControlMetricsRequest createMetricsRequest(ControlMetricType type,
int duration, TimeUnit unit,
String resourceName) {
return new ControlMetricsRequest(type, duration, unit, resourceName);
}
/**
* Generates a control resource request with given resource type.
*
* @param type control resource type
* @return control resource request instance
*/
private ControlResourceRequest createResourceRequest(ControlResource.Type type) {
return new ControlResourceRequest(type);
}
/**
* Returns a snapshot of control load.
*
* @param cl control load
......@@ -528,17 +564,39 @@ public class ControlPlaneMonitor implements ControlPlaneMonitorService {
* @return control load
*/
private ControlLoad getLocalLoad(ControlMetricType type, String resourceName) {
NodeId localNodeId = clusterService.getLocalNode().id();
// returns disk I/O stats
if (DISK_METRICS.contains(type) &&
availableResources(Type.DISK).contains(resourceName)) {
availableResourcesSync(localNodeId, Type.DISK).contains(resourceName)) {
return new DefaultControlLoad(diskMetricsMap.get(resourceName), type);
}
// returns network I/O stats
if (NETWORK_METRICS.contains(type) &&
availableResources(Type.NETWORK).contains(resourceName)) {
availableResourcesSync(localNodeId, Type.NETWORK).contains(resourceName)) {
return new DefaultControlLoad(networkMetricsMap.get(resourceName), type);
}
return null;
}
/**
* Obtains the available resource list from local node.
*
* @param resourceType control resource type
* @return a set of available control resource
*/
private Set<String> getLocalAvailableResources(Type resourceType) {
Set<String> resources = ImmutableSet.of();
if (RESOURCE_TYPE_SET.contains(resourceType)) {
if (Type.CONTROL_MESSAGE.equals(resourceType)) {
resources = ImmutableSet.copyOf(availableDeviceIdSet.stream()
.map(DeviceId::toString).collect(Collectors.toSet()));
} else {
Set<String> res = availableResourceMap.get(resourceType);
resources = res == null ? ImmutableSet.of() : res;
}
}
return resources;
}
}
......
......@@ -72,7 +72,7 @@ public class ControlMetricsWebResource extends AbstractWebResource {
public Response controlMessageMetrics() {
ArrayNode deviceNodes = root.putArray("devices");
monitorService.availableResources(CONTROL_MESSAGE).forEach(name -> {
monitorService.availableResourcesSync(localNodeId, CONTROL_MESSAGE).forEach(name -> {
ObjectNode deviceNode = mapper().createObjectNode();
ObjectNode valueNode = mapper().createObjectNode();
......@@ -147,7 +147,7 @@ public class ControlMetricsWebResource extends AbstractWebResource {
public Response diskMetrics() {
ArrayNode diskNodes = root.putArray("disks");
monitorService.availableResources(DISK).forEach(name -> {
monitorService.availableResourcesSync(localNodeId, DISK).forEach(name -> {
ObjectNode diskNode = mapper().createObjectNode();
ObjectNode valueNode = mapper().createObjectNode();
......@@ -173,7 +173,7 @@ public class ControlMetricsWebResource extends AbstractWebResource {
public Response networkMetrics() {
ArrayNode networkNodes = root.putArray("networks");
monitorService.availableResources(NETWORK).forEach(name -> {
monitorService.availableResourcesSync(localNodeId, NETWORK).forEach(name -> {
ObjectNode networkNode = mapper().createObjectNode();
ObjectNode valueNode = mapper().createObjectNode();
......
......@@ -240,7 +240,7 @@ public class ControlPlaneMonitorTest {
networkSet.forEach(network -> NETWORK_METRICS.forEach(cmt ->
testUpdateMetricWithResource(cmt, mv, network)));
assertThat(monitor.availableResources(Type.DISK), is(diskSet));
assertThat(monitor.availableResources(Type.NETWORK), is(networkSet));
assertThat(monitor.availableResourcesSync(nodeId, Type.DISK), is(diskSet));
assertThat(monitor.availableResourcesSync(nodeId, Type.NETWORK), is(networkSet));
}
}
......
......@@ -75,7 +75,8 @@ public class ControlPlaneMonitorServiceAdaptor implements ControlPlaneMonitorSer
}
@Override
public Set<String> availableResources(ControlResource.Type resourceType) {
public CompletableFuture<Set<String>> availableResources(NodeId nodeId,
ControlResource.Type resourceType) {
return null;
}
}
......
......@@ -166,7 +166,7 @@ public class ControlMetricsResourceTest extends ResourceTest {
*/
@Test
public void testResourceEmptyArray() {
expect(mockControlPlaneMonitorService.availableResources(anyObject()))
expect(mockControlPlaneMonitorService.availableResourcesSync(anyObject(), anyObject()))
.andReturn(ImmutableSet.of()).once();
replay(mockControlPlaneMonitorService);
final WebTarget wt = target();
......@@ -181,7 +181,7 @@ public class ControlMetricsResourceTest extends ResourceTest {
*/
@Test
public void testResourcePopulatedArray() {
expect(mockControlPlaneMonitorService.availableResources(anyObject()))
expect(mockControlPlaneMonitorService.availableResourcesSync(anyObject(), anyObject()))
.andReturn(resourceSet).once();
expect(mockControlPlaneMonitorService.getLoad(anyObject(), anyObject(),
anyString())).andReturn(null).times(4);
......