Madan Jampani

ApplicationStore to use Topic instead of AtomicValue for app activation notifications

Change-Id: I25cf6d1744969d0b0dfd0557ec1dd163ad3148d0
...@@ -48,15 +48,13 @@ import org.onosproject.security.Permission; ...@@ -48,15 +48,13 @@ import org.onosproject.security.Permission;
48 import org.onosproject.store.cluster.messaging.ClusterCommunicationService; 48 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
49 import org.onosproject.store.cluster.messaging.MessageSubject; 49 import org.onosproject.store.cluster.messaging.MessageSubject;
50 import org.onosproject.store.serializers.KryoNamespaces; 50 import org.onosproject.store.serializers.KryoNamespaces;
51 -import org.onosproject.store.service.AtomicValue;
52 -import org.onosproject.store.service.AtomicValueEvent;
53 -import org.onosproject.store.service.AtomicValueEventListener;
54 import org.onosproject.store.service.ConsistentMap; 51 import org.onosproject.store.service.ConsistentMap;
55 import org.onosproject.store.service.MapEvent; 52 import org.onosproject.store.service.MapEvent;
56 import org.onosproject.store.service.MapEventListener; 53 import org.onosproject.store.service.MapEventListener;
57 import org.onosproject.store.service.Serializer; 54 import org.onosproject.store.service.Serializer;
58 import org.onosproject.store.service.StorageException; 55 import org.onosproject.store.service.StorageException;
59 import org.onosproject.store.service.StorageService; 56 import org.onosproject.store.service.StorageService;
57 +import org.onosproject.store.service.Topic;
60 import org.onosproject.store.service.Versioned; 58 import org.onosproject.store.service.Versioned;
61 import org.onosproject.store.service.DistributedPrimitive.Status; 59 import org.onosproject.store.service.DistributedPrimitive.Status;
62 import org.slf4j.Logger; 60 import org.slf4j.Logger;
...@@ -121,7 +119,7 @@ public class DistributedApplicationStore extends ApplicationArchive ...@@ -121,7 +119,7 @@ public class DistributedApplicationStore extends ApplicationArchive
121 private ExecutorService messageHandlingExecutor; 119 private ExecutorService messageHandlingExecutor;
122 120
123 private ConsistentMap<ApplicationId, InternalApplicationHolder> apps; 121 private ConsistentMap<ApplicationId, InternalApplicationHolder> apps;
124 - private AtomicValue<Application> nextAppToActivate; 122 + private Topic<Application> appActivationTopic;
125 123
126 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) 124 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
127 protected ClusterCommunicationService clusterCommunicator; 125 protected ClusterCommunicationService clusterCommunicator;
...@@ -136,7 +134,7 @@ public class DistributedApplicationStore extends ApplicationArchive ...@@ -136,7 +134,7 @@ public class DistributedApplicationStore extends ApplicationArchive
136 protected ApplicationIdStore idStore; 134 protected ApplicationIdStore idStore;
137 135
138 private final InternalAppsListener appsListener = new InternalAppsListener(); 136 private final InternalAppsListener appsListener = new InternalAppsListener();
139 - private final NextAppToActivateValueListener nextAppToActivateListener = new NextAppToActivateValueListener(); 137 + private final Consumer<Application> appActivator = new AppActivator();
140 138
141 private Consumer<Status> statusChangeListener; 139 private Consumer<Status> statusChangeListener;
142 140
...@@ -172,13 +170,10 @@ public class DistributedApplicationStore extends ApplicationArchive ...@@ -172,13 +170,10 @@ public class DistributedApplicationStore extends ApplicationArchive
172 InternalState.class)) 170 InternalState.class))
173 .build(); 171 .build();
174 172
175 - nextAppToActivate = storageService.<Application>atomicValueBuilder() 173 + appActivationTopic = storageService.getTopic("onos-apps-activation-topic",
176 - .withName("onos-apps-activation-order-value") 174 + Serializer.using(KryoNamespaces.API));
177 - .withSerializer(Serializer.using(KryoNamespaces.API))
178 - .build()
179 - .asAtomicValue();
180 175
181 - nextAppToActivate.addListener(nextAppToActivateListener); 176 + appActivationTopic.subscribe(appActivator, messageHandlingExecutor);
182 177
183 executor = newSingleThreadScheduledExecutor(groupedThreads("onos/app", "store", log)); 178 executor = newSingleThreadScheduledExecutor(groupedThreads("onos/app", "store", log));
184 statusChangeListener = status -> { 179 statusChangeListener = status -> {
...@@ -260,7 +255,7 @@ public class DistributedApplicationStore extends ApplicationArchive ...@@ -260,7 +255,7 @@ public class DistributedApplicationStore extends ApplicationArchive
260 clusterCommunicator.removeSubscriber(APP_BITS_REQUEST); 255 clusterCommunicator.removeSubscriber(APP_BITS_REQUEST);
261 apps.removeStatusChangeListener(statusChangeListener); 256 apps.removeStatusChangeListener(statusChangeListener);
262 apps.removeListener(appsListener); 257 apps.removeListener(appsListener);
263 - nextAppToActivate.removeListener(nextAppToActivateListener); 258 + appActivationTopic.unsubscribe(appActivator);
264 messageHandlingExecutor.shutdown(); 259 messageHandlingExecutor.shutdown();
265 executor.shutdown(); 260 executor.shutdown();
266 log.info("Stopped"); 261 log.info("Stopped");
...@@ -362,7 +357,7 @@ public class DistributedApplicationStore extends ApplicationArchive ...@@ -362,7 +357,7 @@ public class DistributedApplicationStore extends ApplicationArchive
362 apps.computeIf(appId, v -> v != null && v.state() != ACTIVATED, 357 apps.computeIf(appId, v -> v != null && v.state() != ACTIVATED,
363 (k, v) -> new InternalApplicationHolder( 358 (k, v) -> new InternalApplicationHolder(
364 v.app(), ACTIVATED, v.permissions())); 359 v.app(), ACTIVATED, v.permissions()));
365 - nextAppToActivate.set(vAppHolder.value().app()); 360 + appActivationTopic.publish(vAppHolder.value().app());
366 } 361 }
367 } 362 }
368 363
...@@ -435,17 +430,13 @@ public class DistributedApplicationStore extends ApplicationArchive ...@@ -435,17 +430,13 @@ public class DistributedApplicationStore extends ApplicationArchive
435 } 430 }
436 } 431 }
437 432
438 - private class NextAppToActivateValueListener implements AtomicValueEventListener<Application> { 433 + private class AppActivator implements Consumer<Application> {
439 -
440 @Override 434 @Override
441 - public void event(AtomicValueEvent<Application> event) { 435 + public void accept(Application app) {
442 - messageHandlingExecutor.execute(() -> {
443 - Application app = event.newValue();
444 String appName = app.id().name(); 436 String appName = app.id().name();
445 installAppIfNeeded(app); 437 installAppIfNeeded(app);
446 setActive(appName); 438 setActive(appName);
447 delegate.notify(new ApplicationEvent(APP_ACTIVATED, app)); 439 delegate.notify(new ApplicationEvent(APP_ACTIVATED, app));
448 - });
449 } 440 }
450 } 441 }
451 442
......