alshabib
Committed by Gerrit Code Review

OLT ability to remove a subscriber

Change-Id: I5fee9dd8189ae374bf39b0a74da5bd33304a3346
...@@ -15,6 +15,8 @@ ...@@ -15,6 +15,8 @@
15 */ 15 */
16 package org.onosproject.olt; 16 package org.onosproject.olt;
17 17
18 +import com.google.common.collect.Maps;
19 +import com.google.common.collect.Sets;
18 import org.apache.felix.scr.annotations.Activate; 20 import org.apache.felix.scr.annotations.Activate;
19 import org.apache.felix.scr.annotations.Component; 21 import org.apache.felix.scr.annotations.Component;
20 import org.apache.felix.scr.annotations.Deactivate; 22 import org.apache.felix.scr.annotations.Deactivate;
...@@ -52,6 +54,7 @@ import org.slf4j.Logger; ...@@ -52,6 +54,7 @@ import org.slf4j.Logger;
52 54
53 import java.util.Map; 55 import java.util.Map;
54 import java.util.Optional; 56 import java.util.Optional;
57 +import java.util.Set;
55 import java.util.concurrent.CompletableFuture; 58 import java.util.concurrent.CompletableFuture;
56 import java.util.concurrent.ConcurrentHashMap; 59 import java.util.concurrent.ConcurrentHashMap;
57 import java.util.concurrent.ExecutorService; 60 import java.util.concurrent.ExecutorService;
...@@ -89,11 +92,16 @@ public class Olt ...@@ -89,11 +92,16 @@ public class Olt
89 private static final VlanId DEFAULT_VLAN = VlanId.vlanId((short) 0); 92 private static final VlanId DEFAULT_VLAN = VlanId.vlanId((short) 0);
90 93
91 private ExecutorService oltInstallers = Executors.newFixedThreadPool(4, 94 private ExecutorService oltInstallers = Executors.newFixedThreadPool(4,
92 - groupedThreads("onos/olt-service", 95 + groupedThreads("onos/olt-service",
93 - "olt-installer-%d")); 96 + "olt-installer-%d"));
94 97
95 private Map<DeviceId, AccessDeviceData> oltData = new ConcurrentHashMap<>(); 98 private Map<DeviceId, AccessDeviceData> oltData = new ConcurrentHashMap<>();
96 99
100 + private Map<ConnectPoint, Set<ForwardingObjective.Builder>> objectives =
101 + Maps.newConcurrentMap();
102 +
103 + private Map<ConnectPoint, VlanId> subscribers = Maps.newConcurrentMap();
104 +
97 private InternalNetworkConfigListener configListener = 105 private InternalNetworkConfigListener configListener =
98 new InternalNetworkConfigListener(); 106 new InternalNetworkConfigListener();
99 private static final Class<AccessDeviceConfig> CONFIG_CLASS = 107 private static final Class<AccessDeviceConfig> CONFIG_CLASS =
...@@ -102,11 +110,12 @@ public class Olt ...@@ -102,11 +110,12 @@ public class Olt
102 private ConfigFactory<DeviceId, AccessDeviceConfig> configFactory = 110 private ConfigFactory<DeviceId, AccessDeviceConfig> configFactory =
103 new ConfigFactory<DeviceId, AccessDeviceConfig>( 111 new ConfigFactory<DeviceId, AccessDeviceConfig>(
104 SubjectFactories.DEVICE_SUBJECT_FACTORY, CONFIG_CLASS, "accessDevice") { 112 SubjectFactories.DEVICE_SUBJECT_FACTORY, CONFIG_CLASS, "accessDevice") {
105 - @Override 113 + @Override
106 - public AccessDeviceConfig createConfig() { 114 + public AccessDeviceConfig createConfig() {
107 - return new AccessDeviceConfig(); 115 + return new AccessDeviceConfig();
108 - } 116 + }
109 - }; 117 + };
118 +
110 119
111 @Activate 120 @Activate
112 public void activate() { 121 public void activate() {
...@@ -152,7 +161,68 @@ public class Olt ...@@ -152,7 +161,68 @@ public class Olt
152 161
153 @Override 162 @Override
154 public void removeSubscriber(ConnectPoint port) { 163 public void removeSubscriber(ConnectPoint port) {
155 - throw new UnsupportedOperationException(); 164 + AccessDeviceData olt = oltData.get(port.deviceId());
165 +
166 + if (olt == null) {
167 + log.warn("No data found for OLT device {}", port.deviceId());
168 + return;
169 + }
170 +
171 + unprovisionSubscriber(olt.deviceId(), olt.uplink(), port.port(), olt.vlan());
172 +
173 + }
174 +
175 + private void unprovisionSubscriber(DeviceId deviceId, PortNumber uplink,
176 + PortNumber subscriberPort, VlanId deviceVlan) {
177 +
178 + //FIXME: This method is slightly ugly but it'll do until we have a better
179 + // way to remove flows from the flow store.
180 +
181 + CompletableFuture<ObjectiveError> downFuture = new CompletableFuture();
182 + CompletableFuture<ObjectiveError> upFuture = new CompletableFuture();
183 +
184 + ConnectPoint cp = new ConnectPoint(deviceId, subscriberPort);
185 +
186 + VlanId subscriberVlan = subscribers.remove(cp);
187 +
188 + Set<ForwardingObjective.Builder> fwds = objectives.remove(cp);
189 +
190 + if (fwds == null || fwds.size() != 2) {
191 + log.warn("Unknown or incomplete subscriber at {}", cp);
192 + return;
193 + }
194 +
195 +
196 + fwds.stream().forEach(
197 + fwd -> flowObjectiveService.forward(deviceId,
198 + fwd.remove(new ObjectiveContext() {
199 + @Override
200 + public void onSuccess(Objective objective) {
201 + upFuture.complete(null);
202 + }
203 +
204 + @Override
205 + public void onError(Objective objective, ObjectiveError error) {
206 + upFuture.complete(error);
207 + }
208 + })));
209 +
210 + upFuture.thenAcceptBothAsync(downFuture, (upStatus, downStatus) -> {
211 + if (upStatus == null && downStatus == null) {
212 + post(new AccessDeviceEvent(AccessDeviceEvent.Type.SUBSCRIBER_UNREGISTERED,
213 + deviceId,
214 + deviceVlan,
215 + subscriberVlan));
216 + } else if (downStatus != null) {
217 + log.error("Subscriber with vlan {} on device {} " +
218 + "on port {} failed downstream uninstallation: {}",
219 + subscriberVlan, deviceId, subscriberPort, downStatus);
220 + } else if (upStatus != null) {
221 + log.error("Subscriber with vlan {} on device {} " +
222 + "on port {} failed upstream uninstallation: {}",
223 + subscriberVlan, deviceId, subscriberPort, upStatus);
224 + }
225 + }, oltInstallers);
156 226
157 } 227 }
158 228
...@@ -190,46 +260,53 @@ public class Olt ...@@ -190,46 +260,53 @@ public class Olt
190 .build(); 260 .build();
191 261
192 262
193 - ForwardingObjective upFwd = DefaultForwardingObjective.builder() 263 + ForwardingObjective.Builder upFwd = DefaultForwardingObjective.builder()
194 .withFlag(ForwardingObjective.Flag.VERSATILE) 264 .withFlag(ForwardingObjective.Flag.VERSATILE)
195 .withPriority(1000) 265 .withPriority(1000)
196 .makePermanent() 266 .makePermanent()
197 .withSelector(upstream) 267 .withSelector(upstream)
198 .fromApp(appId) 268 .fromApp(appId)
199 - .withTreatment(upstreamTreatment) 269 + .withTreatment(upstreamTreatment);
200 - .add(new ObjectiveContext() {
201 - @Override
202 - public void onSuccess(Objective objective) {
203 - upFuture.complete(null);
204 - }
205 270
206 - @Override
207 - public void onError(Objective objective, ObjectiveError error) {
208 - upFuture.complete(error);
209 - }
210 - });
211 271
212 - ForwardingObjective downFwd = DefaultForwardingObjective.builder() 272 + ForwardingObjective.Builder downFwd = DefaultForwardingObjective.builder()
213 .withFlag(ForwardingObjective.Flag.VERSATILE) 273 .withFlag(ForwardingObjective.Flag.VERSATILE)
214 .withPriority(1000) 274 .withPriority(1000)
215 .makePermanent() 275 .makePermanent()
216 .withSelector(downstream) 276 .withSelector(downstream)
217 .fromApp(appId) 277 .fromApp(appId)
218 - .withTreatment(downstreamTreatment) 278 + .withTreatment(downstreamTreatment);
219 - .add(new ObjectiveContext() {
220 - @Override
221 - public void onSuccess(Objective objective) {
222 - downFuture.complete(null);
223 - }
224 279
225 - @Override 280 + ConnectPoint cp = new ConnectPoint(deviceId, subscriberPort);
226 - public void onError(Objective objective, ObjectiveError error) {
227 - downFuture.complete(error);
228 - }
229 - });
230 281
231 - flowObjectiveService.forward(deviceId, upFwd); 282 + subscribers.put(cp, subscriberVlan);
232 - flowObjectiveService.forward(deviceId, downFwd); 283 + objectives.put(cp, Sets.newHashSet(upFwd, downFwd));
284 +
285 +
286 + flowObjectiveService.forward(deviceId, upFwd.add(new ObjectiveContext() {
287 + @Override
288 + public void onSuccess(Objective objective) {
289 + upFuture.complete(null);
290 + }
291 +
292 + @Override
293 + public void onError(Objective objective, ObjectiveError error) {
294 + upFuture.complete(error);
295 + }
296 + }));
297 +
298 +
299 + flowObjectiveService.forward(deviceId, downFwd.add(new ObjectiveContext() {
300 + @Override
301 + public void onSuccess(Objective objective) {
302 + downFuture.complete(null);
303 + }
304 +
305 + @Override
306 + public void onError(Objective objective, ObjectiveError error) {
307 + downFuture.complete(error);
308 + }
309 + }));
233 310
234 upFuture.thenAcceptBothAsync(downFuture, (upStatus, downStatus) -> { 311 upFuture.thenAcceptBothAsync(downFuture, (upStatus, downStatus) -> {
235 if (upStatus == null && downStatus == null) { 312 if (upStatus == null && downStatus == null) {
...@@ -288,20 +365,20 @@ public class Olt ...@@ -288,20 +365,20 @@ public class Olt
288 public void event(NetworkConfigEvent event) { 365 public void event(NetworkConfigEvent event) {
289 switch (event.type()) { 366 switch (event.type()) {
290 367
291 - case CONFIG_ADDED: 368 + case CONFIG_ADDED:
292 - case CONFIG_UPDATED: 369 + case CONFIG_UPDATED:
293 - if (event.configClass().equals(CONFIG_CLASS)) { 370 + if (event.configClass().equals(CONFIG_CLASS)) {
294 - AccessDeviceConfig config = 371 + AccessDeviceConfig config =
295 - networkConfig.getConfig((DeviceId) event.subject(), CONFIG_CLASS); 372 + networkConfig.getConfig((DeviceId) event.subject(), CONFIG_CLASS);
296 - if (config != null) { 373 + if (config != null) {
297 - oltData.put(config.getOlt().deviceId(), config.getOlt()); 374 + oltData.put(config.getOlt().deviceId(), config.getOlt());
375 + }
298 } 376 }
299 - } 377 + break;
300 - break; 378 + case CONFIG_UNREGISTERED:
301 - case CONFIG_UNREGISTERED: 379 + case CONFIG_REMOVED:
302 - case CONFIG_REMOVED: 380 + default:
303 - default: 381 + break;
304 - break;
305 } 382 }
306 } 383 }
307 } 384 }
......