Madan Jampani
Committed by Gerrit Code Review

ONOS-2133: Support for purging associated stores (ConsistentMap/DistributedSet) …

…when the application is uninstalled

Change-Id: I5bf7678f50ff3ed2792313383ff738c356bef69f
package org.onosproject.store.service;
import org.onosproject.core.ApplicationId;
/**
* Builder for consistent maps.
......@@ -24,6 +25,18 @@ public interface ConsistentMapBuilder<K, V> {
ConsistentMapBuilder<K, V> withName(String name);
/**
* Sets the owner applicationId for the map.
* <p>
* Note: If {@code purgeOnUninstall} option is enabled, applicationId
* must be specified.
* </p>
*
* @param id applicationId owning the consistent map
* @return this ConsistentMapBuilder
*/
ConsistentMapBuilder<K, V> withApplicationId(ApplicationId id);
/**
* Sets a serializer that can be used to serialize
* both the keys and values inserted into the map. The serializer
* builder should be pre-populated with any classes that will be
......@@ -65,6 +78,18 @@ public interface ConsistentMapBuilder<K, V> {
ConsistentMapBuilder<K, V> withUpdatesDisabled();
/**
* Purges map contents when the application owning the map is uninstalled.
* <p>
* When this option is enabled, the caller must provide a applicationId via
* the {@code withAppliationId} builder method.
* <p>
* By default map entries will NOT be purged when owning application is uninstalled.
*
* @return this ConsistentMapBuilder
*/
ConsistentMapBuilder<K, V> withPurgeOnUninstall();
/**
* Builds an consistent map based on the configuration options
* supplied to this builder.
*
......
......@@ -15,6 +15,8 @@
*/
package org.onosproject.store.service;
import org.onosproject.core.ApplicationId;
/**
* Builder for distributed set.
*
......@@ -37,6 +39,18 @@ public interface DistributedSetBuilder<E> {
DistributedSetBuilder<E> withName(String name);
/**
* Sets the owner applicationId for the set.
* <p>
* Note: If {@code purgeOnUninstall} option is enabled, applicationId
* must be specified.
* </p>
*
* @param id applicationId owning the set
* @return this DistributedSetBuilder
*/
DistributedSetBuilder<E> withApplicationId(ApplicationId id);
/**
* Sets a serializer that can be used to serialize
* the elements add to the set. The serializer
* builder should be pre-populated with any classes that will be
......@@ -78,6 +92,18 @@ public interface DistributedSetBuilder<E> {
DistributedSetBuilder<E> withPartitionsDisabled();
/**
* Purges set contents when the application owning the set is uninstalled.
* <p>
* When this option is enabled, the caller must provide a applicationId via
* the {@code withAppliationId} builder method.
* <p>
* By default set contents will NOT be purged when owning application is uninstalled.
*
* @return this DistributedSetBuilder
*/
DistributedSetBuilder<E> withPurgeOnUninstall();
/**
* Builds an set based on the configuration options
* supplied to this builder.
*
......
......@@ -17,7 +17,10 @@
package org.onosproject.store.consistent.impl;
import com.google.common.base.Charsets;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
......@@ -42,12 +45,17 @@ import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.ReferencePolicy;
import org.apache.felix.scr.annotations.Service;
import static org.onlab.util.Tools.groupedThreads;
import org.onosproject.app.ApplicationEvent;
import org.onosproject.app.ApplicationListener;
import org.onosproject.app.ApplicationService;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.IdGenerator;
import org.onosproject.store.cluster.impl.ClusterDefinitionManager;
import org.onosproject.store.cluster.impl.NodeInfo;
......@@ -84,6 +92,8 @@ import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import static org.slf4j.LoggerFactory.getLogger;
import static org.onosproject.app.ApplicationEvent.Type.APP_UNINSTALLED;
import static org.onosproject.app.ApplicationEvent.Type.APP_DEACTIVATED;
/**
* Database manager.
......@@ -113,13 +123,18 @@ public class DatabaseManager implements StorageService, StorageAdminService {
private ExecutorService eventDispatcher;
private ExecutorService queuePollExecutor;
private ApplicationListener appListener = new InternalApplicationListener();
private final Map<String, DefaultAsyncConsistentMap> maps = Maps.newConcurrentMap();
private final ListMultimap<ApplicationId, DefaultAsyncConsistentMap> mapsByApplication = ArrayListMultimap.create();
private final Map<String, DefaultDistributedQueue> queues = Maps.newConcurrentMap();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY, policy = ReferencePolicy.DYNAMIC)
protected ApplicationService applicationService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterCommunicationService clusterCommunicator;
......@@ -127,6 +142,16 @@ public class DatabaseManager implements StorageService, StorageAdminService {
return String.format("onos://%s:%d", node.getIp(), node.getTcpPort());
}
protected void bindApplicationService(ApplicationService service) {
applicationService = service;
applicationService.addListener(appListener);
}
protected void unbindApplicationService(ApplicationService service) {
applicationService.removeListener(appListener);
this.applicationService = null;
}
@Activate
public void activate() {
localNodeId = clusterService.getLocalNode().id();
......@@ -250,6 +275,9 @@ public class DatabaseManager implements StorageService, StorageAdminService {
});
clusterCommunicator.removeSubscriber(QUEUE_UPDATED_TOPIC);
maps.values().forEach(this::unregisterMap);
if (applicationService != null) {
applicationService.removeListener(appListener);
}
eventDispatcher.shutdown();
queuePollExecutor.shutdown();
log.info("Stopped");
......@@ -421,6 +449,10 @@ public class DatabaseManager implements StorageService, StorageAdminService {
// FIXME: We need to cleanly support different map instances with same name.
log.info("Map by name {} already exists", map.name());
return existing;
} else {
if (map.applicationId() != null) {
mapsByApplication.put(map.applicationId(), map);
}
}
clusterCommunicator.<MapEvent<K, V>>addSubscriber(mapUpdatesSubject(map.name()),
......@@ -434,6 +466,9 @@ public class DatabaseManager implements StorageService, StorageAdminService {
if (maps.remove(map.name()) != null) {
clusterCommunicator.removeSubscriber(mapUpdatesSubject(map.name()));
}
if (map.applicationId() != null) {
mapsByApplication.remove(map.applicationId(), map);
}
}
protected <E> void registerQueue(DefaultDistributedQueue<E> queue) {
......@@ -446,4 +481,18 @@ public class DatabaseManager implements StorageService, StorageAdminService {
protected static MessageSubject mapUpdatesSubject(String mapName) {
return new MessageSubject(mapName + "-map-updates");
}
private class InternalApplicationListener implements ApplicationListener {
@Override
public void event(ApplicationEvent event) {
if (event.type() == APP_UNINSTALLED || event.type() == APP_DEACTIVATED) {
ApplicationId appId = event.subject().id();
List<DefaultAsyncConsistentMap> mapsToRemove = ImmutableList.copyOf(mapsByApplication.get(appId));
mapsToRemove.forEach(DatabaseManager.this::unregisterMap);
if (event.type() == APP_UNINSTALLED) {
mapsToRemove.stream().filter(map -> map.purgeOnUninstall()).forEach(map -> map.clear());
}
}
}
}
}
\ No newline at end of file
......
......@@ -37,6 +37,7 @@ import java.util.Set;
import org.apache.commons.lang3.tuple.Pair;
import org.onlab.util.HexString;
import org.onlab.util.Tools;
import org.onosproject.core.ApplicationId;
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.ConsistentMapException;
import org.onosproject.store.service.MapEvent;
......@@ -59,9 +60,11 @@ import com.google.common.cache.LoadingCache;
public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V> {
private final String name;
private final ApplicationId applicationId;
private final Database database;
private final Serializer serializer;
private final boolean readOnly;
private final boolean purgeOnUninstall;
private final Consumer<MapEvent<K, V>> eventPublisher;
private final Set<MapEventListener<K, V>> listeners = new CopyOnWriteArraySet<>();
......@@ -86,14 +89,18 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V>
}
public DefaultAsyncConsistentMap(String name,
ApplicationId applicationId,
Database database,
Serializer serializer,
boolean readOnly,
boolean purgeOnUninstall,
Consumer<MapEvent<K, V>> eventPublisher) {
this.name = checkNotNull(name, "map name cannot be null");
this.applicationId = applicationId;
this.database = checkNotNull(database, "database cannot be null");
this.serializer = checkNotNull(serializer, "serializer cannot be null");
this.readOnly = readOnly;
this.purgeOnUninstall = purgeOnUninstall;
this.eventPublisher = eventPublisher;
}
......@@ -113,6 +120,23 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V>
return serializer;
}
/**
* Returns the applicationId owning this map.
* @return application Id
*/
public ApplicationId applicationId() {
return applicationId;
}
/**
* Returns whether the map entries should be purged when the application
* owning it is uninstalled.
* @return true is map needs to cleared on app uninstall; false otherwise
*/
public boolean purgeOnUninstall() {
return purgeOnUninstall;
}
@Override
public CompletableFuture<Integer> size() {
return database.size(name);
......
......@@ -3,6 +3,7 @@ package org.onosproject.store.consistent.impl;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import org.onosproject.core.ApplicationId;
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.ConsistentMapBuilder;
......@@ -19,6 +20,8 @@ public class DefaultConsistentMapBuilder<K, V> implements ConsistentMapBuilder<K
private Serializer serializer;
private String name;
private ApplicationId applicationId;
private boolean purgeOnUninstall = false;
private boolean partitionsEnabled = true;
private boolean readOnly = false;
private final DatabaseManager manager;
......@@ -35,6 +38,19 @@ public class DefaultConsistentMapBuilder<K, V> implements ConsistentMapBuilder<K
}
@Override
public ConsistentMapBuilder<K, V> withApplicationId(ApplicationId id) {
checkArgument(id != null);
this.applicationId = id;
return this;
}
@Override
public ConsistentMapBuilder<K, V> withPurgeOnUninstall() {
purgeOnUninstall = true;
return this;
}
@Override
public ConsistentMapBuilder<K, V> withSerializer(Serializer serializer) {
checkArgument(serializer != null);
this.serializer = serializer;
......@@ -53,8 +69,12 @@ public class DefaultConsistentMapBuilder<K, V> implements ConsistentMapBuilder<K
return this;
}
private boolean validInputs() {
return name != null && serializer != null;
private void validateInputs() {
checkState(name != null, "name must be specified");
checkState(serializer != null, "serializer must be specified");
if (purgeOnUninstall) {
checkState(applicationId != null, "ApplicationId must be specified when purgeOnUninstall is enabled");
}
}
@Override
......@@ -68,12 +88,14 @@ public class DefaultConsistentMapBuilder<K, V> implements ConsistentMapBuilder<K
}
private DefaultAsyncConsistentMap<K, V> buildAndRegisterMap() {
checkState(validInputs());
validateInputs();
DefaultAsyncConsistentMap<K, V> asyncMap = new DefaultAsyncConsistentMap<>(
name,
applicationId,
partitionsEnabled ? manager.partitionedDatabase : manager.inMemoryDatabase,
serializer,
readOnly,
purgeOnUninstall,
event -> manager.clusterCommunicator.<MapEvent<K, V>>broadcast(event,
DatabaseManager.mapUpdatesSubject(name),
serializer::encode));
......
......@@ -15,6 +15,7 @@
*/
package org.onosproject.store.consistent.impl;
import org.onosproject.core.ApplicationId;
import org.onosproject.store.service.ConsistentMapBuilder;
import org.onosproject.store.service.DistributedSet;
import org.onosproject.store.service.Serializer;
......@@ -42,6 +43,18 @@ public class DefaultDistributedSetBuilder<E> implements DistributedSetBuilder<E>
}
@Override
public DistributedSetBuilder<E> withApplicationId(ApplicationId id) {
mapBuilder.withApplicationId(id);
return this;
}
@Override
public DistributedSetBuilder<E> withPurgeOnUninstall() {
mapBuilder.withPurgeOnUninstall();
return this;
}
@Override
public DistributedSetBuilder<E> withSerializer(Serializer serializer) {
mapBuilder.withSerializer(serializer);
return this;
......