Madan Jampani

Removed deprecated map change notification support

Change-Id: Ibff2e403129ee026092a24fc15b82e80ffb8dc48
...@@ -20,9 +20,9 @@ import com.google.common.base.Charsets; ...@@ -20,9 +20,9 @@ import com.google.common.base.Charsets;
20 import com.google.common.collect.ArrayListMultimap; 20 import com.google.common.collect.ArrayListMultimap;
21 import com.google.common.collect.ImmutableList; 21 import com.google.common.collect.ImmutableList;
22 import com.google.common.collect.ImmutableSet; 22 import com.google.common.collect.ImmutableSet;
23 -import com.google.common.collect.ListMultimap;
24 import com.google.common.collect.Lists; 23 import com.google.common.collect.Lists;
25 import com.google.common.collect.Maps; 24 import com.google.common.collect.Maps;
25 +import com.google.common.collect.Multimap;
26 import com.google.common.collect.Sets; 26 import com.google.common.collect.Sets;
27 import com.google.common.util.concurrent.Futures; 27 import com.google.common.util.concurrent.Futures;
28 28
...@@ -68,7 +68,6 @@ import org.onosproject.store.service.ConsistentMapBuilder; ...@@ -68,7 +68,6 @@ import org.onosproject.store.service.ConsistentMapBuilder;
68 import org.onosproject.store.service.ConsistentMapException; 68 import org.onosproject.store.service.ConsistentMapException;
69 import org.onosproject.store.service.DistributedQueueBuilder; 69 import org.onosproject.store.service.DistributedQueueBuilder;
70 import org.onosproject.store.service.EventuallyConsistentMapBuilder; 70 import org.onosproject.store.service.EventuallyConsistentMapBuilder;
71 -import org.onosproject.store.service.MapEvent;
72 import org.onosproject.store.service.MapInfo; 71 import org.onosproject.store.service.MapInfo;
73 import org.onosproject.store.service.PartitionInfo; 72 import org.onosproject.store.service.PartitionInfo;
74 import org.onosproject.store.service.DistributedSetBuilder; 73 import org.onosproject.store.service.DistributedSetBuilder;
...@@ -126,8 +125,8 @@ public class DatabaseManager implements StorageService, StorageAdminService { ...@@ -126,8 +125,8 @@ public class DatabaseManager implements StorageService, StorageAdminService {
126 private ExecutorService queuePollExecutor; 125 private ExecutorService queuePollExecutor;
127 private ApplicationListener appListener = new InternalApplicationListener(); 126 private ApplicationListener appListener = new InternalApplicationListener();
128 127
129 - private final Map<String, DefaultAsyncConsistentMap> maps = Maps.newConcurrentMap(); 128 + private final Multimap<String, DefaultAsyncConsistentMap> maps = ArrayListMultimap.create();
130 - private final ListMultimap<ApplicationId, DefaultAsyncConsistentMap> mapsByApplication = ArrayListMultimap.create(); 129 + private final Multimap<ApplicationId, DefaultAsyncConsistentMap> mapsByApplication = ArrayListMultimap.create();
131 private final Map<String, DefaultDistributedQueue> queues = Maps.newConcurrentMap(); 130 private final Map<String, DefaultDistributedQueue> queues = Maps.newConcurrentMap();
132 131
133 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) 132 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
...@@ -451,28 +450,15 @@ public class DatabaseManager implements StorageService, StorageAdminService { ...@@ -451,28 +450,15 @@ public class DatabaseManager implements StorageService, StorageAdminService {
451 } 450 }
452 451
453 protected <K, V> DefaultAsyncConsistentMap<K, V> registerMap(DefaultAsyncConsistentMap<K, V> map) { 452 protected <K, V> DefaultAsyncConsistentMap<K, V> registerMap(DefaultAsyncConsistentMap<K, V> map) {
454 - DefaultAsyncConsistentMap<K, V> existing = maps.putIfAbsent(map.name(), map); 453 + maps.put(map.name(), map);
455 - if (existing != null) {
456 - // FIXME: We need to cleanly support different map instances with same name.
457 - log.info("Map by name {} already exists", map.name());
458 - return existing;
459 - } else {
460 if (map.applicationId() != null) { 454 if (map.applicationId() != null) {
461 mapsByApplication.put(map.applicationId(), map); 455 mapsByApplication.put(map.applicationId(), map);
462 } 456 }
463 - }
464 -
465 - clusterCommunicator.<MapEvent<K, V>>addSubscriber(mapUpdatesSubject(map.name()),
466 - map.serializer()::decode,
467 - map::notifyLocalListeners,
468 - eventDispatcher);
469 return map; 457 return map;
470 } 458 }
471 459
472 protected <K, V> void unregisterMap(DefaultAsyncConsistentMap<K, V> map) { 460 protected <K, V> void unregisterMap(DefaultAsyncConsistentMap<K, V> map) {
473 - if (maps.remove(map.name()) != null) { 461 + maps.remove(map.name(), map);
474 - clusterCommunicator.removeSubscriber(mapUpdatesSubject(map.name()));
475 - }
476 if (map.applicationId() != null) { 462 if (map.applicationId() != null) {
477 mapsByApplication.remove(map.applicationId(), map); 463 mapsByApplication.remove(map.applicationId(), map);
478 } 464 }
...@@ -485,10 +471,6 @@ public class DatabaseManager implements StorageService, StorageAdminService { ...@@ -485,10 +471,6 @@ public class DatabaseManager implements StorageService, StorageAdminService {
485 } 471 }
486 } 472 }
487 473
488 - protected static MessageSubject mapUpdatesSubject(String mapName) {
489 - return new MessageSubject(mapName + "-map-updates");
490 - }
491 -
492 private class InternalApplicationListener implements ApplicationListener { 474 private class InternalApplicationListener implements ApplicationListener {
493 @Override 475 @Override
494 public void event(ApplicationEvent event) { 476 public void event(ApplicationEvent event) {
......
...@@ -28,7 +28,6 @@ import java.util.concurrent.CopyOnWriteArraySet; ...@@ -28,7 +28,6 @@ import java.util.concurrent.CopyOnWriteArraySet;
28 import java.util.concurrent.TimeUnit; 28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.atomic.AtomicReference; 29 import java.util.concurrent.atomic.AtomicReference;
30 import java.util.function.BiFunction; 30 import java.util.function.BiFunction;
31 -import java.util.function.Consumer;
32 import java.util.function.Function; 31 import java.util.function.Function;
33 import java.util.function.Predicate; 32 import java.util.function.Predicate;
34 import java.util.stream.Collectors; 33 import java.util.stream.Collectors;
...@@ -74,7 +73,6 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V> ...@@ -74,7 +73,6 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V>
74 private final Serializer serializer; 73 private final Serializer serializer;
75 private final boolean readOnly; 74 private final boolean readOnly;
76 private final boolean purgeOnUninstall; 75 private final boolean purgeOnUninstall;
77 - private final Consumer<MapEvent<K, V>> eventPublisher;
78 76
79 private final MetricsService metricsService; 77 private final MetricsService metricsService;
80 private final MetricsComponent metricsComponent; 78 private final MetricsComponent metricsComponent;
...@@ -130,22 +128,20 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V> ...@@ -130,22 +128,20 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V>
130 Database database, 128 Database database,
131 Serializer serializer, 129 Serializer serializer,
132 boolean readOnly, 130 boolean readOnly,
133 - boolean purgeOnUninstall, 131 + boolean purgeOnUninstall) {
134 - Consumer<MapEvent<K, V>> eventPublisher) {
135 this.name = checkNotNull(name, "map name cannot be null"); 132 this.name = checkNotNull(name, "map name cannot be null");
136 this.applicationId = applicationId; 133 this.applicationId = applicationId;
137 this.database = checkNotNull(database, "database cannot be null"); 134 this.database = checkNotNull(database, "database cannot be null");
138 this.serializer = checkNotNull(serializer, "serializer cannot be null"); 135 this.serializer = checkNotNull(serializer, "serializer cannot be null");
139 this.readOnly = readOnly; 136 this.readOnly = readOnly;
140 this.purgeOnUninstall = purgeOnUninstall; 137 this.purgeOnUninstall = purgeOnUninstall;
141 - this.eventPublisher = eventPublisher;
142 this.database.registerConsumer(update -> { 138 this.database.registerConsumer(update -> {
143 SharedExecutors.getSingleThreadExecutor().execute(() -> { 139 SharedExecutors.getSingleThreadExecutor().execute(() -> {
144 if (update.target() == MAP) { 140 if (update.target() == MAP) {
145 Result<UpdateResult<String, byte[]>> result = update.output(); 141 Result<UpdateResult<String, byte[]>> result = update.output();
146 if (result.success() && result.value().mapName().equals(name)) { 142 if (result.success() && result.value().mapName().equals(name)) {
147 MapEvent<K, V> mapEvent = result.value().<K, V>map(this::dK, serializer::decode).toMapEvent(); 143 MapEvent<K, V> mapEvent = result.value().<K, V>map(this::dK, serializer::decode).toMapEvent();
148 - notifyLocalListeners(mapEvent); 144 + notifyListeners(mapEvent);
149 } 145 }
150 } 146 }
151 }); 147 });
...@@ -423,12 +419,7 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V> ...@@ -423,12 +419,7 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V>
423 oldVersionMatch, 419 oldVersionMatch,
424 value == null ? null : serializer.encode(value)) 420 value == null ? null : serializer.encode(value))
425 .thenApply(this::unwrapResult) 421 .thenApply(this::unwrapResult)
426 - .thenApply(r -> r.<K, V>map(this::dK, serializer::decode)) 422 + .thenApply(r -> r.<K, V>map(this::dK, serializer::decode));
427 - .whenComplete((r, e) -> {
428 - if (r != null && e == null && !database.hasChangeNotificationSupport()) {
429 - notifyListeners(r.toMapEvent());
430 - }
431 - });
432 } 423 }
433 424
434 private <T> T unwrapResult(Result<T> result) { 425 private <T> T unwrapResult(Result<T> result) {
...@@ -458,26 +449,16 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V> ...@@ -458,26 +449,16 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V>
458 } 449 }
459 450
460 protected void notifyListeners(MapEvent<K, V> event) { 451 protected void notifyListeners(MapEvent<K, V> event) {
461 - try { 452 + if (event == null) {
462 - if (event != null) { 453 + return;
463 - notifyLocalListeners(event);
464 - notifyRemoteListeners(event);
465 } 454 }
455 + listeners.forEach(listener -> {
456 + try {
457 + listener.event(event);
466 } catch (Exception e) { 458 } catch (Exception e) {
467 - log.warn("Failure notifying listeners about {}", event, e); 459 + log.warn("Failure notifying listener about {}", event, e);
468 - }
469 - }
470 -
471 - protected void notifyLocalListeners(MapEvent<K, V> event) {
472 - if (event != null) {
473 - listeners.forEach(listener -> listener.event(event));
474 - }
475 - }
476 -
477 - protected void notifyRemoteListeners(MapEvent<K, V> event) {
478 - if (eventPublisher != null) {
479 - eventPublisher.accept(event);
480 } 460 }
461 + });
481 } 462 }
482 463
483 private OperationTimer startTimer(String op) { 464 private OperationTimer startTimer(String op) {
......
...@@ -22,7 +22,6 @@ import org.onosproject.core.ApplicationId; ...@@ -22,7 +22,6 @@ import org.onosproject.core.ApplicationId;
22 import org.onosproject.store.service.AsyncConsistentMap; 22 import org.onosproject.store.service.AsyncConsistentMap;
23 import org.onosproject.store.service.ConsistentMap; 23 import org.onosproject.store.service.ConsistentMap;
24 import org.onosproject.store.service.ConsistentMapBuilder; 24 import org.onosproject.store.service.ConsistentMapBuilder;
25 -import org.onosproject.store.service.MapEvent;
26 import org.onosproject.store.service.Serializer; 25 import org.onosproject.store.service.Serializer;
27 26
28 /** 27 /**
...@@ -110,10 +109,7 @@ public class DefaultConsistentMapBuilder<K, V> implements ConsistentMapBuilder<K ...@@ -110,10 +109,7 @@ public class DefaultConsistentMapBuilder<K, V> implements ConsistentMapBuilder<K
110 partitionsEnabled ? manager.partitionedDatabase : manager.inMemoryDatabase, 109 partitionsEnabled ? manager.partitionedDatabase : manager.inMemoryDatabase,
111 serializer, 110 serializer,
112 readOnly, 111 readOnly,
113 - purgeOnUninstall, 112 + purgeOnUninstall);
114 - event -> manager.clusterCommunicator.<MapEvent<K, V>>broadcast(event,
115 - DatabaseManager.mapUpdatesSubject(name),
116 - serializer::encode));
117 return manager.registerMap(asyncMap); 113 return manager.registerMap(asyncMap);
118 } 114 }
119 } 115 }
...\ No newline at end of file ...\ No newline at end of file
......