Madan Jampani
Committed by Gerrit Code Review

New ApplicationStore that uses a single ConsistentMap to track all app related state

Change-Id: Ieacc97f213add8ece8f462cd9971fb6ef3d0dde5
(cherry picked from commit 6c02d9e1)
......@@ -96,7 +96,7 @@ public class Versioned<V> {
* @param <U> value type of the returned instance
* @return mapped instance
*/
public <U> Versioned<U> map(Function<V, U> transformer) {
public synchronized <U> Versioned<U> map(Function<V, U> transformer) {
return new Versioned<>(transformer.apply(value), version, creationTime);
}
......
/*
* Copyright 2015-present Open Networking Laboratory
* Copyright 2016-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -16,18 +16,20 @@
package org.onosproject.store.app;
import com.google.common.base.Charsets;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.KryoNamespace;
import org.onosproject.app.ApplicationDescription;
import org.onosproject.app.ApplicationEvent;
import org.onosproject.app.ApplicationException;
......@@ -46,13 +48,14 @@ import org.onosproject.security.Permission;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.EventuallyConsistentMap;
import org.onosproject.store.service.EventuallyConsistentMapEvent;
import org.onosproject.store.service.EventuallyConsistentMapListener;
import org.onosproject.store.service.LogicalClockService;
import org.onosproject.store.service.MultiValuedTimestamp;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageException;
import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.Versioned;
import org.onosproject.store.service.DistributedPrimitive.Status;
import org.slf4j.Logger;
import java.io.ByteArrayInputStream;
......@@ -66,7 +69,10 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import static com.google.common.collect.Multimaps.newSetMultimap;
import static com.google.common.collect.Multimaps.synchronizedSetMultimap;
......@@ -76,18 +82,16 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.onlab.util.Tools.groupedThreads;
import static org.onlab.util.Tools.randomDelay;
import static org.onosproject.app.ApplicationEvent.Type.*;
import static org.onosproject.store.app.GossipApplicationStore.InternalState.*;
import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;
import static org.onosproject.store.app.DistributedApplicationStore.InternalState.*;
import static org.slf4j.LoggerFactory.getLogger;
/**
* Manages inventory of applications in a distributed data store that uses
* optimistic replication and gossip based anti-entropy techniques.
* Manages inventory of applications in a distributed data store providing
* stronger consistency guarantees.
*/
@Component(immediate = true)
@Component(immediate = true, enabled = true)
@Service
public class GossipApplicationStore extends ApplicationArchive
public class DistributedApplicationStore extends ApplicationArchive
implements ApplicationStore {
private final Logger log = getLogger(getClass());
......@@ -110,9 +114,7 @@ public class GossipApplicationStore extends ApplicationArchive
private ScheduledExecutorService executor;
private ExecutorService messageHandlingExecutor;
private EventuallyConsistentMap<ApplicationId, Application> apps;
private EventuallyConsistentMap<Application, InternalState> states;
private EventuallyConsistentMap<Application, Set<Permission>> permissions;
private ConsistentMap<ApplicationId, InternalApplicationHolder> apps;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterCommunicationService clusterCommunicator;
......@@ -124,11 +126,12 @@ public class GossipApplicationStore extends ApplicationArchive
protected StorageService storageService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected LogicalClockService clockService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ApplicationIdStore idStore;
private final InternalAppsListener appsListener = new InternalAppsListener();
private Consumer<Status> statusChangeListener;
// Multimap to track which apps are required by others apps
// app -> { required-by, ... }
// Apps explicitly activated will be required by the CORE app
......@@ -139,16 +142,8 @@ public class GossipApplicationStore extends ApplicationArchive
@Activate
public void activate() {
KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
.register(KryoNamespaces.API)
.register(MultiValuedTimestamp.class)
.register(InternalState.class);
executor = newSingleThreadScheduledExecutor(groupedThreads("onos/app", "store", log));
messageHandlingExecutor = Executors.newSingleThreadExecutor(
groupedThreads("onos/store/app", "message-handler", log));
clusterCommunicator.<String, byte[]>addSubscriber(APP_BITS_REQUEST,
bytes -> new String(bytes, Charsets.UTF_8),
name -> {
......@@ -161,33 +156,36 @@ public class GossipApplicationStore extends ApplicationArchive
Function.identity(),
messageHandlingExecutor);
// FIXME: Consider consolidating into a single map.
apps = storageService.<ApplicationId, Application>eventuallyConsistentMapBuilder()
.withName("apps")
.withSerializer(serializer)
.withTimestampProvider((k, v) -> clockService.getTimestamp())
.build();
states = storageService.<Application, InternalState>eventuallyConsistentMapBuilder()
.withName("app-states")
.withSerializer(serializer)
.withTimestampProvider((k, v) -> clockService.getTimestamp())
.build();
states.addListener(new InternalAppStatesListener());
permissions = storageService.<Application, Set<Permission>>eventuallyConsistentMapBuilder()
.withName("app-permissions")
.withSerializer(serializer)
.withTimestampProvider((k, v) -> clockService.getTimestamp())
apps = storageService.<ApplicationId, InternalApplicationHolder>consistentMapBuilder()
.withName("onos-apps")
.withRelaxedReadConsistency()
.withSerializer(Serializer.using(KryoNamespaces.API,
InternalApplicationHolder.class,
InternalState.class))
.build();
executor = newSingleThreadScheduledExecutor(groupedThreads("onos/app", "store", log));
statusChangeListener = status -> {
if (status == Status.ACTIVE) {
executor.execute(this::bootstrapExistingApplications);
}
};
apps.addListener(appsListener, messageHandlingExecutor);
apps.addStatusChangeListener(statusChangeListener);
coreAppId = getId(CoreService.CORE_APP_NAME);
log.info("Started");
}
/**
* Processes existing applications from the distributed map. This is done to
* account for events that this instance may be have missed due to a staggered start.
*/
void bootstrapExistingApplications() {
apps.asJavaMap().forEach((appId, holder) -> setupApplicationAndNotify(appId, holder.app(), holder.state()));
}
/**
* Loads the application inventory from the disk and activates apps if
* they are marked to be active.
*/
......@@ -244,23 +242,27 @@ public class GossipApplicationStore extends ApplicationArchive
@Deactivate
public void deactivate() {
clusterCommunicator.removeSubscriber(APP_BITS_REQUEST);
apps.removeStatusChangeListener(statusChangeListener);
apps.removeListener(appsListener);
messageHandlingExecutor.shutdown();
executor.shutdown();
apps.destroy();
states.destroy();
permissions.destroy();
log.info("Stopped");
}
@Override
public void setDelegate(ApplicationStoreDelegate delegate) {
super.setDelegate(delegate);
executor.execute(this::bootstrapExistingApplications);
executor.schedule(() -> loadFromDisk(), APP_LOAD_DELAY_MS, TimeUnit.MILLISECONDS);
}
@Override
public Set<Application> getApplications() {
return ImmutableSet.copyOf(apps.values());
return ImmutableSet.copyOf(apps.values()
.stream()
.map(Versioned::value)
.map(InternalApplicationHolder::app)
.collect(Collectors.toSet()));
}
@Override
......@@ -270,15 +272,15 @@ public class GossipApplicationStore extends ApplicationArchive
@Override
public Application getApplication(ApplicationId appId) {
return apps.get(appId);
InternalApplicationHolder appHolder = Versioned.valueOrNull(apps.get(appId));
return appHolder != null ? appHolder.app() : null;
}
@Override
public ApplicationState getState(ApplicationId appId) {
Application app = apps.get(appId);
InternalState s = app == null ? null : states.get(app);
return s == null ? null : s == ACTIVATED ?
ApplicationState.ACTIVE : ApplicationState.INSTALLED;
InternalApplicationHolder appHolder = Versioned.valueOrNull(apps.get(appId));
InternalState state = appHolder != null ? appHolder.state() : null;
return state == null ? null : state == ACTIVATED ? ApplicationState.ACTIVE : ApplicationState.INSTALLED;
}
@Override
......@@ -300,26 +302,21 @@ public class GossipApplicationStore extends ApplicationArchive
if (updateTime) {
updateTime(app.id().name());
}
apps.put(app.id(), app);
states.put(app, INSTALLED);
return app;
InternalApplicationHolder previousApp =
Versioned.valueOrNull(apps.putIfAbsent(app.id(), new InternalApplicationHolder(app, INSTALLED, null)));
return previousApp != null ? previousApp.app() : app;
}
@Override
public void remove(ApplicationId appId) {
Application app = apps.get(appId);
if (app != null) {
uninstallDependentApps(app);
apps.remove(appId);
states.remove(app);
permissions.remove(app);
}
uninstallDependentApps(appId);
apps.remove(appId);
}
// Uninstalls all apps that depend on the given app.
private void uninstallDependentApps(Application app) {
private void uninstallDependentApps(ApplicationId appId) {
getApplications().stream()
.filter(a -> a.requiredApps().contains(app.id().name()))
.filter(a -> a.requiredApps().contains(appId.name()))
.forEach(a -> remove(a.id()));
}
......@@ -335,13 +332,18 @@ public class GossipApplicationStore extends ApplicationArchive
private void activate(ApplicationId appId, boolean updateTime) {
Application app = apps.get(appId);
if (app != null) {
AtomicBoolean stateChanged = new AtomicBoolean(false);
InternalApplicationHolder appHolder = Versioned.valueOrNull(apps.computeIf(appId,
v -> v != null && v.state() != ACTIVATED,
(k, v) -> {
stateChanged.set(true);
return new InternalApplicationHolder(v.app(), ACTIVATED, v.permissions());
}));
if (stateChanged.get()) {
if (updateTime) {
updateTime(appId.name());
}
activateRequiredApps(app);
states.put(app, ACTIVATED);
activateRequiredApps(appHolder.app());
}
}
......@@ -352,90 +354,108 @@ public class GossipApplicationStore extends ApplicationArchive
@Override
public void deactivate(ApplicationId appId) {
deactivateDependentApps(getApplication(appId));
deactivateDependentApps(appId);
deactivate(appId, coreAppId);
}
private void deactivate(ApplicationId appId, ApplicationId forAppId) {
requiredBy.remove(appId, forAppId);
if (requiredBy.get(appId).isEmpty()) {
Application app = apps.get(appId);
if (app != null) {
AtomicBoolean stateChanged = new AtomicBoolean(false);
apps.computeIf(appId,
v -> v != null && v.state() != DEACTIVATED,
(k, v) -> {
stateChanged.set(true);
return new InternalApplicationHolder(v.app(), DEACTIVATED, v.permissions());
});
if (stateChanged.get()) {
updateTime(appId.name());
states.put(app, DEACTIVATED);
deactivateRequiredApps(app);
deactivateRequiredApps(appId);
}
}
}
// Deactivates all apps that require this application.
private void deactivateDependentApps(Application app) {
getApplications().stream()
.filter(a -> states.get(a) == ACTIVATED)
.filter(a -> a.requiredApps().contains(app.id().name()))
.forEach(a -> deactivate(a.id()));
private void deactivateDependentApps(ApplicationId appId) {
apps.values()
.stream()
.map(Versioned::value)
.filter(a -> a.state() == ACTIVATED)
.filter(a -> a.app().requiredApps().contains(appId.name()))
.forEach(a -> deactivate(a.app().id()));
}
// Deactivates all apps required by this application.
private void deactivateRequiredApps(Application app) {
app.requiredApps().stream().map(this::getId).map(this::getApplication)
.filter(a -> states.get(a) == ACTIVATED)
.forEach(a -> deactivate(a.id(), app.id()));
private void deactivateRequiredApps(ApplicationId appId) {
getApplication(appId).requiredApps()
.stream()
.map(this::getId)
.map(apps::get)
.map(Versioned::value)
.filter(a -> a.state() == ACTIVATED)
.forEach(a -> deactivate(a.app().id(), appId));
}
@Override
public Set<Permission> getPermissions(ApplicationId appId) {
Application app = apps.get(appId);
return app != null ? permissions.get(app) : null;
InternalApplicationHolder app = Versioned.valueOrNull(apps.get(appId));
return app != null ? ImmutableSet.copyOf(app.permissions()) : ImmutableSet.of();
}
@Override
public void setPermissions(ApplicationId appId, Set<Permission> permissions) {
Application app = getApplication(appId);
if (app != null) {
this.permissions.put(app, permissions);
delegate.notify(new ApplicationEvent(APP_PERMISSIONS_CHANGED, app));
AtomicBoolean permissionsChanged = new AtomicBoolean(false);
Versioned<InternalApplicationHolder> appHolder = apps.computeIf(appId,
v -> v != null && !Sets.symmetricDifference(v.permissions(), permissions).isEmpty(),
(k, v) -> {
permissionsChanged.set(true);
return new InternalApplicationHolder(v.app(), v.state(), ImmutableSet.copyOf(permissions));
});
if (permissionsChanged.get()) {
delegate.notify(new ApplicationEvent(APP_PERMISSIONS_CHANGED, appHolder.value().app()));
}
}
/**
* Listener to application state distributed map changes.
*/
private final class InternalAppStatesListener
implements EventuallyConsistentMapListener<Application, InternalState> {
private final class InternalAppsListener
implements MapEventListener<ApplicationId, InternalApplicationHolder> {
@Override
public void event(EventuallyConsistentMapEvent<Application, InternalState> event) {
// If we do not have a delegate, refuse to process any events entirely.
// This is to allow the anti-entropy to kick in and process the events
// perhaps a bit later, but with opportunity to notify delegate.
public void event(MapEvent<ApplicationId, InternalApplicationHolder> event) {
if (delegate == null) {
return;
}
Application app = event.key();
InternalState state = event.value();
if (event.type() == PUT) {
if (state == INSTALLED) {
fetchBitsIfNeeded(app);
delegate.notify(new ApplicationEvent(APP_INSTALLED, app));
} else if (state == ACTIVATED) {
installAppIfNeeded(app);
setActive(app.id().name());
delegate.notify(new ApplicationEvent(APP_ACTIVATED, app));
} else if (state == DEACTIVATED) {
clearActive(app.id().name());
delegate.notify(new ApplicationEvent(APP_DEACTIVATED, app));
ApplicationId appId = event.key();
InternalApplicationHolder newApp = event.newValue() == null ? null : event.newValue().value();
InternalApplicationHolder oldApp = event.oldValue() == null ? null : event.oldValue().value();
if (event.type() == MapEvent.Type.INSERT || event.type() == MapEvent.Type.UPDATE) {
if (event.type() == MapEvent.Type.UPDATE && newApp.state() == oldApp.state()) {
return;
}
} else if (event.type() == REMOVE) {
delegate.notify(new ApplicationEvent(APP_UNINSTALLED, app));
purgeApplication(app.id().name());
setupApplicationAndNotify(appId, newApp.app(), newApp.state());
} else if (event.type() == MapEvent.Type.REMOVE) {
delegate.notify(new ApplicationEvent(APP_UNINSTALLED, oldApp.app()));
purgeApplication(appId.name());
}
}
}
private void setupApplicationAndNotify(ApplicationId appId, Application app, InternalState state) {
if (state == INSTALLED) {
fetchBitsIfNeeded(app);
delegate.notify(new ApplicationEvent(APP_INSTALLED, app));
} else if (state == ACTIVATED) {
installAppIfNeeded(app);
setActive(appId.name());
delegate.notify(new ApplicationEvent(APP_ACTIVATED, app));
} else if (state == DEACTIVATED) {
clearActive(appId.name());
delegate.notify(new ApplicationEvent(APP_DEACTIVATED, app));
}
}
/**
* Determines if the application bits are available locally.
*/
......@@ -512,19 +532,6 @@ public class GossipApplicationStore extends ApplicationArchive
}
/**
* Prunes applications which are not in the map, but are on disk.
*/
private void pruneUninstalledApps() {
for (String name : getApplicationNames()) {
if (getApplication(getId(name)) == null) {
Application app = registerApp(getApplicationDescription(name));
delegate.notify(new ApplicationEvent(APP_UNINSTALLED, app));
purgeApplication(app.id().name());
}
}
}
/**
* Produces a registered application from the supplied description.
*/
private Application registerApp(ApplicationDescription appDesc) {
......@@ -544,4 +551,46 @@ public class GossipApplicationStore extends ApplicationArchive
appDesc.features(),
appDesc.requiredApps());
}
/**
* Internal class for holding app information.
*/
private static class InternalApplicationHolder {
private final Application app;
private final InternalState state;
private final Set<Permission> permissions;
@SuppressWarnings("unused")
private InternalApplicationHolder() {
app = null;
state = null;
permissions = null;
}
public InternalApplicationHolder(Application app, InternalState state, Set<Permission> permissions) {
this.app = Preconditions.checkNotNull(app);
this.state = state;
this.permissions = permissions == null ? null : ImmutableSet.copyOf(permissions);
}
public Application app() {
return app;
}
public InternalState state() {
return state;
}
public Set<Permission> permissions() {
return permissions;
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("app", app.id())
.add("state", state)
.toString();
}
}
}
......
......@@ -15,6 +15,6 @@
*/
/**
* Implementation of distributed applications store.
* Implementation of distributed application store.
*/
package org.onosproject.store.app;
......
......@@ -38,7 +38,6 @@ import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.Versioned;
import org.slf4j.Logger;
......@@ -109,8 +108,14 @@ public class DistributedApplicationIdStore implements ApplicationIdStore {
@Override
public ApplicationId registerApplication(String name) {
return Versioned.valueOrNull(registeredIds.computeIfAbsent(name,
key -> new DefaultApplicationId((int) appIdCounter.incrementAndGet(), name)));
ApplicationId exisitingAppId = registeredIds.asJavaMap().get(name);
if (exisitingAppId == null) {
ApplicationId newAppId = new DefaultApplicationId((int) appIdCounter.incrementAndGet(), name);
exisitingAppId = registeredIds.asJavaMap().putIfAbsent(name, newAppId);
return exisitingAppId == null ? newAppId : exisitingAppId;
} else {
return exisitingAppId;
}
}
private void primeIdToAppIdCache() {
......
......@@ -54,7 +54,6 @@ import org.onosproject.net.DefaultAnnotations;
import org.onosproject.net.SparseAnnotations;
import org.onosproject.net.provider.ProviderId;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.app.GossipApplicationStore.InternalState;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.EventuallyConsistentMap;
......@@ -121,8 +120,7 @@ public class DistributedTunnelStore
public void activate() {
KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
.register(KryoNamespaces.API)
.register(MultiValuedTimestamp.class)
.register(InternalState.class);
.register(MultiValuedTimestamp.class);
tunnelIdAsKeyStore = storageService
.<TunnelId, Tunnel>eventuallyConsistentMapBuilder()
.withName("all_tunnel").withSerializer(serializer)
......