Charles Chan
Committed by Gerrit Code Review

Implement map listener in DistributedMcastStore

Fix the problem that only local delegation is triggered

Change-Id: Ibe7b4a6abb447b9a8e3a0959882ef2da0d21f4be
......@@ -31,14 +31,18 @@ import org.onosproject.net.mcast.McastStoreDelegate;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
import org.slf4j.Logger;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import static com.google.common.base.Preconditions.checkState;
import static org.slf4j.LoggerFactory.getLogger;
/**
......@@ -62,11 +66,16 @@ public class DistributedMcastStore extends AbstractStore<McastEvent, McastStoreD
protected ConsistentMap<McastRoute, MulticastData> mcastRib;
protected Map<McastRoute, MulticastData> mcastRoutes;
private final MapEventListener<McastRoute, MulticastData> mcastMapListener =
new McastMapListener();
// NOTE: MapEvent cannot provide correct old value of sink since MulticastData
// is a object reference. Use this localSink to track sink.
private Map<McastRoute, Set<ConnectPoint>> localSink =
new ConcurrentHashMap<>();
@Activate
public void activate() {
mcastRib = storageService.<McastRoute, MulticastData>consistentMapBuilder()
.withName(MCASTRIB)
.withSerializer(Serializer.using(KryoNamespace.newBuilder()
......@@ -79,10 +88,9 @@ public class DistributedMcastStore extends AbstractStore<McastEvent, McastStoreD
).build()))
//.withRelaxedReadConsistency()
.build();
mcastRib.addListener(mcastMapListener);
mcastRoutes = mcastRib.asJavaMap();
log.info("Started");
}
......@@ -95,16 +103,10 @@ public class DistributedMcastStore extends AbstractStore<McastEvent, McastStoreD
public void storeRoute(McastRoute route, Type operation) {
switch (operation) {
case ADD:
if (mcastRoutes.putIfAbsent(route, MulticastData.empty()) == null) {
delegate.notify(new McastEvent(McastEvent.Type.ROUTE_ADDED,
McastRouteInfo.mcastRouteInfo(route)));
}
mcastRoutes.putIfAbsent(route, MulticastData.empty());
break;
case REMOVE:
if (mcastRoutes.remove(route) != null) {
delegate.notify(new McastEvent(McastEvent.Type.ROUTE_REMOVED,
McastRouteInfo.mcastRouteInfo(route)));
}
mcastRoutes.remove(route);
break;
default:
log.warn("Unknown mcast operation type: {}", operation);
......@@ -121,15 +123,6 @@ public class DistributedMcastStore extends AbstractStore<McastEvent, McastStoreD
}
return v;
});
if (data != null) {
delegate.notify(new McastEvent(McastEvent.Type.SOURCE_ADDED,
McastRouteInfo.mcastRouteInfo(route,
data.sinks(),
source)));
}
}
@Override
......@@ -152,31 +145,6 @@ public class DistributedMcastStore extends AbstractStore<McastEvent, McastStoreD
}
return v;
});
if (data != null) {
switch (operation) {
case ADD:
delegate.notify(new McastEvent(
McastEvent.Type.SINK_ADDED,
McastRouteInfo.mcastRouteInfo(route,
sink,
data.source())));
break;
case REMOVE:
if (data != null) {
delegate.notify(new McastEvent(
McastEvent.Type.SINK_REMOVED,
McastRouteInfo.mcastRouteInfo(route,
sink,
data.source())));
}
break;
default:
log.warn("Unknown mcast operation type: {}", operation);
}
}
}
@Override
......@@ -194,4 +162,75 @@ public class DistributedMcastStore extends AbstractStore<McastEvent, McastStoreD
return mcastRoutes.keySet();
}
private class McastMapListener implements MapEventListener<McastRoute, MulticastData> {
@Override
public void event(MapEvent<McastRoute, MulticastData> event) {
McastRoute route = event.key();
MulticastData newValue, oldValue;
switch (event.type()) {
case INSERT:
checkState(event.newValue() != null, "Map insert event should have newValue");
newValue = event.newValue().value();
if (newValue.source() != null) {
notifyDelegate(new McastEvent(McastEvent.Type.SOURCE_ADDED,
McastRouteInfo.mcastRouteInfo(route,
newValue.sinks(), newValue.source())));
}
if (!newValue.sinks().isEmpty()) {
newValue.sinks().forEach(sink -> {
notifyDelegate(new McastEvent(McastEvent.Type.SINK_ADDED,
McastRouteInfo.mcastRouteInfo(route,
sink, newValue.source())));
});
}
if (newValue.source() == null && newValue.sinks().isEmpty()) {
notifyDelegate(new McastEvent(McastEvent.Type.ROUTE_ADDED,
McastRouteInfo.mcastRouteInfo(route)));
}
localSink.put(route, newValue.sinks());
break;
case REMOVE:
checkState(event.oldValue() != null, "Map remove event should have oldValue");
oldValue = event.oldValue().value();
notifyDelegate(new McastEvent(McastEvent.Type.ROUTE_REMOVED,
McastRouteInfo.mcastRouteInfo(route)));
oldValue.sinks().forEach(sink -> {
notifyDelegate(new McastEvent(
McastEvent.Type.SINK_REMOVED,
McastRouteInfo.mcastRouteInfo(route, sink, oldValue.source())));
});
localSink.remove(route);
break;
case UPDATE:
checkState(event.newValue() != null, "Map update event should have newValue");
checkState(event.oldValue() != null, "Map update event should have oldValue");
newValue = event.newValue().value();
oldValue = event.oldValue().value();
if (newValue.source() != null && oldValue.source() == null) {
notifyDelegate(new McastEvent(McastEvent.Type.SOURCE_ADDED,
McastRouteInfo.mcastRouteInfo(route,
newValue.sinks(), newValue.source())));
}
newValue.sinks().stream()
.filter(sink -> !localSink.get(route).contains(sink))
.forEach(addedSink -> {
notifyDelegate(new McastEvent(McastEvent.Type.SINK_ADDED,
McastRouteInfo.mcastRouteInfo(route,
addedSink, newValue.source())));
});
localSink.get(route).stream()
.filter(sink -> !newValue.sinks().contains(sink))
.forEach(removedSink -> {
notifyDelegate(new McastEvent(McastEvent.Type.SINK_REMOVED,
McastRouteInfo.mcastRouteInfo(route,
removedSink, newValue.source())));
});
localSink.put(route, newValue.sinks());
break;
default:
break;
}
}
}
}
......