Thomas Vachuska
Committed by Gerrit Code Review

Adding persistence to the gossip application store.

Change-Id: Ib1382f9d1009297dde902f0d3e0daf27596587c5
......@@ -121,6 +121,17 @@ public class ApplicationArchive
}
/**
* Returns the timestamp in millis since start of epoch, of when the
* specified application was last modified or changed state.
*
* @param appName application name
* @return number of millis since start of epoch
*/
public long getUpdateTime(String appName) {
return appFile(appName, APP_XML).lastModified();
}
/**
* Loads the application descriptor from the specified application archive
* stream and saves the stream in the appropriate application archive
* directory.
......@@ -313,7 +324,7 @@ public class ApplicationArchive
*/
protected boolean setActive(String appName) {
try {
return appFile(appName, "active").createNewFile();
return appFile(appName, "active").createNewFile() && updateTime(appName);
} catch (IOException e) {
throw new ApplicationException("Unable to mark app as active", e);
}
......@@ -326,7 +337,17 @@ public class ApplicationArchive
* @return true if file was deleted
*/
protected boolean clearActive(String appName) {
return appFile(appName, "active").delete();
return appFile(appName, "active").delete() && updateTime(appName);
}
/**
* Updates the time-stamp of the app descriptor file.
*
* @param appName application name
* @return true if the app descriptor was updated
*/
private boolean updateTime(String appName) {
return appFile(appName, APP_XML).setLastModified(System.currentTimeMillis());
}
/**
......
......@@ -18,7 +18,6 @@ package org.onosproject.store.app;
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
......@@ -31,6 +30,7 @@ import org.onosproject.app.ApplicationEvent;
import org.onosproject.app.ApplicationException;
import org.onosproject.app.ApplicationState;
import org.onosproject.app.ApplicationStore;
import org.onosproject.app.ApplicationStoreDelegate;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.common.app.ApplicationArchive;
......@@ -47,6 +47,8 @@ import org.onosproject.store.ecmap.EventuallyConsistentMap;
import org.onosproject.store.ecmap.EventuallyConsistentMapEvent;
import org.onosproject.store.ecmap.EventuallyConsistentMapImpl;
import org.onosproject.store.ecmap.EventuallyConsistentMapListener;
import org.onosproject.store.impl.ClockService;
import org.onosproject.store.impl.MultiValuedTimestamp;
import org.onosproject.store.impl.WallclockClockManager;
import org.onosproject.store.serializers.KryoNamespaces;
import org.slf4j.Logger;
......@@ -59,6 +61,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import static com.google.common.io.ByteStreams.toByteArray;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
......@@ -105,10 +108,13 @@ public class GossipApplicationStore extends ApplicationArchive
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ApplicationIdStore idStore;
private final AtomicLong sequence = new AtomicLong();
@Activate
public void activate() {
KryoNamespace.Builder intentSerializer = KryoNamespace.newBuilder()
KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
.register(KryoNamespaces.API)
.register(MultiValuedTimestamp.class)
.register(InternalState.class);
executor = Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/app", "store"));
......@@ -118,35 +124,48 @@ public class GossipApplicationStore extends ApplicationArchive
clusterCommunicator.addSubscriber(APP_BITS_REQUEST, new InternalBitServer(), messageHandlingExecutor);
// FIXME: Consider consolidating into a single map.
ClockService<ApplicationId, Application> appsClockService = (appId, app) ->
new MultiValuedTimestamp<>(getUpdateTime(appId.name()),
sequence.incrementAndGet());
apps = new EventuallyConsistentMapImpl<>("apps", clusterService,
clusterCommunicator,
intentSerializer,
new WallclockClockManager<>());
serializer,
appsClockService);
ClockService<Application, InternalState> statesClockService = (app, state) ->
new MultiValuedTimestamp<>(getUpdateTime(app.id().name()),
sequence.incrementAndGet());
states = new EventuallyConsistentMapImpl<>("app-states",
clusterService,
clusterCommunicator,
intentSerializer,
new WallclockClockManager<>());
serializer,
statesClockService);
states.addListener(new InternalAppStatesListener());
permissions = new EventuallyConsistentMapImpl<>("app-permissions",
clusterService,
clusterCommunicator,
intentSerializer,
serializer,
new WallclockClockManager<>());
// FIXME: figure out load from disk; this will require resolving the dual authority problem
executor.schedule(this::pruneUninstalledApps, LOAD_TIMEOUT_MS, MILLISECONDS);
log.info("Started");
}
/**
* Loads the application inventory from the disk and activates apps if
* they are marked to be active.
*/
private void loadFromDisk() {
for (String name : getApplicationNames()) {
create(getApplicationDescription(name));
// load app permissions
Application app = create(getApplicationDescription(name));
if (app != null && isActive(app.id().name())) {
activate(app.id());
// load app permissions
}
}
}
......@@ -162,6 +181,13 @@ public class GossipApplicationStore extends ApplicationArchive
}
@Override
public void setDelegate(ApplicationStoreDelegate delegate) {
super.setDelegate(delegate);
loadFromDisk();
// executor.schedule(this::pruneUninstalledApps, LOAD_TIMEOUT_MS, MILLISECONDS);
}
@Override
public Set<Application> getApplications() {
return ImmutableSet.copyOf(apps.values());
}
......@@ -403,5 +429,6 @@ public class GossipApplicationStore extends ApplicationArchive
appDesc.origin(), appDesc.permissions(),
appDesc.featuresRepo(), appDesc.features());
}
}
......