Madan Jampani
Committed by Gerrit Code Review

ONOS-2133: Support for purging associated stores (ConsistentMap/DistributedSet) …

…when the application is uninstalled

Change-Id: I5bf7678f50ff3ed2792313383ff738c356bef69f
1 package org.onosproject.store.service; 1 package org.onosproject.store.service;
2 2
3 +import org.onosproject.core.ApplicationId;
3 4
4 /** 5 /**
5 * Builder for consistent maps. 6 * Builder for consistent maps.
...@@ -24,6 +25,18 @@ public interface ConsistentMapBuilder<K, V> { ...@@ -24,6 +25,18 @@ public interface ConsistentMapBuilder<K, V> {
24 ConsistentMapBuilder<K, V> withName(String name); 25 ConsistentMapBuilder<K, V> withName(String name);
25 26
26 /** 27 /**
28 + * Sets the owner applicationId for the map.
29 + * <p>
30 + * Note: If {@code purgeOnUninstall} option is enabled, applicationId
31 + * must be specified.
32 + * </p>
33 + *
34 + * @param id applicationId owning the consistent map
35 + * @return this ConsistentMapBuilder
36 + */
37 + ConsistentMapBuilder<K, V> withApplicationId(ApplicationId id);
38 +
39 + /**
27 * Sets a serializer that can be used to serialize 40 * Sets a serializer that can be used to serialize
28 * both the keys and values inserted into the map. The serializer 41 * both the keys and values inserted into the map. The serializer
29 * builder should be pre-populated with any classes that will be 42 * builder should be pre-populated with any classes that will be
...@@ -65,6 +78,18 @@ public interface ConsistentMapBuilder<K, V> { ...@@ -65,6 +78,18 @@ public interface ConsistentMapBuilder<K, V> {
65 ConsistentMapBuilder<K, V> withUpdatesDisabled(); 78 ConsistentMapBuilder<K, V> withUpdatesDisabled();
66 79
67 /** 80 /**
81 + * Purges map contents when the application owning the map is uninstalled.
82 + * <p>
83 + * When this option is enabled, the caller must provide a applicationId via
84 + * the {@code withAppliationId} builder method.
85 + * <p>
86 + * By default map entries will NOT be purged when owning application is uninstalled.
87 + *
88 + * @return this ConsistentMapBuilder
89 + */
90 + ConsistentMapBuilder<K, V> withPurgeOnUninstall();
91 +
92 + /**
68 * Builds an consistent map based on the configuration options 93 * Builds an consistent map based on the configuration options
69 * supplied to this builder. 94 * supplied to this builder.
70 * 95 *
......
...@@ -15,6 +15,8 @@ ...@@ -15,6 +15,8 @@
15 */ 15 */
16 package org.onosproject.store.service; 16 package org.onosproject.store.service;
17 17
18 +import org.onosproject.core.ApplicationId;
19 +
18 /** 20 /**
19 * Builder for distributed set. 21 * Builder for distributed set.
20 * 22 *
...@@ -37,6 +39,18 @@ public interface DistributedSetBuilder<E> { ...@@ -37,6 +39,18 @@ public interface DistributedSetBuilder<E> {
37 DistributedSetBuilder<E> withName(String name); 39 DistributedSetBuilder<E> withName(String name);
38 40
39 /** 41 /**
42 + * Sets the owner applicationId for the set.
43 + * <p>
44 + * Note: If {@code purgeOnUninstall} option is enabled, applicationId
45 + * must be specified.
46 + * </p>
47 + *
48 + * @param id applicationId owning the set
49 + * @return this DistributedSetBuilder
50 + */
51 + DistributedSetBuilder<E> withApplicationId(ApplicationId id);
52 +
53 + /**
40 * Sets a serializer that can be used to serialize 54 * Sets a serializer that can be used to serialize
41 * the elements add to the set. The serializer 55 * the elements add to the set. The serializer
42 * builder should be pre-populated with any classes that will be 56 * builder should be pre-populated with any classes that will be
...@@ -78,6 +92,18 @@ public interface DistributedSetBuilder<E> { ...@@ -78,6 +92,18 @@ public interface DistributedSetBuilder<E> {
78 DistributedSetBuilder<E> withPartitionsDisabled(); 92 DistributedSetBuilder<E> withPartitionsDisabled();
79 93
80 /** 94 /**
95 + * Purges set contents when the application owning the set is uninstalled.
96 + * <p>
97 + * When this option is enabled, the caller must provide a applicationId via
98 + * the {@code withAppliationId} builder method.
99 + * <p>
100 + * By default set contents will NOT be purged when owning application is uninstalled.
101 + *
102 + * @return this DistributedSetBuilder
103 + */
104 + DistributedSetBuilder<E> withPurgeOnUninstall();
105 +
106 + /**
81 * Builds an set based on the configuration options 107 * Builds an set based on the configuration options
82 * supplied to this builder. 108 * supplied to this builder.
83 * 109 *
......
...@@ -17,7 +17,10 @@ ...@@ -17,7 +17,10 @@
17 package org.onosproject.store.consistent.impl; 17 package org.onosproject.store.consistent.impl;
18 18
19 import com.google.common.base.Charsets; 19 import com.google.common.base.Charsets;
20 +import com.google.common.collect.ArrayListMultimap;
21 +import com.google.common.collect.ImmutableList;
20 import com.google.common.collect.ImmutableSet; 22 import com.google.common.collect.ImmutableSet;
23 +import com.google.common.collect.ListMultimap;
21 import com.google.common.collect.Lists; 24 import com.google.common.collect.Lists;
22 import com.google.common.collect.Maps; 25 import com.google.common.collect.Maps;
23 import com.google.common.collect.Sets; 26 import com.google.common.collect.Sets;
...@@ -42,12 +45,17 @@ import org.apache.felix.scr.annotations.Component; ...@@ -42,12 +45,17 @@ import org.apache.felix.scr.annotations.Component;
42 import org.apache.felix.scr.annotations.Deactivate; 45 import org.apache.felix.scr.annotations.Deactivate;
43 import org.apache.felix.scr.annotations.Reference; 46 import org.apache.felix.scr.annotations.Reference;
44 import org.apache.felix.scr.annotations.ReferenceCardinality; 47 import org.apache.felix.scr.annotations.ReferenceCardinality;
48 +import org.apache.felix.scr.annotations.ReferencePolicy;
45 import org.apache.felix.scr.annotations.Service; 49 import org.apache.felix.scr.annotations.Service;
46 50
47 import static org.onlab.util.Tools.groupedThreads; 51 import static org.onlab.util.Tools.groupedThreads;
48 52
53 +import org.onosproject.app.ApplicationEvent;
54 +import org.onosproject.app.ApplicationListener;
55 +import org.onosproject.app.ApplicationService;
49 import org.onosproject.cluster.ClusterService; 56 import org.onosproject.cluster.ClusterService;
50 import org.onosproject.cluster.NodeId; 57 import org.onosproject.cluster.NodeId;
58 +import org.onosproject.core.ApplicationId;
51 import org.onosproject.core.IdGenerator; 59 import org.onosproject.core.IdGenerator;
52 import org.onosproject.store.cluster.impl.ClusterDefinitionManager; 60 import org.onosproject.store.cluster.impl.ClusterDefinitionManager;
53 import org.onosproject.store.cluster.impl.NodeInfo; 61 import org.onosproject.store.cluster.impl.NodeInfo;
...@@ -84,6 +92,8 @@ import java.util.concurrent.TimeoutException; ...@@ -84,6 +92,8 @@ import java.util.concurrent.TimeoutException;
84 import java.util.stream.Collectors; 92 import java.util.stream.Collectors;
85 93
86 import static org.slf4j.LoggerFactory.getLogger; 94 import static org.slf4j.LoggerFactory.getLogger;
95 +import static org.onosproject.app.ApplicationEvent.Type.APP_UNINSTALLED;
96 +import static org.onosproject.app.ApplicationEvent.Type.APP_DEACTIVATED;
87 97
88 /** 98 /**
89 * Database manager. 99 * Database manager.
...@@ -113,13 +123,18 @@ public class DatabaseManager implements StorageService, StorageAdminService { ...@@ -113,13 +123,18 @@ public class DatabaseManager implements StorageService, StorageAdminService {
113 123
114 private ExecutorService eventDispatcher; 124 private ExecutorService eventDispatcher;
115 private ExecutorService queuePollExecutor; 125 private ExecutorService queuePollExecutor;
126 + private ApplicationListener appListener = new InternalApplicationListener();
116 127
117 private final Map<String, DefaultAsyncConsistentMap> maps = Maps.newConcurrentMap(); 128 private final Map<String, DefaultAsyncConsistentMap> maps = Maps.newConcurrentMap();
129 + private final ListMultimap<ApplicationId, DefaultAsyncConsistentMap> mapsByApplication = ArrayListMultimap.create();
118 private final Map<String, DefaultDistributedQueue> queues = Maps.newConcurrentMap(); 130 private final Map<String, DefaultDistributedQueue> queues = Maps.newConcurrentMap();
119 131
120 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) 132 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
121 protected ClusterService clusterService; 133 protected ClusterService clusterService;
122 134
135 + @Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY, policy = ReferencePolicy.DYNAMIC)
136 + protected ApplicationService applicationService;
137 +
123 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) 138 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
124 protected ClusterCommunicationService clusterCommunicator; 139 protected ClusterCommunicationService clusterCommunicator;
125 140
...@@ -127,6 +142,16 @@ public class DatabaseManager implements StorageService, StorageAdminService { ...@@ -127,6 +142,16 @@ public class DatabaseManager implements StorageService, StorageAdminService {
127 return String.format("onos://%s:%d", node.getIp(), node.getTcpPort()); 142 return String.format("onos://%s:%d", node.getIp(), node.getTcpPort());
128 } 143 }
129 144
145 + protected void bindApplicationService(ApplicationService service) {
146 + applicationService = service;
147 + applicationService.addListener(appListener);
148 + }
149 +
150 + protected void unbindApplicationService(ApplicationService service) {
151 + applicationService.removeListener(appListener);
152 + this.applicationService = null;
153 + }
154 +
130 @Activate 155 @Activate
131 public void activate() { 156 public void activate() {
132 localNodeId = clusterService.getLocalNode().id(); 157 localNodeId = clusterService.getLocalNode().id();
...@@ -250,6 +275,9 @@ public class DatabaseManager implements StorageService, StorageAdminService { ...@@ -250,6 +275,9 @@ public class DatabaseManager implements StorageService, StorageAdminService {
250 }); 275 });
251 clusterCommunicator.removeSubscriber(QUEUE_UPDATED_TOPIC); 276 clusterCommunicator.removeSubscriber(QUEUE_UPDATED_TOPIC);
252 maps.values().forEach(this::unregisterMap); 277 maps.values().forEach(this::unregisterMap);
278 + if (applicationService != null) {
279 + applicationService.removeListener(appListener);
280 + }
253 eventDispatcher.shutdown(); 281 eventDispatcher.shutdown();
254 queuePollExecutor.shutdown(); 282 queuePollExecutor.shutdown();
255 log.info("Stopped"); 283 log.info("Stopped");
...@@ -421,6 +449,10 @@ public class DatabaseManager implements StorageService, StorageAdminService { ...@@ -421,6 +449,10 @@ public class DatabaseManager implements StorageService, StorageAdminService {
421 // FIXME: We need to cleanly support different map instances with same name. 449 // FIXME: We need to cleanly support different map instances with same name.
422 log.info("Map by name {} already exists", map.name()); 450 log.info("Map by name {} already exists", map.name());
423 return existing; 451 return existing;
452 + } else {
453 + if (map.applicationId() != null) {
454 + mapsByApplication.put(map.applicationId(), map);
455 + }
424 } 456 }
425 457
426 clusterCommunicator.<MapEvent<K, V>>addSubscriber(mapUpdatesSubject(map.name()), 458 clusterCommunicator.<MapEvent<K, V>>addSubscriber(mapUpdatesSubject(map.name()),
...@@ -434,6 +466,9 @@ public class DatabaseManager implements StorageService, StorageAdminService { ...@@ -434,6 +466,9 @@ public class DatabaseManager implements StorageService, StorageAdminService {
434 if (maps.remove(map.name()) != null) { 466 if (maps.remove(map.name()) != null) {
435 clusterCommunicator.removeSubscriber(mapUpdatesSubject(map.name())); 467 clusterCommunicator.removeSubscriber(mapUpdatesSubject(map.name()));
436 } 468 }
469 + if (map.applicationId() != null) {
470 + mapsByApplication.remove(map.applicationId(), map);
471 + }
437 } 472 }
438 473
439 protected <E> void registerQueue(DefaultDistributedQueue<E> queue) { 474 protected <E> void registerQueue(DefaultDistributedQueue<E> queue) {
...@@ -446,4 +481,18 @@ public class DatabaseManager implements StorageService, StorageAdminService { ...@@ -446,4 +481,18 @@ public class DatabaseManager implements StorageService, StorageAdminService {
446 protected static MessageSubject mapUpdatesSubject(String mapName) { 481 protected static MessageSubject mapUpdatesSubject(String mapName) {
447 return new MessageSubject(mapName + "-map-updates"); 482 return new MessageSubject(mapName + "-map-updates");
448 } 483 }
484 +
485 + private class InternalApplicationListener implements ApplicationListener {
486 + @Override
487 + public void event(ApplicationEvent event) {
488 + if (event.type() == APP_UNINSTALLED || event.type() == APP_DEACTIVATED) {
489 + ApplicationId appId = event.subject().id();
490 + List<DefaultAsyncConsistentMap> mapsToRemove = ImmutableList.copyOf(mapsByApplication.get(appId));
491 + mapsToRemove.forEach(DatabaseManager.this::unregisterMap);
492 + if (event.type() == APP_UNINSTALLED) {
493 + mapsToRemove.stream().filter(map -> map.purgeOnUninstall()).forEach(map -> map.clear());
494 + }
495 + }
496 + }
497 + }
449 } 498 }
...\ No newline at end of file ...\ No newline at end of file
......
...@@ -37,6 +37,7 @@ import java.util.Set; ...@@ -37,6 +37,7 @@ import java.util.Set;
37 import org.apache.commons.lang3.tuple.Pair; 37 import org.apache.commons.lang3.tuple.Pair;
38 import org.onlab.util.HexString; 38 import org.onlab.util.HexString;
39 import org.onlab.util.Tools; 39 import org.onlab.util.Tools;
40 +import org.onosproject.core.ApplicationId;
40 import org.onosproject.store.service.AsyncConsistentMap; 41 import org.onosproject.store.service.AsyncConsistentMap;
41 import org.onosproject.store.service.ConsistentMapException; 42 import org.onosproject.store.service.ConsistentMapException;
42 import org.onosproject.store.service.MapEvent; 43 import org.onosproject.store.service.MapEvent;
...@@ -59,9 +60,11 @@ import com.google.common.cache.LoadingCache; ...@@ -59,9 +60,11 @@ import com.google.common.cache.LoadingCache;
59 public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V> { 60 public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V> {
60 61
61 private final String name; 62 private final String name;
63 + private final ApplicationId applicationId;
62 private final Database database; 64 private final Database database;
63 private final Serializer serializer; 65 private final Serializer serializer;
64 private final boolean readOnly; 66 private final boolean readOnly;
67 + private final boolean purgeOnUninstall;
65 private final Consumer<MapEvent<K, V>> eventPublisher; 68 private final Consumer<MapEvent<K, V>> eventPublisher;
66 69
67 private final Set<MapEventListener<K, V>> listeners = new CopyOnWriteArraySet<>(); 70 private final Set<MapEventListener<K, V>> listeners = new CopyOnWriteArraySet<>();
...@@ -86,14 +89,18 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V> ...@@ -86,14 +89,18 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V>
86 } 89 }
87 90
88 public DefaultAsyncConsistentMap(String name, 91 public DefaultAsyncConsistentMap(String name,
92 + ApplicationId applicationId,
89 Database database, 93 Database database,
90 Serializer serializer, 94 Serializer serializer,
91 boolean readOnly, 95 boolean readOnly,
96 + boolean purgeOnUninstall,
92 Consumer<MapEvent<K, V>> eventPublisher) { 97 Consumer<MapEvent<K, V>> eventPublisher) {
93 this.name = checkNotNull(name, "map name cannot be null"); 98 this.name = checkNotNull(name, "map name cannot be null");
99 + this.applicationId = applicationId;
94 this.database = checkNotNull(database, "database cannot be null"); 100 this.database = checkNotNull(database, "database cannot be null");
95 this.serializer = checkNotNull(serializer, "serializer cannot be null"); 101 this.serializer = checkNotNull(serializer, "serializer cannot be null");
96 this.readOnly = readOnly; 102 this.readOnly = readOnly;
103 + this.purgeOnUninstall = purgeOnUninstall;
97 this.eventPublisher = eventPublisher; 104 this.eventPublisher = eventPublisher;
98 } 105 }
99 106
...@@ -113,6 +120,23 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V> ...@@ -113,6 +120,23 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V>
113 return serializer; 120 return serializer;
114 } 121 }
115 122
123 + /**
124 + * Returns the applicationId owning this map.
125 + * @return application Id
126 + */
127 + public ApplicationId applicationId() {
128 + return applicationId;
129 + }
130 +
131 + /**
132 + * Returns whether the map entries should be purged when the application
133 + * owning it is uninstalled.
134 + * @return true is map needs to cleared on app uninstall; false otherwise
135 + */
136 + public boolean purgeOnUninstall() {
137 + return purgeOnUninstall;
138 + }
139 +
116 @Override 140 @Override
117 public CompletableFuture<Integer> size() { 141 public CompletableFuture<Integer> size() {
118 return database.size(name); 142 return database.size(name);
......
...@@ -3,6 +3,7 @@ package org.onosproject.store.consistent.impl; ...@@ -3,6 +3,7 @@ package org.onosproject.store.consistent.impl;
3 import static com.google.common.base.Preconditions.checkArgument; 3 import static com.google.common.base.Preconditions.checkArgument;
4 import static com.google.common.base.Preconditions.checkState; 4 import static com.google.common.base.Preconditions.checkState;
5 5
6 +import org.onosproject.core.ApplicationId;
6 import org.onosproject.store.service.AsyncConsistentMap; 7 import org.onosproject.store.service.AsyncConsistentMap;
7 import org.onosproject.store.service.ConsistentMap; 8 import org.onosproject.store.service.ConsistentMap;
8 import org.onosproject.store.service.ConsistentMapBuilder; 9 import org.onosproject.store.service.ConsistentMapBuilder;
...@@ -19,6 +20,8 @@ public class DefaultConsistentMapBuilder<K, V> implements ConsistentMapBuilder<K ...@@ -19,6 +20,8 @@ public class DefaultConsistentMapBuilder<K, V> implements ConsistentMapBuilder<K
19 20
20 private Serializer serializer; 21 private Serializer serializer;
21 private String name; 22 private String name;
23 + private ApplicationId applicationId;
24 + private boolean purgeOnUninstall = false;
22 private boolean partitionsEnabled = true; 25 private boolean partitionsEnabled = true;
23 private boolean readOnly = false; 26 private boolean readOnly = false;
24 private final DatabaseManager manager; 27 private final DatabaseManager manager;
...@@ -35,6 +38,19 @@ public class DefaultConsistentMapBuilder<K, V> implements ConsistentMapBuilder<K ...@@ -35,6 +38,19 @@ public class DefaultConsistentMapBuilder<K, V> implements ConsistentMapBuilder<K
35 } 38 }
36 39
37 @Override 40 @Override
41 + public ConsistentMapBuilder<K, V> withApplicationId(ApplicationId id) {
42 + checkArgument(id != null);
43 + this.applicationId = id;
44 + return this;
45 + }
46 +
47 + @Override
48 + public ConsistentMapBuilder<K, V> withPurgeOnUninstall() {
49 + purgeOnUninstall = true;
50 + return this;
51 + }
52 +
53 + @Override
38 public ConsistentMapBuilder<K, V> withSerializer(Serializer serializer) { 54 public ConsistentMapBuilder<K, V> withSerializer(Serializer serializer) {
39 checkArgument(serializer != null); 55 checkArgument(serializer != null);
40 this.serializer = serializer; 56 this.serializer = serializer;
...@@ -53,8 +69,12 @@ public class DefaultConsistentMapBuilder<K, V> implements ConsistentMapBuilder<K ...@@ -53,8 +69,12 @@ public class DefaultConsistentMapBuilder<K, V> implements ConsistentMapBuilder<K
53 return this; 69 return this;
54 } 70 }
55 71
56 - private boolean validInputs() { 72 + private void validateInputs() {
57 - return name != null && serializer != null; 73 + checkState(name != null, "name must be specified");
74 + checkState(serializer != null, "serializer must be specified");
75 + if (purgeOnUninstall) {
76 + checkState(applicationId != null, "ApplicationId must be specified when purgeOnUninstall is enabled");
77 + }
58 } 78 }
59 79
60 @Override 80 @Override
...@@ -68,12 +88,14 @@ public class DefaultConsistentMapBuilder<K, V> implements ConsistentMapBuilder<K ...@@ -68,12 +88,14 @@ public class DefaultConsistentMapBuilder<K, V> implements ConsistentMapBuilder<K
68 } 88 }
69 89
70 private DefaultAsyncConsistentMap<K, V> buildAndRegisterMap() { 90 private DefaultAsyncConsistentMap<K, V> buildAndRegisterMap() {
71 - checkState(validInputs()); 91 + validateInputs();
72 DefaultAsyncConsistentMap<K, V> asyncMap = new DefaultAsyncConsistentMap<>( 92 DefaultAsyncConsistentMap<K, V> asyncMap = new DefaultAsyncConsistentMap<>(
73 name, 93 name,
94 + applicationId,
74 partitionsEnabled ? manager.partitionedDatabase : manager.inMemoryDatabase, 95 partitionsEnabled ? manager.partitionedDatabase : manager.inMemoryDatabase,
75 serializer, 96 serializer,
76 readOnly, 97 readOnly,
98 + purgeOnUninstall,
77 event -> manager.clusterCommunicator.<MapEvent<K, V>>broadcast(event, 99 event -> manager.clusterCommunicator.<MapEvent<K, V>>broadcast(event,
78 DatabaseManager.mapUpdatesSubject(name), 100 DatabaseManager.mapUpdatesSubject(name),
79 serializer::encode)); 101 serializer::encode));
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
15 */ 15 */
16 package org.onosproject.store.consistent.impl; 16 package org.onosproject.store.consistent.impl;
17 17
18 +import org.onosproject.core.ApplicationId;
18 import org.onosproject.store.service.ConsistentMapBuilder; 19 import org.onosproject.store.service.ConsistentMapBuilder;
19 import org.onosproject.store.service.DistributedSet; 20 import org.onosproject.store.service.DistributedSet;
20 import org.onosproject.store.service.Serializer; 21 import org.onosproject.store.service.Serializer;
...@@ -42,6 +43,18 @@ public class DefaultDistributedSetBuilder<E> implements DistributedSetBuilder<E> ...@@ -42,6 +43,18 @@ public class DefaultDistributedSetBuilder<E> implements DistributedSetBuilder<E>
42 } 43 }
43 44
44 @Override 45 @Override
46 + public DistributedSetBuilder<E> withApplicationId(ApplicationId id) {
47 + mapBuilder.withApplicationId(id);
48 + return this;
49 + }
50 +
51 + @Override
52 + public DistributedSetBuilder<E> withPurgeOnUninstall() {
53 + mapBuilder.withPurgeOnUninstall();
54 + return this;
55 + }
56 +
57 + @Override
45 public DistributedSetBuilder<E> withSerializer(Serializer serializer) { 58 public DistributedSetBuilder<E> withSerializer(Serializer serializer) {
46 mapBuilder.withSerializer(serializer); 59 mapBuilder.withSerializer(serializer);
47 return this; 60 return this;
......