HIGUCHI Yuta
Committed by Gerrit Code Review

[Falcon] ONOS-3372 Link related Service on gRPC

Change-Id: Ib497d17cb3c0126086a1ce03a6f99ae344320448
......@@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentHashMap;
import org.onosproject.incubator.rpc.RemoteServiceContext;
import org.onosproject.net.device.DeviceProviderRegistry;
import org.onosproject.net.link.LinkProviderRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -49,6 +50,7 @@ public class GrpcRemoteServiceContext implements RemoteServiceContext {
public GrpcRemoteServiceContext(ManagedChannel channel) {
this.channel = checkNotNull(channel);
services.put(DeviceProviderRegistry.class, new DeviceProviderRegistryClientProxy(channel));
services.put(LinkProviderRegistry.class, new LinkProviderRegistryClientProxy(channel));
}
......
......@@ -21,6 +21,7 @@ import static org.onosproject.incubator.rpc.grpc.GrpcDeviceUtils.translate;
import static org.onosproject.net.DeviceId.deviceId;
import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
......@@ -47,11 +48,15 @@ import org.onosproject.grpc.Device.UpdatePortStatistics;
import org.onosproject.grpc.Device.UpdatePorts;
import org.onosproject.grpc.DeviceProviderRegistryRpcGrpc;
import org.onosproject.grpc.DeviceProviderRegistryRpcGrpc.DeviceProviderRegistryRpc;
import org.onosproject.grpc.LinkProviderServiceRpcGrpc;
import org.onosproject.net.DeviceId;
import org.onosproject.net.MastershipRole;
import org.onosproject.net.device.DeviceProvider;
import org.onosproject.net.device.DeviceProviderRegistry;
import org.onosproject.net.device.DeviceProviderService;
import org.onosproject.net.link.LinkProvider;
import org.onosproject.net.link.LinkProviderRegistry;
import org.onosproject.net.link.LinkProviderService;
import org.onosproject.net.provider.ProviderId;
import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
......@@ -59,6 +64,7 @@ import org.slf4j.LoggerFactory;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import io.grpc.Server;
......@@ -68,13 +74,15 @@ import io.grpc.stub.StreamObserver;
// gRPC Server on Metro-side
// Translates request received on RPC channel, and calls corresponding Service on
// Metro-ONOS cluster.
// Currently supports DeviceProviderRegistry, LinkProviderService
/**
* Server side implementation of gRPC based RemoteService.
*/
@Component(immediate = true)
public class GrpcRemoteServiceServer {
private static final String RPC_PROVIDER_NAME = "org.onosproject.rpc.provider.grpc";
static final String RPC_PROVIDER_NAME = "org.onosproject.rpc.provider.grpc";
// TODO pick a number
public static final int DEFAULT_LISTEN_PORT = 11984;
......@@ -84,6 +92,9 @@ public class GrpcRemoteServiceServer {
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DeviceProviderRegistry deviceProviderRegistry;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected LinkProviderRegistry linkProviderRegistry;
@Property(name = "listenPort", intValue = DEFAULT_LISTEN_PORT,
label = "Port to listen on")
......@@ -92,6 +103,11 @@ public class GrpcRemoteServiceServer {
private Server server;
private final Set<DeviceProviderServerProxy> registeredProviders = Sets.newConcurrentHashSet();
// scheme -> ...
// updates must be guarded by synchronizing `this`
private final Map<String, LinkProviderService> linkProviderServices = Maps.newConcurrentMap();
private final Map<String, LinkProvider> linkProviders = Maps.newConcurrentMap();
@Activate
protected void activate(ComponentContext context) throws IOException {
modified(context);
......@@ -100,6 +116,7 @@ public class GrpcRemoteServiceServer {
try {
server = NettyServerBuilder.forPort(listenPort)
.addService(DeviceProviderRegistryRpcGrpc.bindService(new DeviceProviderRegistryServerProxy()))
.addService(LinkProviderServiceRpcGrpc.bindService(new LinkProviderServiceServerProxy(this)))
.build().start();
} catch (IOException e) {
log.error("Failed to start gRPC server", e);
......@@ -117,6 +134,9 @@ public class GrpcRemoteServiceServer {
server.shutdown();
// Should we wait for shutdown?
unregisterLinkProviders();
log.info("Stopped");
}
......@@ -125,6 +145,40 @@ public class GrpcRemoteServiceServer {
// TODO support dynamic reconfiguration and restarting server?
}
/**
* Registers {@link StubLinkProvider} for given ProviderId scheme.
*
* DO NOT DIRECTLY CALL THIS METHOD.
* Only expected to be called from {@link #getLinkProviderServiceFor(String)}.
*
* @param scheme ProviderId scheme.
* @return {@link LinkProviderService} registered.
*/
private synchronized LinkProviderService registerStubLinkProvider(String scheme) {
StubLinkProvider provider = new StubLinkProvider(scheme);
linkProviders.put(scheme, provider);
return linkProviderRegistry.register(provider);
}
/**
* Unregisters all registered LinkProviders.
*/
private synchronized void unregisterLinkProviders() {
linkProviders.values().forEach(linkProviderRegistry::unregister);
linkProviders.clear();
linkProviderServices.clear();
}
/**
* Gets or creates {@link LinkProviderService} registered for given ProviderId scheme.
*
* @param scheme ProviderId scheme.
* @return {@link LinkProviderService}
*/
protected LinkProviderService getLinkProviderServiceFor(String scheme) {
return linkProviderServices.computeIfAbsent(scheme, this::registerStubLinkProvider);
}
// RPC Server-side code
// RPC session Factory
/**
......
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.incubator.rpc.grpc;
import static com.google.common.base.Preconditions.checkNotNull;
import org.onosproject.net.link.LinkProvider;
import org.onosproject.net.link.LinkProviderRegistry;
import org.onosproject.net.link.LinkProviderService;
import org.onosproject.net.provider.AbstractProviderRegistry;
import com.google.api.client.repackaged.com.google.common.annotations.Beta;
import io.grpc.Channel;
/**
* Proxy object to handle LinkProviderRegistry calls.
*/
@Beta
public class LinkProviderRegistryClientProxy
extends AbstractProviderRegistry<LinkProvider, LinkProviderService>
implements LinkProviderRegistry {
private final Channel channel;
public LinkProviderRegistryClientProxy(Channel channel) {
this.channel = checkNotNull(channel);
}
@Override
protected LinkProviderService createProviderService(LinkProvider provider) {
return new LinkProviderServiceClientProxy(provider, channel);
}
}
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.incubator.rpc.grpc;
import static org.onosproject.incubator.rpc.grpc.GrpcDeviceUtils.asMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.onosproject.grpc.Link.LinkDetectedMsg;
import org.onosproject.grpc.Link.LinkType;
import org.onosproject.grpc.Link.LinkVanishedMsg;
import org.onosproject.grpc.Link.Void;
import org.onosproject.grpc.LinkProviderServiceRpcGrpc;
import org.onosproject.grpc.LinkProviderServiceRpcGrpc.LinkProviderServiceRpcFutureStub;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.DeviceId;
import org.onosproject.net.Link.Type;
import org.onosproject.net.link.LinkDescription;
import org.onosproject.net.link.LinkProvider;
import org.onosproject.net.link.LinkProviderService;
import org.onosproject.net.provider.AbstractProviderService;
import org.onosproject.net.provider.ProviderId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.api.client.repackaged.com.google.common.annotations.Beta;
import com.google.common.util.concurrent.ListenableFuture;
import io.grpc.Channel;
/**
* Proxy object to handle LinkProviderService calls.
*
* RPC wise, this will initiate a RPC call on each method invocation.
*/
@Beta
class LinkProviderServiceClientProxy
extends AbstractProviderService<LinkProvider>
implements LinkProviderService {
private final Logger log = LoggerFactory.getLogger(getClass());
private final Channel channel;
/**
* Constructs {@link LinkProviderServiceClientProxy}.
*
* @param provider {@link LinkProvider}. Only ProviderId scheme is used.
* @param channel channel to use to call RPC
*/
protected LinkProviderServiceClientProxy(LinkProvider provider, Channel channel) {
super(provider);
this.channel = channel;
}
@Override
public void linkDetected(LinkDescription linkDescription) {
checkValidity();
LinkProviderServiceRpcFutureStub newStub = LinkProviderServiceRpcGrpc.newFutureStub(channel);
ListenableFuture<Void> future = newStub.linkDetected(detectMsg(provider().id(), linkDescription));
try {
// There's no need to wait, but just checking server
future.get(500, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
log.error("linkDetected({}) failed", linkDescription, e);
invalidate();
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
log.error("linkDetected({}) failed", linkDescription, e);
invalidate();
} catch (TimeoutException e) {
log.error("linkDetected({}) failed", linkDescription, e);
invalidate();
}
}
@Override
public void linkVanished(LinkDescription linkDescription) {
checkValidity();
LinkProviderServiceRpcFutureStub newStub = LinkProviderServiceRpcGrpc.newFutureStub(channel);
ListenableFuture<Void> future = newStub.linkVanished(vanishMsg(provider().id(), linkDescription));
try {
// There's no need to wait, but just checking server
future.get(500, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
log.error("linkVanished({}) failed", linkDescription, e);
invalidate();
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
log.error("linkVanished({}) failed", linkDescription, e);
invalidate();
} catch (TimeoutException e) {
log.error("linkVanished({}) failed", linkDescription, e);
invalidate();
}
}
@Override
public void linksVanished(ConnectPoint connectPoint) {
checkValidity();
LinkProviderServiceRpcFutureStub newStub = LinkProviderServiceRpcGrpc.newFutureStub(channel);
ListenableFuture<Void> future = newStub.linkVanished(vanishMsg(provider().id(), connectPoint));
try {
// There's no need to wait, but just checking server
future.get(500, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
log.error("linksVanished({}) failed", connectPoint, e);
invalidate();
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
log.error("linksVanished({}) failed", connectPoint, e);
invalidate();
} catch (TimeoutException e) {
log.error("linksVanished({}) failed", connectPoint, e);
invalidate();
}
}
@Override
public void linksVanished(DeviceId deviceId) {
checkValidity();
LinkProviderServiceRpcFutureStub newStub = LinkProviderServiceRpcGrpc.newFutureStub(channel);
ListenableFuture<Void> future = newStub.linkVanished(vanishMsg(provider().id(), deviceId));
try {
// There's no need to wait, but just checking server
future.get(500, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
log.error("linksVanished({}) failed", deviceId, e);
invalidate();
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
log.error("linksVanished({}) failed", deviceId, e);
invalidate();
} catch (TimeoutException e) {
log.error("linksVanished({}) failed", deviceId, e);
invalidate();
}
}
/**
* Builds {@link LinkDetectedMsg}.
*
* @param id ProviderId
* @param linkDescription {@link LinkDescription}
* @return {@link LinkDetectedMsg}
*/
private LinkDetectedMsg detectMsg(ProviderId id,
LinkDescription linkDescription) {
LinkDetectedMsg.Builder builder = LinkDetectedMsg.newBuilder();
builder.setProviderId(id.scheme())
.setLinkDescription(builder.getLinkDescriptionBuilder()
.setSrc(translate(linkDescription.src()))
.setDst(translate(linkDescription.dst()))
.setType(translate(linkDescription.type()))
.putAllAnnotations(asMap(linkDescription.annotations()))
.build()
);
return builder.build();
}
/**
* Builds {@link LinkVanishedMsg}.
*
* @param id ProviderId
* @param linkDescription {@link LinkDescription}
* @return {@link LinkVanishedMsg}
*/
private LinkVanishedMsg vanishMsg(ProviderId id,
LinkDescription linkDescription) {
LinkVanishedMsg.Builder builder = LinkVanishedMsg.newBuilder();
builder.setProviderId(id.scheme())
.setLinkDescription(builder.getLinkDescriptionBuilder()
.setSrc(translate(linkDescription.src()))
.setDst(translate(linkDescription.dst()))
.setType(translate(linkDescription.type()))
.putAllAnnotations(asMap(linkDescription.annotations()))
.build()
);
return builder.build();
}
/**
* Builds {@link LinkVanishedMsg}.
*
* @param id ProviderId
* @param connectPoint {@link ConnectPoint}
* @return {@link LinkVanishedMsg}
*/
private LinkVanishedMsg vanishMsg(ProviderId id,
ConnectPoint connectPoint) {
LinkVanishedMsg.Builder builder = LinkVanishedMsg.newBuilder();
builder.setProviderId(id.scheme())
.setConnectPoint(translate(connectPoint));
return builder.build();
}
/**
* Builds {@link LinkVanishedMsg}.
*
* @param id ProviderId
* @param deviceId {@link DeviceId}
* @return {@link LinkVanishedMsg}
*/
private LinkVanishedMsg vanishMsg(ProviderId id, DeviceId deviceId) {
LinkVanishedMsg.Builder builder = LinkVanishedMsg.newBuilder();
builder.setProviderId(id.scheme())
.setDeviceId(deviceId.toString());
return builder.build();
}
/**
* Translates ONOS object to gRPC message.
*
* @param type {@link Link.Type}
* @return gRPC LinkType
*/
private LinkType translate(Type type) {
switch (type) {
case DIRECT:
return LinkType.DIRECT;
case EDGE:
return LinkType.EDGE;
case INDIRECT:
return LinkType.INDIRECT;
case OPTICAL:
return LinkType.OPTICAL;
case TUNNEL:
return LinkType.TUNNEL;
case VIRTUAL:
return LinkType.VIRTUAL;
default:
return LinkType.DIRECT;
}
}
/**
* Translates ONOS object to gRPC message.
*
* @param cp {@link ConnectPoint}
* @return gRPC ConnectPoint
*/
private org.onosproject.grpc.Link.ConnectPoint translate(ConnectPoint cp) {
return org.onosproject.grpc.Link.ConnectPoint.newBuilder()
.setDeviceId(cp.deviceId().toString())
.setPortNumber(cp.port().toString())
.build();
}
}
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.incubator.rpc.grpc;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.onosproject.net.DeviceId.deviceId;
import org.onosproject.grpc.Link.LinkDetectedMsg;
import org.onosproject.grpc.Link.LinkType;
import org.onosproject.grpc.Link.LinkVanishedMsg;
import org.onosproject.grpc.Link.Void;
import org.onosproject.grpc.Link.ConnectPoint.ElementIdCase;
import org.onosproject.grpc.LinkProviderServiceRpcGrpc.LinkProviderServiceRpc;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.DeviceId;
import org.onosproject.net.Link;
import org.onosproject.net.PortNumber;
import org.onosproject.net.SparseAnnotations;
import org.onosproject.net.link.DefaultLinkDescription;
import org.onosproject.net.link.LinkDescription;
import org.onosproject.net.link.LinkProviderService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.api.client.repackaged.com.google.common.annotations.Beta;
import io.grpc.stub.StreamObserver;
/**
* Server-side implementation of gRPC version of LinkProviderService.
*/
@Beta
final class LinkProviderServiceServerProxy
implements LinkProviderServiceRpc {
private final Logger log = LoggerFactory.getLogger(getClass());
private final GrpcRemoteServiceServer server;
LinkProviderServiceServerProxy(GrpcRemoteServiceServer server) {
this.server = checkNotNull(server);
}
/**
* Gets or creates {@link LinkProviderService} registered for given ProviderId scheme.
*
* @param scheme ProviderId scheme.
* @return {@link LinkProviderService}
*/
private LinkProviderService getLinkProviderServiceFor(String scheme) {
return server.getLinkProviderServiceFor(scheme);
}
@Override
public void linkDetected(LinkDetectedMsg request,
StreamObserver<Void> responseObserver) {
try {
onLinkDetected(request, responseObserver);
// If onNext call was not mandatory, it can be removed.
responseObserver.onNext(Void.getDefaultInstance());
responseObserver.onCompleted();
} catch (Exception e) {
log.error("Exception caught", e);
responseObserver.onError(e);
}
}
private void onLinkDetected(LinkDetectedMsg request,
StreamObserver<Void> responseObserver) {
String scheme = request.getProviderId();
LinkProviderService linkProviderService = getLinkProviderServiceFor(scheme);
LinkDescription linkDescription = translate(request.getLinkDescription());
linkProviderService.linkDetected(linkDescription);
}
@Override
public void linkVanished(LinkVanishedMsg request,
StreamObserver<Void> responseObserver) {
try {
onLinksVanished(request, responseObserver);
// If onNext call was not mandatory, it can be removed.
responseObserver.onNext(Void.getDefaultInstance());
responseObserver.onCompleted();
} catch (Exception e) {
log.error("Exception caught", e);
responseObserver.onError(e);
}
}
private void onLinksVanished(LinkVanishedMsg request,
StreamObserver<Void> responseObserver) {
String scheme = request.getProviderId();
switch (request.getSubjectCase()) {
case CONNECT_POINT:
ConnectPoint cp = translate(request.getConnectPoint());
getLinkProviderServiceFor(scheme).linksVanished(cp);
break;
case DEVICE_ID:
DeviceId did = deviceId(request.getDeviceId());
getLinkProviderServiceFor(scheme).linksVanished(did);
break;
case LINK_DESCRIPTION:
LinkDescription desc = translate(request.getLinkDescription());
getLinkProviderServiceFor(scheme).linkVanished(desc);
break;
case SUBJECT_NOT_SET:
default:
// do nothing
break;
}
}
/**
* Translates gRPC message to corresponding ONOS object.
*
* @param connectPoint gRPC message.
* @return {@link ConnectPoint}
*/
private ConnectPoint translate(org.onosproject.grpc.Link.ConnectPoint connectPoint) {
checkArgument(connectPoint.getElementIdCase() == ElementIdCase.DEVICE_ID,
"Only DeviceId supported.");
return new ConnectPoint(deviceId(connectPoint.getDeviceId()),
PortNumber.fromString(connectPoint.getPortNumber()));
}
/**
* Translates gRPC message to corresponding ONOS object.
*
* @param linkDescription gRPC message
* @return {@link LinkDescription}
*/
private LinkDescription translate(org.onosproject.grpc.Link.LinkDescription linkDescription) {
ConnectPoint src = translate(linkDescription.getSrc());
ConnectPoint dst = translate(linkDescription.getDst());
Link.Type type = translate(linkDescription.getType());
SparseAnnotations annotations = GrpcDeviceUtils.asAnnotations(linkDescription.getAnnotations());
return new DefaultLinkDescription(src, dst, type, annotations);
}
/**
* Translates gRPC message to corresponding ONOS object.
*
* @param type gRPC message enum
* @return {@link Type}
*/
private Link.Type translate(LinkType type) {
switch (type) {
case DIRECT:
return Link.Type.DIRECT;
case EDGE:
return Link.Type.EDGE;
case INDIRECT:
return Link.Type.INDIRECT;
case OPTICAL:
return Link.Type.INDIRECT;
case TUNNEL:
return Link.Type.TUNNEL;
case VIRTUAL:
return Link.Type.VIRTUAL;
case UNRECOGNIZED:
default:
return Link.Type.DIRECT;
}
}
}
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.incubator.rpc.grpc;
import org.onosproject.net.link.LinkProvider;
import org.onosproject.net.provider.AbstractProvider;
import org.onosproject.net.provider.ProviderId;
import com.google.api.client.repackaged.com.google.common.annotations.Beta;
import com.google.common.base.MoreObjects;
/**
* Stub LinkProvider to be registered on Server-side.
*/
@Beta
final class StubLinkProvider extends AbstractProvider implements LinkProvider {
protected StubLinkProvider(String scheme) {
super(new ProviderId(scheme, GrpcRemoteServiceServer.RPC_PROVIDER_NAME));
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("id", id())
.toString();
}
}
syntax = "proto3";
option java_package = "org.onosproject.grpc";
package Link;
enum LinkType {
// Signifies that this is a direct single-segment link.
DIRECT = 0;
// Signifies that this link is potentially comprised from multiple
//underlying segments or hops, and as such should be used to tag
// links traversing optical paths, tunnels or intervening 'dark'
// switches.
INDIRECT = 1;
// Signifies that this link is an edge, i.e. host link.
EDGE = 2;
// Signifies that this link represents a logical link backed by
// some form of a tunnel, e.g., GRE, MPLS, ODUk, OCH.
TUNNEL = 3;
// Signifies that this link is realized by fiber (either single channel or WDM).
OPTICAL = 4;
// Signifies that this link is a virtual link or a pseudo-wire.
VIRTUAL = 5;
}
message ConnectPoint {
oneof element_id {
// DeviceID as String DeviceId#toString
string device_id = 1;
// TODO add support to other element_id if required
}
// PortNumber as String PortNumber#toString
string port_number = 2;
}
message LinkDescription {
ConnectPoint src = 1;
ConnectPoint dst = 2;
LinkType type = 3;
map<string, string> annotations = 4;
}
// Message te represent no return value
message Void {}
message LinkDetectedMsg {
// ProviderId scheme only
string provider_id = 1;
LinkDescription link_description = 2;
}
message LinkVanishedMsg {
// ProviderId scheme only
string provider_id = 1;
oneof subject {
LinkDescription link_description = 2;
ConnectPoint connect_point = 3;
string device_id = 4;
}
}
service LinkProviderServiceRpc {
rpc LinkDetected(LinkDetectedMsg) returns (Void);
rpc LinkVanished(LinkVanishedMsg) returns (Void);
}
......@@ -17,6 +17,7 @@ package org.onosproject.incubator.rpc.grpc;
import static org.junit.Assert.*;
import static org.onosproject.net.DeviceId.deviceId;
import static org.onosproject.net.PortNumber.portNumber;
import java.io.IOException;
import java.net.ServerSocket;
......@@ -37,9 +38,11 @@ import org.onosproject.incubator.rpc.RemoteServiceContext;
import org.onosproject.incubator.rpc.RemoteServiceContextProvider;
import org.onosproject.incubator.rpc.RemoteServiceContextProviderService;
import org.onosproject.incubator.rpc.RemoteServiceProviderRegistry;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.DefaultAnnotations;
import org.onosproject.net.Device.Type;
import org.onosproject.net.DeviceId;
import org.onosproject.net.Link;
import org.onosproject.net.MastershipRole;
import org.onosproject.net.PortNumber;
import org.onosproject.net.SparseAnnotations;
......@@ -51,6 +54,11 @@ import org.onosproject.net.device.DeviceProviderRegistry;
import org.onosproject.net.device.DeviceProviderService;
import org.onosproject.net.device.PortDescription;
import org.onosproject.net.device.PortStatistics;
import org.onosproject.net.link.DefaultLinkDescription;
import org.onosproject.net.link.LinkDescription;
import org.onosproject.net.link.LinkProvider;
import org.onosproject.net.link.LinkProviderRegistry;
import org.onosproject.net.link.LinkProviderService;
import org.onosproject.net.provider.AbstractProviderRegistry;
import org.onosproject.net.provider.AbstractProviderService;
import org.onosproject.net.provider.ProviderId;
......@@ -99,6 +107,9 @@ public class GrpcRemoteServiceTest {
private MTestDeviceProviderService svDeviceProviderService;
private ServerSideLinkProviderService svLinkProviderService;
private CountDownLatch serverReady;
private URI uri;
......@@ -121,6 +132,7 @@ public class GrpcRemoteServiceTest {
serverReady = new CountDownLatch(1);
server = new GrpcRemoteServiceServer();
server.deviceProviderRegistry = new MTestDeviceProviderRegistry();
server.linkProviderRegistry = new ServerSideLinkProviderRegistry();
// todo: pass proper ComponentContext
server.listenPort = pickListenPort();
uri = URI.create("grpc://localhost:" + server.listenPort);
......@@ -135,6 +147,7 @@ public class GrpcRemoteServiceTest {
public void tearDown() {
client.deactivate();
server.deactivate();
svLinkProviderService = null;
}
private static void assertEqualsButNotSame(Object expected, Object actual) {
......@@ -144,7 +157,7 @@ public class GrpcRemoteServiceTest {
}
@Test
public void basics() throws InterruptedException {
public void deviceServiceBasics() throws InterruptedException {
RemoteServiceContext remoteServiceContext = client.get(uri);
assertNotNull(remoteServiceContext);
......@@ -217,6 +230,116 @@ public class GrpcRemoteServiceTest {
DEVICE_ID, clDeviceProvider.isReachableDid);
}
@Test
public void linkVanishedDevice() throws InterruptedException {
RemoteServiceContext remoteServiceContext = client.get(uri);
assertNotNull(remoteServiceContext);
LinkProviderRegistry providerRegistry = remoteServiceContext.get(LinkProviderRegistry.class);
assertNotNull(providerRegistry);
final String schemeTest = "test";
LinkProviderService client = providerRegistry.register(new StubLinkProvider(schemeTest));
assertNotNull(client);
client.linksVanished(DEVICE_ID);
assertEquals(schemeTest, svLinkProviderService.provider().id().scheme());
assertTrue(svLinkProviderService.calls.await(10, TimeUnit.SECONDS));
assertEqualsButNotSame(DEVICE_ID, svLinkProviderService.arg);
}
@Test
public void linkVanishedPort() throws InterruptedException {
RemoteServiceContext remoteServiceContext = client.get(uri);
assertNotNull(remoteServiceContext);
LinkProviderRegistry providerRegistry = remoteServiceContext.get(LinkProviderRegistry.class);
assertNotNull(providerRegistry);
final String schemeTest = "test";
LinkProviderService client = providerRegistry.register(new StubLinkProvider(schemeTest));
assertNotNull(client);
final ConnectPoint cp = new ConnectPoint(DEVICE_ID, PORT);
client.linksVanished(cp);
assertEquals(schemeTest, svLinkProviderService.provider().id().scheme());
assertTrue(svLinkProviderService.calls.await(10, TimeUnit.SECONDS));
assertEqualsButNotSame(cp, svLinkProviderService.arg);
}
@Test
public void linkVanishedDescription() throws InterruptedException {
RemoteServiceContext remoteServiceContext = client.get(uri);
assertNotNull(remoteServiceContext);
LinkProviderRegistry providerRegistry = remoteServiceContext.get(LinkProviderRegistry.class);
assertNotNull(providerRegistry);
final String schemeTest = "test";
LinkProviderService client = providerRegistry.register(new StubLinkProvider(schemeTest));
assertNotNull(client);
ConnectPoint src = new ConnectPoint(deviceId("dev:1"), portNumber(10));
ConnectPoint dst = new ConnectPoint(deviceId("dev:2"), portNumber(20));
LinkDescription linkDescription = new DefaultLinkDescription(src, dst, Link.Type.DIRECT, ANON);
client.linkVanished(linkDescription);
assertEquals(schemeTest, svLinkProviderService.provider().id().scheme());
assertTrue(svLinkProviderService.calls.await(10, TimeUnit.SECONDS));
assertEqualsButNotSame(linkDescription, svLinkProviderService.arg);
}
@Test
public void linkDetected() throws InterruptedException {
RemoteServiceContext remoteServiceContext = client.get(uri);
assertNotNull(remoteServiceContext);
LinkProviderRegistry providerRegistry = remoteServiceContext.get(LinkProviderRegistry.class);
assertNotNull(providerRegistry);
final String schemeTest = "test";
LinkProviderService client = providerRegistry.register(new StubLinkProvider(schemeTest));
assertNotNull(client);
ConnectPoint src = new ConnectPoint(deviceId("dev:1"), portNumber(10));
ConnectPoint dst = new ConnectPoint(deviceId("dev:2"), portNumber(20));
LinkDescription linkDescription = new DefaultLinkDescription(src, dst, Link.Type.DIRECT, ANON);
client.linkDetected(linkDescription);
assertEquals(schemeTest, svLinkProviderService.provider().id().scheme());
assertTrue(svLinkProviderService.calls.await(10, TimeUnit.SECONDS));
assertEqualsButNotSame(linkDescription, svLinkProviderService.arg);
}
@Test
public void linkServiceBasics() throws InterruptedException {
RemoteServiceContext remoteServiceContext = client.get(uri);
assertNotNull(remoteServiceContext);
LinkProviderRegistry providerRegistry = remoteServiceContext.get(LinkProviderRegistry.class);
assertNotNull(providerRegistry);
final String schemeTest = "test";
LinkProviderService client = providerRegistry.register(new StubLinkProvider(schemeTest));
assertNotNull(client);
ConnectPoint src = new ConnectPoint(deviceId("dev:1"), portNumber(10));
ConnectPoint dst = new ConnectPoint(deviceId("dev:2"), portNumber(20));
LinkDescription linkDescription = new DefaultLinkDescription(src, dst, Link.Type.DIRECT, ANON);
client.linkDetected(linkDescription);
assertEquals(schemeTest, svLinkProviderService.provider().id().scheme());
assertTrue(svLinkProviderService.calls.await(10, TimeUnit.SECONDS));
assertEqualsButNotSame(linkDescription, svLinkProviderService.arg);
svLinkProviderService.reset();
client.linkVanished(linkDescription);
assertEquals(schemeTest, svLinkProviderService.provider().id().scheme());
assertTrue(svLinkProviderService.calls.await(10, TimeUnit.SECONDS));
assertEqualsButNotSame(linkDescription, svLinkProviderService.arg);
}
/**
* Device Provider on CO side.
*/
......@@ -395,4 +518,61 @@ public class GrpcRemoteServiceTest {
}
}
public class ServerSideLinkProviderRegistry
extends AbstractProviderRegistry<LinkProvider, LinkProviderService>
implements LinkProviderRegistry {
@Override
protected LinkProviderService createProviderService(LinkProvider provider) {
svLinkProviderService = new ServerSideLinkProviderService(provider);
return svLinkProviderService;
}
}
public class ServerSideLinkProviderService
extends AbstractProviderService<LinkProvider>
implements LinkProviderService {
CountDownLatch calls = new CountDownLatch(1);
Object arg = null;
public void reset() {
calls = new CountDownLatch(1);
arg = null;
}
public ServerSideLinkProviderService(LinkProvider provider) {
super(provider);
}
@Override
public void linksVanished(DeviceId deviceId) {
log.info("linksVanished({})", deviceId);
arg = deviceId;
calls.countDown();
}
@Override
public void linksVanished(ConnectPoint connectPoint) {
log.info("linksVanished({})", connectPoint);
arg = connectPoint;
calls.countDown();
}
@Override
public void linkVanished(LinkDescription linkDescription) {
log.info("linksVanished({})", linkDescription);
arg = linkDescription;
calls.countDown();
}
@Override
public void linkDetected(LinkDescription linkDescription) {
log.info("linkDetected({})", linkDescription);
arg = linkDescription;
calls.countDown();
}
}
}
......