alshabib
Committed by Gerrit Code Review

support addition and removal of mcast sinks

vlan mcast rules are now optional

Change-Id: Icb7022089a6e139970040d8cdea97df0cdc8dc7c
...@@ -28,10 +28,8 @@ import org.apache.felix.scr.annotations.Property; ...@@ -28,10 +28,8 @@ import org.apache.felix.scr.annotations.Property;
28 import org.apache.felix.scr.annotations.Reference; 28 import org.apache.felix.scr.annotations.Reference;
29 import org.apache.felix.scr.annotations.ReferenceCardinality; 29 import org.apache.felix.scr.annotations.ReferenceCardinality;
30 import org.onlab.packet.Ethernet; 30 import org.onlab.packet.Ethernet;
31 -import org.onlab.packet.IPv4;
32 import org.onlab.packet.IpAddress; 31 import org.onlab.packet.IpAddress;
33 import org.onlab.packet.VlanId; 32 import org.onlab.packet.VlanId;
34 -import org.onlab.util.Tools;
35 import org.onosproject.cfg.ComponentConfigService; 33 import org.onosproject.cfg.ComponentConfigService;
36 import org.onosproject.codec.CodecService; 34 import org.onosproject.codec.CodecService;
37 import org.onosproject.core.ApplicationId; 35 import org.onosproject.core.ApplicationId;
...@@ -60,10 +58,13 @@ import org.slf4j.Logger; ...@@ -60,10 +58,13 @@ import org.slf4j.Logger;
60 58
61 import java.util.Dictionary; 59 import java.util.Dictionary;
62 import java.util.Map; 60 import java.util.Map;
61 +import java.util.Properties;
63 import java.util.concurrent.atomic.AtomicBoolean; 62 import java.util.concurrent.atomic.AtomicBoolean;
64 import java.util.concurrent.atomic.AtomicInteger; 63 import java.util.concurrent.atomic.AtomicInteger;
65 64
65 +import static com.google.common.base.Strings.isNullOrEmpty;
66 import static com.google.common.net.MediaType.JSON_UTF_8; 66 import static com.google.common.net.MediaType.JSON_UTF_8;
67 +import static org.onlab.util.Tools.get;
67 import static org.slf4j.LoggerFactory.getLogger; 68 import static org.slf4j.LoggerFactory.getLogger;
68 69
69 /** 70 /**
...@@ -76,6 +77,10 @@ public class CordMcast { ...@@ -76,6 +77,10 @@ public class CordMcast {
76 77
77 private static final int DEFAULT_PRIORITY = 1000; 78 private static final int DEFAULT_PRIORITY = 1000;
78 private static final short DEFAULT_MCAST_VLAN = 4000; 79 private static final short DEFAULT_MCAST_VLAN = 4000;
80 + private static final String DEFAULT_SYNC_HOST = "10.90.0.8:8181";
81 + private static final String DEFAULT_USER = "karaf";
82 + private static final String DEFAULT_PASSWORD = "karaf";
83 +
79 private final Logger log = getLogger(getClass()); 84 private final Logger log = getLogger(getClass());
80 85
81 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) 86 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
...@@ -96,11 +101,8 @@ public class CordMcast { ...@@ -96,11 +101,8 @@ public class CordMcast {
96 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) 101 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
97 protected ComponentConfigService componentConfigService; 102 protected ComponentConfigService componentConfigService;
98 103
99 -
100 protected McastListener listener = new InternalMulticastListener(); 104 protected McastListener listener = new InternalMulticastListener();
101 105
102 -
103 -
104 //TODO: move this to a ec map 106 //TODO: move this to a ec map
105 private Map<IpAddress, Integer> groups = Maps.newConcurrentMap(); 107 private Map<IpAddress, Integer> groups = Maps.newConcurrentMap();
106 108
...@@ -109,18 +111,21 @@ public class CordMcast { ...@@ -109,18 +111,21 @@ public class CordMcast {
109 111
110 private ApplicationId appId; 112 private ApplicationId appId;
111 113
112 - //TODO: network config this 114 + @Property(name = "mcastVlan", intValue = DEFAULT_MCAST_VLAN,
113 - private short mcastVlan = DEFAULT_MCAST_VLAN; 115 + label = "VLAN for multicast traffic")
116 + private int mcastVlan = DEFAULT_MCAST_VLAN;
114 117
115 - // TODO component config this 118 + @Property(name = "vlanEnabled", boolValue = false,
116 - private int priority = DEFAULT_PRIORITY; 119 + label = "Use vlan for multicast traffic")
120 + private boolean vlanEnabled = false;
117 121
118 - private static final String DEFAULT_USER = "karaf"; 122 + @Property(name = "priority", intValue = DEFAULT_PRIORITY,
119 - private static final String DEFAULT_PASSWORD = "karaf"; 123 + label = "Priority for multicast rules")
124 + private int priority = DEFAULT_PRIORITY;
120 125
121 - @Property(name = "syncHost", value = "", 126 + @Property(name = "syncHost", value = DEFAULT_SYNC_HOST,
122 label = "host:port to synchronize routes to") 127 label = "host:port to synchronize routes to")
123 - private String syncHost = "10.90.0.8:8181"; 128 + private String syncHost = DEFAULT_SYNC_HOST;
124 129
125 @Property(name = "username", value = DEFAULT_USER, 130 @Property(name = "username", value = DEFAULT_USER,
126 label = "Username for REST password authentication") 131 label = "Username for REST password authentication")
...@@ -153,10 +158,37 @@ public class CordMcast { ...@@ -153,10 +158,37 @@ public class CordMcast {
153 158
154 @Modified 159 @Modified
155 public void modified(ComponentContext context) { 160 public void modified(ComponentContext context) {
156 - Dictionary<?, ?> properties = context.getProperties(); 161 + Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
157 - user = Tools.get(properties, "username"); 162 +
158 - password = Tools.get(properties, "password"); 163 +
159 - syncHost = Tools.get(properties, "syncHost"); 164 + try {
165 + String s = get(properties, "username");
166 + user = isNullOrEmpty(s) ? DEFAULT_USER : s.trim();
167 +
168 + s = get(properties, "password");
169 + password = isNullOrEmpty(s) ? DEFAULT_PASSWORD : s.trim();
170 +
171 + s = get(properties, "mcastVlan");
172 + mcastVlan = isNullOrEmpty(s) ? DEFAULT_MCAST_VLAN : Short.parseShort(s.trim());
173 +
174 + s = get(properties, "vlanEnabled");
175 + vlanEnabled = isNullOrEmpty(s) || Boolean.parseBoolean(s.trim());
176 +
177 + s = get(properties, "priority");
178 + priority = isNullOrEmpty(s) ? DEFAULT_PRIORITY : Integer.parseInt(s.trim());
179 +
180 + s = get(properties, syncHost);
181 + syncHost = isNullOrEmpty(s) ? DEFAULT_SYNC_HOST : s.trim();
182 + } catch (Exception e) {
183 + user = DEFAULT_USER;
184 + password = DEFAULT_PASSWORD;
185 + syncHost = DEFAULT_SYNC_HOST;
186 + mcastVlan = DEFAULT_MCAST_VLAN;
187 + vlanEnabled = false;
188 + priority = DEFAULT_PRIORITY;
189 + }
190 +
191 +
160 } 192 }
161 193
162 private class InternalMulticastListener implements McastListener { 194 private class InternalMulticastListener implements McastListener {
...@@ -173,6 +205,7 @@ public class CordMcast { ...@@ -173,6 +205,7 @@ public class CordMcast {
173 provisionGroup(event.subject()); 205 provisionGroup(event.subject());
174 break; 206 break;
175 case SINK_REMOVED: 207 case SINK_REMOVED:
208 + unprovisionGroup(event.subject());
176 break; 209 break;
177 default: 210 default:
178 log.warn("Unknown mcast event {}", event.type()); 211 log.warn("Unknown mcast event {}", event.type());
...@@ -180,6 +213,39 @@ public class CordMcast { ...@@ -180,6 +213,39 @@ public class CordMcast {
180 } 213 }
181 } 214 }
182 215
216 + private void unprovisionGroup(McastRouteInfo info) {
217 + if (!info.sink().isPresent()) {
218 + log.warn("No sink given after sink removed event: {}", info);
219 + return;
220 + }
221 + ConnectPoint loc = info.sink().get();
222 +
223 + NextObjective next = DefaultNextObjective.builder()
224 + .fromApp(appId)
225 + .addTreatment(DefaultTrafficTreatment.builder().setOutput(loc.port()).build())
226 + .withType(NextObjective.Type.BROADCAST)
227 + .withId(groups.get(info.route().group()))
228 + .removeFromExisting(new ObjectiveContext() {
229 + @Override
230 + public void onSuccess(Objective objective) {
231 + //TODO: change to debug
232 + log.info("Next Objective {} installed", objective.id());
233 + }
234 +
235 + @Override
236 + public void onError(Objective objective, ObjectiveError error) {
237 + //TODO: change to debug
238 + log.info("Next Objective {} failed, because {}",
239 + objective.id(),
240 + error);
241 + }
242 + });
243 +
244 + flowObjectiveService.next(loc.deviceId(), next);
245 +
246 +
247 + }
248 +
183 private void provisionGroup(McastRouteInfo info) { 249 private void provisionGroup(McastRouteInfo info) {
184 if (!info.sink().isPresent()) { 250 if (!info.sink().isPresent()) {
185 log.warn("No sink given after sink added event: {}", info); 251 log.warn("No sink given after sink added event: {}", info);
...@@ -192,12 +258,37 @@ public class CordMcast { ...@@ -192,12 +258,37 @@ public class CordMcast {
192 Integer nextId = groups.computeIfAbsent(info.route().group(), (g) -> { 258 Integer nextId = groups.computeIfAbsent(info.route().group(), (g) -> {
193 Integer id = allocateId(); 259 Integer id = allocateId();
194 260
195 - TrafficSelector mcast = DefaultTrafficSelector.builder() 261 + NextObjective next = DefaultNextObjective.builder()
196 - .matchVlanId(VlanId.vlanId(mcastVlan)) 262 + .fromApp(appId)
263 + .addTreatment(DefaultTrafficTreatment.builder().setOutput(loc.port()).build())
264 + .withType(NextObjective.Type.BROADCAST)
265 + .withId(id)
266 + .add(new ObjectiveContext() {
267 + @Override
268 + public void onSuccess(Objective objective) {
269 + //TODO: change to debug
270 + log.info("Next Objective {} installed", objective.id());
271 + }
272 +
273 + @Override
274 + public void onError(Objective objective, ObjectiveError error) {
275 + //TODO: change to debug
276 + log.info("Next Objective {} failed, because {}",
277 + objective.id(),
278 + error);
279 + }
280 + });
281 +
282 + flowObjectiveService.next(loc.deviceId(), next);
283 +
284 + TrafficSelector.Builder mcast = DefaultTrafficSelector.builder()
197 .matchEthType(Ethernet.TYPE_IPV4) 285 .matchEthType(Ethernet.TYPE_IPV4)
198 - .matchIPProtocol(IPv4.PROTOCOL_IGMP) 286 + .matchIPDst(g.toIpPrefix());
199 - .matchIPDst(g.toIpPrefix()) 287 +
200 - .build(); 288 +
289 + if (vlanEnabled) {
290 + mcast.matchVlanId(VlanId.vlanId((short) mcastVlan));
291 + }
201 292
202 293
203 ForwardingObjective fwd = DefaultForwardingObjective.builder() 294 ForwardingObjective fwd = DefaultForwardingObjective.builder()
...@@ -206,7 +297,7 @@ public class CordMcast { ...@@ -206,7 +297,7 @@ public class CordMcast {
206 .makePermanent() 297 .makePermanent()
207 .withFlag(ForwardingObjective.Flag.VERSATILE) 298 .withFlag(ForwardingObjective.Flag.VERSATILE)
208 .withPriority(priority) 299 .withPriority(priority)
209 - .withSelector(mcast) 300 + .withSelector(mcast.build())
210 .add(new ObjectiveContext() { 301 .add(new ObjectiveContext() {
211 @Override 302 @Override
212 public void onSuccess(Objective objective) { 303 public void onSuccess(Objective objective) {
...@@ -228,6 +319,7 @@ public class CordMcast { ...@@ -228,6 +319,7 @@ public class CordMcast {
228 return id; 319 return id;
229 }); 320 });
230 321
322 + if (!sync.get()) {
231 NextObjective next = DefaultNextObjective.builder() 323 NextObjective next = DefaultNextObjective.builder()
232 .fromApp(appId) 324 .fromApp(appId)
233 .addTreatment(DefaultTrafficTreatment.builder().setOutput(loc.port()).build()) 325 .addTreatment(DefaultTrafficTreatment.builder().setOutput(loc.port()).build())
...@@ -250,6 +342,7 @@ public class CordMcast { ...@@ -250,6 +342,7 @@ public class CordMcast {
250 }); 342 });
251 343
252 flowObjectiveService.next(loc.deviceId(), next); 344 flowObjectiveService.next(loc.deviceId(), next);
345 + }
253 346
254 if (sync.get()) { 347 if (sync.get()) {
255 syncRoute(info); 348 syncRoute(info);
......
...@@ -246,7 +246,7 @@ public class IgmpSnoop { ...@@ -246,7 +246,7 @@ public class IgmpSnoop {
246 .withMeta(DefaultTrafficTreatment.builder() 246 .withMeta(DefaultTrafficTreatment.builder()
247 .setOutput(PortNumber.CONTROLLER).build()) 247 .setOutput(PortNumber.CONTROLLER).build())
248 .fromApp(appId) 248 .fromApp(appId)
249 - .withPriority(1000) 249 + .withPriority(10000)
250 .add(new ObjectiveContext() { 250 .add(new ObjectiveContext() {
251 @Override 251 @Override
252 public void onSuccess(Objective objective) { 252 public void onSuccess(Objective objective) {
...@@ -417,6 +417,7 @@ public class IgmpSnoop { ...@@ -417,6 +417,7 @@ public class IgmpSnoop {
417 private class InternalDeviceListener implements DeviceListener { 417 private class InternalDeviceListener implements DeviceListener {
418 @Override 418 @Override
419 public void event(DeviceEvent event) { 419 public void event(DeviceEvent event) {
420 + DeviceId devId = event.subject().id();
420 switch (event.type()) { 421 switch (event.type()) {
421 422
422 case DEVICE_ADDED: 423 case DEVICE_ADDED:
...@@ -427,11 +428,15 @@ public class IgmpSnoop { ...@@ -427,11 +428,15 @@ public class IgmpSnoop {
427 case PORT_STATS_UPDATED: 428 case PORT_STATS_UPDATED:
428 break; 429 break;
429 case PORT_ADDED: 430 case PORT_ADDED:
430 - if (event.port().isEnabled()) { 431 + if (!oltData.get(devId).uplink().equals(event.port().number()) &&
432 + event.port().isEnabled()) {
431 processFilterObjective(event.subject().id(), event.port(), false); 433 processFilterObjective(event.subject().id(), event.port(), false);
432 } 434 }
433 break; 435 break;
434 case PORT_UPDATED: 436 case PORT_UPDATED:
437 + if (oltData.get(devId).uplink().equals(event.port().number())) {
438 + break;
439 + }
435 if (event.port().isEnabled()) { 440 if (event.port().isEnabled()) {
436 processFilterObjective(event.subject().id(), event.port(), false); 441 processFilterObjective(event.subject().id(), event.port(), false);
437 } else { 442 } else {
......
...@@ -15,13 +15,10 @@ ...@@ -15,13 +15,10 @@
15 */ 15 */
16 package org.onosproject.store.serializers; 16 package org.onosproject.store.serializers;
17 17
18 -import com.esotericsoftware.kryo.serializers.JavaSerializer;
19 import com.google.common.collect.ImmutableList; 18 import com.google.common.collect.ImmutableList;
20 import com.google.common.collect.ImmutableMap; 19 import com.google.common.collect.ImmutableMap;
21 import com.google.common.collect.ImmutableSet; 20 import com.google.common.collect.ImmutableSet;
22 import com.google.common.collect.Maps; 21 import com.google.common.collect.Maps;
23 -import com.google.common.collect.Sets;
24 -
25 import org.onlab.packet.ChassisId; 22 import org.onlab.packet.ChassisId;
26 import org.onlab.packet.EthType; 23 import org.onlab.packet.EthType;
27 import org.onlab.packet.Ip4Address; 24 import org.onlab.packet.Ip4Address;
...@@ -90,8 +87,8 @@ import org.onosproject.net.device.DefaultPortDescription; ...@@ -90,8 +87,8 @@ import org.onosproject.net.device.DefaultPortDescription;
90 import org.onosproject.net.device.DefaultPortStatistics; 87 import org.onosproject.net.device.DefaultPortStatistics;
91 import org.onosproject.net.device.OchPortDescription; 88 import org.onosproject.net.device.OchPortDescription;
92 import org.onosproject.net.device.OduCltPortDescription; 89 import org.onosproject.net.device.OduCltPortDescription;
93 -import org.onosproject.net.device.OtuPortDescription;
94 import org.onosproject.net.device.OmsPortDescription; 90 import org.onosproject.net.device.OmsPortDescription;
91 +import org.onosproject.net.device.OtuPortDescription;
95 import org.onosproject.net.device.PortStatistics; 92 import org.onosproject.net.device.PortStatistics;
96 import org.onosproject.net.flow.CompletedBatchOperation; 93 import org.onosproject.net.flow.CompletedBatchOperation;
97 import org.onosproject.net.flow.DefaultFlowEntry; 94 import org.onosproject.net.flow.DefaultFlowEntry;
...@@ -257,7 +254,6 @@ public final class KryoNamespaces { ...@@ -257,7 +254,6 @@ public final class KryoNamespaces {
257 .register(HashMap.class) 254 .register(HashMap.class)
258 .register(ConcurrentHashMap.class) 255 .register(ConcurrentHashMap.class)
259 .register(CopyOnWriteArraySet.class) 256 .register(CopyOnWriteArraySet.class)
260 - .register(new JavaSerializer(), Sets.newConcurrentHashSet().getClass())
261 .register(ArrayList.class, 257 .register(ArrayList.class,
262 LinkedList.class, 258 LinkedList.class,
263 HashSet.class, 259 HashSet.class,
...@@ -541,5 +537,6 @@ public final class KryoNamespaces { ...@@ -541,5 +537,6 @@ public final class KryoNamespaces {
541 537
542 538
543 // not to be instantiated 539 // not to be instantiated
544 - private KryoNamespaces() {} 540 + private KryoNamespaces() {
541 + }
545 } 542 }
......
...@@ -25,6 +25,7 @@ import org.apache.commons.lang3.tuple.Pair; ...@@ -25,6 +25,7 @@ import org.apache.commons.lang3.tuple.Pair;
25 import org.onlab.osgi.ServiceDirectory; 25 import org.onlab.osgi.ServiceDirectory;
26 import org.onlab.packet.EthType; 26 import org.onlab.packet.EthType;
27 import org.onlab.packet.IPv4; 27 import org.onlab.packet.IPv4;
28 +import org.onlab.packet.IpPrefix;
28 import org.onlab.packet.VlanId; 29 import org.onlab.packet.VlanId;
29 import org.onlab.util.KryoNamespace; 30 import org.onlab.util.KryoNamespace;
30 import org.onosproject.core.ApplicationId; 31 import org.onosproject.core.ApplicationId;
...@@ -47,6 +48,7 @@ import org.onosproject.net.flow.TrafficTreatment; ...@@ -47,6 +48,7 @@ import org.onosproject.net.flow.TrafficTreatment;
47 import org.onosproject.net.flow.criteria.Criteria; 48 import org.onosproject.net.flow.criteria.Criteria;
48 import org.onosproject.net.flow.criteria.Criterion; 49 import org.onosproject.net.flow.criteria.Criterion;
49 import org.onosproject.net.flow.criteria.EthTypeCriterion; 50 import org.onosproject.net.flow.criteria.EthTypeCriterion;
51 +import org.onosproject.net.flow.criteria.IPCriterion;
50 import org.onosproject.net.flow.criteria.IPProtocolCriterion; 52 import org.onosproject.net.flow.criteria.IPProtocolCriterion;
51 import org.onosproject.net.flow.criteria.PortCriterion; 53 import org.onosproject.net.flow.criteria.PortCriterion;
52 import org.onosproject.net.flow.criteria.VlanIdCriterion; 54 import org.onosproject.net.flow.criteria.VlanIdCriterion;
...@@ -99,6 +101,7 @@ public class OltPipeline extends AbstractHandlerBehaviour implements Pipeliner { ...@@ -99,6 +101,7 @@ public class OltPipeline extends AbstractHandlerBehaviour implements Pipeliner {
99 101
100 private DeviceId deviceId; 102 private DeviceId deviceId;
101 private ApplicationId appId; 103 private ApplicationId appId;
104 + private IpPrefix mcastPrefix = IpPrefix.valueOf("224.0.0.0/4");
102 105
103 protected FlowObjectiveStore flowObjectiveStore; 106 protected FlowObjectiveStore flowObjectiveStore;
104 107
...@@ -253,6 +256,12 @@ public class OltPipeline extends AbstractHandlerBehaviour implements Pipeliner { ...@@ -253,6 +256,12 @@ public class OltPipeline extends AbstractHandlerBehaviour implements Pipeliner {
253 GroupBucket bucket = DefaultGroupBucket.createAllGroupBucket(treatment); 256 GroupBucket bucket = DefaultGroupBucket.createAllGroupBucket(treatment);
254 GroupKey key = new DefaultGroupKey(appKryo.serialize(nextObjective.id())); 257 GroupKey key = new DefaultGroupKey(appKryo.serialize(nextObjective.id()));
255 258
259 +
260 +
261 + pendingGroups.put(key, nextObjective);
262 +
263 + switch (nextObjective.op()) {
264 + case ADD:
256 GroupDescription groupDesc = 265 GroupDescription groupDesc =
257 new DefaultGroupDescription(deviceId, 266 new DefaultGroupDescription(deviceId,
258 GroupDescription.Type.ALL, 267 GroupDescription.Type.ALL,
...@@ -260,19 +269,20 @@ public class OltPipeline extends AbstractHandlerBehaviour implements Pipeliner { ...@@ -260,19 +269,20 @@ public class OltPipeline extends AbstractHandlerBehaviour implements Pipeliner {
260 key, 269 key,
261 null, 270 null,
262 nextObjective.appId()); 271 nextObjective.appId());
263 -
264 - pendingGroups.put(key, nextObjective);
265 -
266 - switch (nextObjective.op()) {
267 - case ADD:
268 groupService.addGroup(groupDesc); 272 groupService.addGroup(groupDesc);
269 break; 273 break;
270 case REMOVE: 274 case REMOVE:
271 groupService.removeGroup(deviceId, key, nextObjective.appId()); 275 groupService.removeGroup(deviceId, key, nextObjective.appId());
272 break; 276 break;
273 case ADD_TO_EXISTING: 277 case ADD_TO_EXISTING:
278 + groupService.addBucketsToGroup(deviceId, key,
279 + new GroupBuckets(Collections.singletonList(bucket)),
280 + key, nextObjective.appId());
281 + break;
274 case REMOVE_FROM_EXISTING: 282 case REMOVE_FROM_EXISTING:
275 - //TODO: handle addition to group when caller signals it. 283 + groupService.removeBucketsFromGroup(deviceId, key,
284 + new GroupBuckets(Collections.singletonList(bucket)),
285 + key, nextObjective.appId());
276 break; 286 break;
277 default: 287 default:
278 log.warn("Unknown next objective operation: {}", nextObjective.op()); 288 log.warn("Unknown next objective operation: {}", nextObjective.op());
...@@ -287,14 +297,14 @@ public class OltPipeline extends AbstractHandlerBehaviour implements Pipeliner { ...@@ -287,14 +297,14 @@ public class OltPipeline extends AbstractHandlerBehaviour implements Pipeliner {
287 fail(fwd, ObjectiveError.BADPARAMS); 297 fail(fwd, ObjectiveError.BADPARAMS);
288 } 298 }
289 299
290 - OLTPipelineGroup next = getGroupForNextObjective(fwd.nextId()); 300 + GroupKey key = getGroupForNextObjective(fwd.nextId());
291 301
292 - if (next == null) { 302 + if (key == null) {
293 log.error("Group for forwarding objective missing: {}", fwd); 303 log.error("Group for forwarding objective missing: {}", fwd);
294 fail(fwd, ObjectiveError.GROUPMISSING); 304 fail(fwd, ObjectiveError.GROUPMISSING);
295 } 305 }
296 306
297 - Group group = groupService.getGroup(deviceId, next.key()); 307 + Group group = groupService.getGroup(deviceId, key);
298 TrafficTreatment treatment = 308 TrafficTreatment treatment =
299 buildTreatment(Instructions.createGroup(group.id())); 309 buildTreatment(Instructions.createGroup(group.id()));
300 310
...@@ -330,16 +340,20 @@ public class OltPipeline extends AbstractHandlerBehaviour implements Pipeliner { ...@@ -330,16 +340,20 @@ public class OltPipeline extends AbstractHandlerBehaviour implements Pipeliner {
330 340
331 private boolean checkForMulticast(ForwardingObjective fwd) { 341 private boolean checkForMulticast(ForwardingObjective fwd) {
332 342
333 - VlanIdCriterion vlan = (VlanIdCriterion) filterForCriterion(fwd.selector().criteria(), 343 + IPCriterion ip = (IPCriterion) filterForCriterion(fwd.selector().criteria(),
334 - Criterion.Type.VLAN_VID); 344 + Criterion.Type.IPV4_DST);
345 +
346 + if (ip == null) {
347 + return false;
348 + }
335 349
336 - return (vlan != null && vlan.vlanId().equals(VlanId.vlanId(MCAST_VLAN))); 350 + return mcastPrefix.contains(ip.ip());
337 351
338 } 352 }
339 353
340 - private OLTPipelineGroup getGroupForNextObjective(Integer nextId) { 354 + private GroupKey getGroupForNextObjective(Integer nextId) {
341 NextGroup next = flowObjectiveStore.getNextGroup(nextId); 355 NextGroup next = flowObjectiveStore.getNextGroup(nextId);
342 - return (OLTPipelineGroup) appKryo.deserialize(next.data()); 356 + return appKryo.deserialize(next.data());
343 357
344 } 358 }
345 359
......
...@@ -66,9 +66,9 @@ public final class MulticastData { ...@@ -66,9 +66,9 @@ public final class MulticastData {
66 sinks.put(sink, true); 66 sinks.put(sink, true);
67 } 67 }
68 68
69 - public boolean removeSink(ConnectPoint sink) { 69 + public void removeSink(ConnectPoint sink) {
70 checkNotNull(sink); 70 checkNotNull(sink);
71 - return sinks.remove(sink); 71 + sinks.remove(sink);
72 } 72 }
73 73
74 public boolean isEmpty() { 74 public boolean isEmpty() {
......