Yuta HIGUCHI
Committed by Gerrit Code Review

Add link eviction to gRPC Link SB.

- Now assumes (remote) LinkProvider to periodically report existing Link.
- Note: This eviction mechanism can be removed, once gRPC Link SB service was
  remodelled using streaming RPC.

Change-Id: I98f05f849b876cff9bbdb648e7ac79f900f4bfcb
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
16 package org.onosproject.incubator.rpc.grpc; 16 package org.onosproject.incubator.rpc.grpc;
17 17
18 import static com.google.common.base.Preconditions.checkNotNull; 18 import static com.google.common.base.Preconditions.checkNotNull;
19 +import static java.util.concurrent.Executors.newScheduledThreadPool;
19 import static java.util.stream.Collectors.toList; 20 import static java.util.stream.Collectors.toList;
20 import static org.onosproject.incubator.protobuf.net.ProtobufUtils.translate; 21 import static org.onosproject.incubator.protobuf.net.ProtobufUtils.translate;
21 import static org.onosproject.net.DeviceId.deviceId; 22 import static org.onosproject.net.DeviceId.deviceId;
...@@ -25,6 +26,7 @@ import java.util.Map; ...@@ -25,6 +26,7 @@ import java.util.Map;
25 import java.util.Set; 26 import java.util.Set;
26 import java.util.concurrent.CompletableFuture; 27 import java.util.concurrent.CompletableFuture;
27 import java.util.concurrent.ExecutionException; 28 import java.util.concurrent.ExecutionException;
29 +import java.util.concurrent.ScheduledExecutorService;
28 import java.util.concurrent.TimeUnit; 30 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.TimeoutException; 31 import java.util.concurrent.TimeoutException;
30 import java.util.concurrent.atomic.AtomicInteger; 32 import java.util.concurrent.atomic.AtomicInteger;
...@@ -36,6 +38,7 @@ import org.apache.felix.scr.annotations.Modified; ...@@ -36,6 +38,7 @@ import org.apache.felix.scr.annotations.Modified;
36 import org.apache.felix.scr.annotations.Property; 38 import org.apache.felix.scr.annotations.Property;
37 import org.apache.felix.scr.annotations.Reference; 39 import org.apache.felix.scr.annotations.Reference;
38 import org.apache.felix.scr.annotations.ReferenceCardinality; 40 import org.apache.felix.scr.annotations.ReferenceCardinality;
41 +import org.onlab.util.Tools;
39 import org.onosproject.grpc.net.device.DeviceProviderRegistryRpcGrpc; 42 import org.onosproject.grpc.net.device.DeviceProviderRegistryRpcGrpc;
40 import org.onosproject.grpc.net.device.DeviceProviderRegistryRpcGrpc.DeviceProviderRegistryRpc; 43 import org.onosproject.grpc.net.device.DeviceProviderRegistryRpcGrpc.DeviceProviderRegistryRpc;
41 import org.onosproject.grpc.net.device.DeviceService.DeviceConnected; 44 import org.onosproject.grpc.net.device.DeviceService.DeviceConnected;
...@@ -110,8 +113,11 @@ public class GrpcRemoteServiceServer { ...@@ -110,8 +113,11 @@ public class GrpcRemoteServiceServer {
110 private final Map<String, LinkProviderService> linkProviderServices = Maps.newConcurrentMap(); 113 private final Map<String, LinkProviderService> linkProviderServices = Maps.newConcurrentMap();
111 private final Map<String, LinkProvider> linkProviders = Maps.newConcurrentMap(); 114 private final Map<String, LinkProvider> linkProviders = Maps.newConcurrentMap();
112 115
116 + private ScheduledExecutorService executor;
117 +
113 @Activate 118 @Activate
114 protected void activate(ComponentContext context) throws IOException { 119 protected void activate(ComponentContext context) throws IOException {
120 + executor = newScheduledThreadPool(1, Tools.groupedThreads("grpc", "%d", log));
115 modified(context); 121 modified(context);
116 122
117 log.debug("Server starting on {}", listenPort); 123 log.debug("Server starting on {}", listenPort);
...@@ -130,6 +136,12 @@ public class GrpcRemoteServiceServer { ...@@ -130,6 +136,12 @@ public class GrpcRemoteServiceServer {
130 136
131 @Deactivate 137 @Deactivate
132 protected void deactivate() { 138 protected void deactivate() {
139 + executor.shutdown();
140 + try {
141 + executor.awaitTermination(5, TimeUnit.SECONDS);
142 + } catch (InterruptedException e) {
143 + Thread.currentThread().interrupt();
144 + }
133 145
134 registeredProviders.stream() 146 registeredProviders.stream()
135 .forEach(deviceProviderRegistry::unregister); 147 .forEach(deviceProviderRegistry::unregister);
...@@ -182,6 +194,10 @@ public class GrpcRemoteServiceServer { ...@@ -182,6 +194,10 @@ public class GrpcRemoteServiceServer {
182 return linkProviderServices.computeIfAbsent(scheme, this::registerStubLinkProvider); 194 return linkProviderServices.computeIfAbsent(scheme, this::registerStubLinkProvider);
183 } 195 }
184 196
197 + protected ScheduledExecutorService getSharedExecutor() {
198 + return executor;
199 + }
200 +
185 // RPC Server-side code 201 // RPC Server-side code
186 // RPC session Factory 202 // RPC session Factory
187 /** 203 /**
......
...@@ -17,8 +17,14 @@ package org.onosproject.incubator.rpc.grpc; ...@@ -17,8 +17,14 @@ package org.onosproject.incubator.rpc.grpc;
17 17
18 import static com.google.common.base.Preconditions.checkArgument; 18 import static com.google.common.base.Preconditions.checkArgument;
19 import static com.google.common.base.Preconditions.checkNotNull; 19 import static com.google.common.base.Preconditions.checkNotNull;
20 +import static com.google.common.cache.RemovalListeners.asynchronous;
20 import static org.onosproject.net.DeviceId.deviceId; 21 import static org.onosproject.net.DeviceId.deviceId;
22 +import static org.onosproject.net.LinkKey.linkKey;
21 23
24 +import java.util.concurrent.ScheduledExecutorService;
25 +import java.util.concurrent.TimeUnit;
26 +
27 +import org.apache.commons.lang3.tuple.Pair;
22 import org.onosproject.grpc.net.Link.ConnectPoint.ElementIdCase; 28 import org.onosproject.grpc.net.Link.ConnectPoint.ElementIdCase;
23 import org.onosproject.grpc.net.Link.LinkType; 29 import org.onosproject.grpc.net.Link.LinkType;
24 import org.onosproject.grpc.net.link.LinkProviderServiceRpcGrpc.LinkProviderServiceRpc; 30 import org.onosproject.grpc.net.link.LinkProviderServiceRpcGrpc.LinkProviderServiceRpc;
...@@ -29,6 +35,7 @@ import org.onosproject.incubator.protobuf.net.ProtobufUtils; ...@@ -29,6 +35,7 @@ import org.onosproject.incubator.protobuf.net.ProtobufUtils;
29 import org.onosproject.net.ConnectPoint; 35 import org.onosproject.net.ConnectPoint;
30 import org.onosproject.net.DeviceId; 36 import org.onosproject.net.DeviceId;
31 import org.onosproject.net.Link; 37 import org.onosproject.net.Link;
38 +import org.onosproject.net.LinkKey;
32 import org.onosproject.net.PortNumber; 39 import org.onosproject.net.PortNumber;
33 import org.onosproject.net.SparseAnnotations; 40 import org.onosproject.net.SparseAnnotations;
34 import org.onosproject.net.link.DefaultLinkDescription; 41 import org.onosproject.net.link.DefaultLinkDescription;
...@@ -38,9 +45,13 @@ import org.slf4j.Logger; ...@@ -38,9 +45,13 @@ import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory; 45 import org.slf4j.LoggerFactory;
39 46
40 import com.google.api.client.repackaged.com.google.common.annotations.Beta; 47 import com.google.api.client.repackaged.com.google.common.annotations.Beta;
48 +import com.google.common.cache.Cache;
49 +import com.google.common.cache.CacheBuilder;
50 +import com.google.common.cache.RemovalNotification;
41 51
42 import io.grpc.stub.StreamObserver; 52 import io.grpc.stub.StreamObserver;
43 53
54 +// Only single instance will be created and bound to gRPC LinkProviderService
44 /** 55 /**
45 * Server-side implementation of gRPC version of LinkProviderService. 56 * Server-side implementation of gRPC version of LinkProviderService.
46 */ 57 */
...@@ -48,15 +59,39 @@ import io.grpc.stub.StreamObserver; ...@@ -48,15 +59,39 @@ import io.grpc.stub.StreamObserver;
48 final class LinkProviderServiceServerProxy 59 final class LinkProviderServiceServerProxy
49 implements LinkProviderServiceRpc { 60 implements LinkProviderServiceRpc {
50 61
62 + /**
63 + * Silence time in seconds, until link gets treated as vanished.
64 + */
65 + private static final int EVICT_LIMIT = 3 * 3;
66 +
51 private final Logger log = LoggerFactory.getLogger(getClass()); 67 private final Logger log = LoggerFactory.getLogger(getClass());
52 68
53 private final GrpcRemoteServiceServer server; 69 private final GrpcRemoteServiceServer server;
54 70
55 // TODO implement aging mechanism to automatically remove 71 // TODO implement aging mechanism to automatically remove
56 // stale links reported by dead client, etc. 72 // stale links reported by dead client, etc.
73 + /**
74 + * Evicting Cache to track last seen time.
75 + */
76 + private final Cache<Pair<String, LinkKey>, LinkDescription> lastSeen;
57 77
58 LinkProviderServiceServerProxy(GrpcRemoteServiceServer server) { 78 LinkProviderServiceServerProxy(GrpcRemoteServiceServer server) {
59 this.server = checkNotNull(server); 79 this.server = checkNotNull(server);
80 + ScheduledExecutorService executor = server.getSharedExecutor();
81 + lastSeen = CacheBuilder.newBuilder()
82 + .expireAfterWrite(EVICT_LIMIT, TimeUnit.SECONDS)
83 + .removalListener(asynchronous(this::onRemove, executor))
84 + .build();
85 +
86 + executor.scheduleWithFixedDelay(lastSeen::cleanUp,
87 + EVICT_LIMIT, EVICT_LIMIT, TimeUnit.SECONDS);
88 + }
89 +
90 + private void onRemove(RemovalNotification<Pair<String, LinkKey>, LinkDescription> n) {
91 + if (n.wasEvicted()) {
92 + getLinkProviderServiceFor(n.getKey().getLeft())
93 + .linkVanished(n.getValue());
94 + }
60 } 95 }
61 96
62 /** 97 /**
...@@ -92,6 +127,7 @@ final class LinkProviderServiceServerProxy ...@@ -92,6 +127,7 @@ final class LinkProviderServiceServerProxy
92 127
93 LinkDescription linkDescription = translate(request.getLinkDescription()); 128 LinkDescription linkDescription = translate(request.getLinkDescription());
94 linkProviderService.linkDetected(linkDescription); 129 linkProviderService.linkDetected(linkDescription);
130 + lastSeen.put(Pair.of(scheme, linkKey(linkDescription)), linkDescription);
95 } 131 }
96 132
97 @Override 133 @Override
...@@ -123,6 +159,7 @@ final class LinkProviderServiceServerProxy ...@@ -123,6 +159,7 @@ final class LinkProviderServiceServerProxy
123 case LINK_DESCRIPTION: 159 case LINK_DESCRIPTION:
124 LinkDescription desc = translate(request.getLinkDescription()); 160 LinkDescription desc = translate(request.getLinkDescription());
125 getLinkProviderServiceFor(scheme).linkVanished(desc); 161 getLinkProviderServiceFor(scheme).linkVanished(desc);
162 + lastSeen.invalidate(Pair.of(scheme, linkKey(desc)));
126 break; 163 break;
127 case SUBJECT_NOT_SET: 164 case SUBJECT_NOT_SET:
128 default: 165 default:
......