Thomas Vachuska
Committed by Gerrit Code Review

Added a temporary fix for out-of-order app activation event delivery.

Cleaning up app subsystem code a tiny bit as well.

Change-Id: I5df7d4c6d62d122653331474fb079648e779d595
......@@ -80,8 +80,6 @@ public class ApplicationManager
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected FeaturesService featuresService;
private boolean initializing;
// Application supplied hooks for pre-activation processing.
private final Multimap<String, Runnable> deactivateHooks = HashMultimap.create();
private final Cache<ApplicationId, CountDownLatch> pendingOperations =
......@@ -92,11 +90,7 @@ public class ApplicationManager
@Activate
public void activate() {
eventDispatcher.addSink(ApplicationEvent.class, listenerRegistry);
initializing = true;
store.setDelegate(delegate);
initializing = false;
log.info("Started");
}
......
......@@ -30,6 +30,7 @@ import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.Tools;
import org.onosproject.app.ApplicationDescription;
import org.onosproject.app.ApplicationEvent;
import org.onosproject.app.ApplicationException;
......@@ -89,11 +90,14 @@ import static org.slf4j.LoggerFactory.getLogger;
* Manages inventory of applications in a distributed data store providing
* stronger consistency guarantees.
*/
@Component(immediate = true, enabled = true)
@Component(immediate = true)
@Service
public class DistributedApplicationStore extends ApplicationArchive
implements ApplicationStore {
// FIXME: eliminate the need for this
private static final int FIXME_ACTIVATION_DELAY = 500;
private final Logger log = getLogger(getClass());
private static final MessageSubject APP_BITS_REQUEST = new MessageSubject("app-bits-request");
......@@ -144,24 +148,24 @@ public class DistributedApplicationStore extends ApplicationArchive
public void activate() {
messageHandlingExecutor = Executors.newSingleThreadExecutor(
groupedThreads("onos/store/app", "message-handler", log));
clusterCommunicator.<String, byte[]>addSubscriber(APP_BITS_REQUEST,
bytes -> new String(bytes, Charsets.UTF_8),
name -> {
try {
return toByteArray(getApplicationInputStream(name));
} catch (IOException e) {
throw new StorageException(e);
}
},
Function.identity(),
messageHandlingExecutor);
clusterCommunicator.addSubscriber(APP_BITS_REQUEST,
bytes -> new String(bytes, Charsets.UTF_8),
name -> {
try {
return toByteArray(getApplicationInputStream(name));
} catch (IOException e) {
throw new StorageException(e);
}
},
Function.identity(),
messageHandlingExecutor);
apps = storageService.<ApplicationId, InternalApplicationHolder>consistentMapBuilder()
.withName("onos-apps")
.withRelaxedReadConsistency()
.withSerializer(Serializer.using(KryoNamespaces.API,
InternalApplicationHolder.class,
InternalState.class))
InternalApplicationHolder.class,
InternalState.class))
.build();
executor = newSingleThreadScheduledExecutor(groupedThreads("onos/app", "store", log));
......@@ -180,7 +184,7 @@ public class DistributedApplicationStore extends ApplicationArchive
* Processes existing applications from the distributed map. This is done to
* account for events that this instance may be have missed due to a staggered start.
*/
void bootstrapExistingApplications() {
private void bootstrapExistingApplications() {
apps.asJavaMap().forEach((appId, holder) -> setupApplicationAndNotify(appId, holder.app(), holder.state()));
}
......@@ -253,7 +257,7 @@ public class DistributedApplicationStore extends ApplicationArchive
public void setDelegate(ApplicationStoreDelegate delegate) {
super.setDelegate(delegate);
executor.execute(this::bootstrapExistingApplications);
executor.schedule(() -> loadFromDisk(), APP_LOAD_DELAY_MS, TimeUnit.MILLISECONDS);
executor.schedule((Runnable) this::loadFromDisk, APP_LOAD_DELAY_MS, TimeUnit.MILLISECONDS);
}
@Override
......@@ -296,7 +300,7 @@ public class DistributedApplicationStore extends ApplicationArchive
}
private boolean hasPrerequisites(ApplicationDescription app) {
return !app.requiredApps().stream().map(n -> getId(n))
return !app.requiredApps().stream().map(this::getId)
.anyMatch(id -> id == null || getApplication(id) == null);
}
......@@ -342,6 +346,11 @@ public class DistributedApplicationStore extends ApplicationArchive
}
activateRequiredApps(vAppHolder.value().app());
// FIXME: Take a breath before the post-order operation to allow required app
// activation events to fully propagate. There appears to be an out-of-order
// event delivery issue that needs to be fixed.
Tools.delay(FIXME_ACTIVATION_DELAY);
apps.computeIf(appId, v -> v != null && v.state() != ACTIVATED,
(k, v) -> new InternalApplicationHolder(
v.app(), ACTIVATED, v.permissions()));
......@@ -539,25 +548,25 @@ public class DistributedApplicationStore extends ApplicationArchive
private Application registerApp(ApplicationDescription appDesc) {
ApplicationId appId = idStore.registerApplication(appDesc.name());
return new DefaultApplication(appId,
appDesc.version(),
appDesc.title(),
appDesc.description(),
appDesc.origin(),
appDesc.category(),
appDesc.url(),
appDesc.readme(),
appDesc.icon(),
appDesc.role(),
appDesc.permissions(),
appDesc.featuresRepo(),
appDesc.features(),
appDesc.requiredApps());
appDesc.version(),
appDesc.title(),
appDesc.description(),
appDesc.origin(),
appDesc.category(),
appDesc.url(),
appDesc.readme(),
appDesc.icon(),
appDesc.role(),
appDesc.permissions(),
appDesc.featuresRepo(),
appDesc.features(),
appDesc.requiredApps());
}
/**
* Internal class for holding app information.
*/
private static class InternalApplicationHolder {
private static final class InternalApplicationHolder {
private final Application app;
private final InternalState state;
private final Set<Permission> permissions;
......@@ -569,7 +578,7 @@ public class DistributedApplicationStore extends ApplicationArchive
permissions = null;
}
public InternalApplicationHolder(Application app, InternalState state, Set<Permission> permissions) {
private InternalApplicationHolder(Application app, InternalState state, Set<Permission> permissions) {
this.app = Preconditions.checkNotNull(app);
this.state = state;
this.permissions = permissions == null ? null : ImmutableSet.copyOf(permissions);
......