Madan Jampani
Committed by Gerrit Code Review

Distributed topic primitive

Change-Id: Ia3ccd84c33075f297d7e6b9bc205efe92aec9bea
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
15 */ 15 */
16 package org.onosproject.vtnrsc.util; 16 package org.onosproject.vtnrsc.util;
17 17
18 +import org.onosproject.store.service.Topic;
18 import org.onosproject.store.service.WorkQueue; 19 import org.onosproject.store.service.WorkQueue;
19 import org.onosproject.store.service.EventuallyConsistentMapBuilder; 20 import org.onosproject.store.service.EventuallyConsistentMapBuilder;
20 import org.onosproject.store.service.ConsistentMapBuilder; 21 import org.onosproject.store.service.ConsistentMapBuilder;
...@@ -69,4 +70,9 @@ public class VtnStorageServiceAdapter implements StorageService { ...@@ -69,4 +70,9 @@ public class VtnStorageServiceAdapter implements StorageService {
69 public <E> WorkQueue<E> getWorkQueue(String name, Serializer serializer) { 70 public <E> WorkQueue<E> getWorkQueue(String name, Serializer serializer) {
70 return null; 71 return null;
71 } 72 }
73 +
74 + @Override
75 + public <T> Topic<T> getTopic(String name, Serializer serializer) {
76 + return null;
77 + }
72 } 78 }
......
...@@ -67,6 +67,11 @@ public interface DistributedPrimitive { ...@@ -67,6 +67,11 @@ public interface DistributedPrimitive {
67 WORK_QUEUE, 67 WORK_QUEUE,
68 68
69 /** 69 /**
70 + * Distributed topic.
71 + */
72 + TOPIC,
73 +
74 + /**
70 * Leader elector. 75 * Leader elector.
71 */ 76 */
72 LEADER_ELECTOR, 77 LEADER_ELECTOR,
......
...@@ -110,4 +110,15 @@ public interface StorageService { ...@@ -110,4 +110,15 @@ public interface StorageService {
110 * @return WorkQueue instance 110 * @return WorkQueue instance
111 */ 111 */
112 <E> WorkQueue<E> getWorkQueue(String name, Serializer serializer); 112 <E> WorkQueue<E> getWorkQueue(String name, Serializer serializer);
113 +
114 + /**
115 + * Returns an instance of {@code Topic} with specified name.
116 + *
117 + * @param <E> topic message type
118 + * @param name topic name
119 + * @param serializer serializer
120 + *
121 + * @return Topic instance
122 + */
123 + <T> Topic<T> getTopic(String name, Serializer serializer);
113 } 124 }
......
1 +/*
2 + * Copyright 2016-present Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +package org.onosproject.store.service;
17 +
18 +import java.util.concurrent.CompletableFuture;
19 +import java.util.function.Consumer;
20 +
21 +/**
22 + * A distributed publish subscribe primitive.
23 + * <p>
24 + * This primitive provides ordered message delivery guarantee i.e. all messages will be delivered to
25 + * all <i>active</i> subscribers and messages published from each publisher will be delivered
26 + * to all active subscribers in the order in which they are published.
27 + * <p>
28 + * Transient disruptions in communication such as occasional message drops are automatically handled
29 + * and recovered from without loss of delivery guarantees.
30 + * <p>
31 + * However, subscribers need to remain active or alive for these guarantees to apply. A subscriber that is
32 + * partitioned away for an extended duration (typically 5 seconds or more) will be marked as inactive and
33 + * during that period of inactivity will be removed from the list of current subscribers.
34 + *
35 + * @param <T> The type of message to be distributed to subscribers
36 + */
37 +public interface Topic<T> extends DistributedPrimitive {
38 +
39 + /**
40 + * Publishes a message to all subscribers.
41 + * <p>
42 + * The message is delivered in a asynchronous fashion which means subscribers will receive the
43 + * message eventually but not necessarily before the future returned by this method is completed.
44 + * @param message The non-null message to send to all current subscribers
45 + * @return a future that is completed when the message is logged (not necessarily delivered).
46 + */
47 + CompletableFuture<Void> publish(T message);
48 +
49 + /**
50 + * Subscribes to messages published to this topic.
51 + * @param callback callback that will invoked when a message published to the topic is received.
52 + * @return a future that is completed when subscription request is completed.
53 + */
54 + CompletableFuture<Void> subscribe(Consumer<T> callback);
55 +
56 + /**
57 + * Unsubscribes from this topic.
58 + * @param callback previously subscribed callback
59 + * @return a future that is completed when unsubscription request is completed.
60 + */
61 + CompletableFuture<Void> unsubscribe(Consumer<T> callback);
62 +}
...\ No newline at end of file ...\ No newline at end of file
...@@ -58,4 +58,9 @@ public class StorageServiceAdapter implements StorageService { ...@@ -58,4 +58,9 @@ public class StorageServiceAdapter implements StorageService {
58 public <E> WorkQueue<E> getWorkQueue(String name, Serializer serializer) { 58 public <E> WorkQueue<E> getWorkQueue(String name, Serializer serializer) {
59 return null; 59 return null;
60 } 60 }
61 +
62 + @Override
63 + public <T> Topic<T> getTopic(String name, Serializer serializer) {
64 + return null;
65 + }
61 } 66 }
......
1 +/*
2 + * Copyright 2016-present Open Networking Laboratory
3 + *
4 + * Licensed under the Apache License, Version 2.0 (the "License");
5 + * you may not use this file except in compliance with the License.
6 + * You may obtain a copy of the License at
7 + *
8 + * http://www.apache.org/licenses/LICENSE-2.0
9 + *
10 + * Unless required by applicable law or agreed to in writing, software
11 + * distributed under the License is distributed on an "AS IS" BASIS,
12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 + * See the License for the specific language governing permissions and
14 + * limitations under the License.
15 + */
16 +package org.onosproject.store.primitives.impl;
17 +
18 +import java.util.Map;
19 +import java.util.concurrent.CompletableFuture;
20 +import java.util.function.Consumer;
21 +
22 +import org.onosproject.store.service.AsyncAtomicValue;
23 +import org.onosproject.store.service.AtomicValueEventListener;
24 +import org.onosproject.store.service.DistributedPrimitive;
25 +import org.onosproject.store.service.Topic;
26 +
27 +import com.google.common.collect.Maps;
28 +
29 +/**
30 + * Default implementation of {@link Topic}.
31 + *
32 + * @param <T> topic message type.
33 + */
34 +public class DefaultDistributedTopic<T> implements Topic<T> {
35 +
36 + private final AsyncAtomicValue<T> atomicValue;
37 + private final Map<Consumer<T>, AtomicValueEventListener<T>> callbacks = Maps.newIdentityHashMap();
38 +
39 + DefaultDistributedTopic(AsyncAtomicValue<T> atomicValue) {
40 + this.atomicValue = atomicValue;
41 + }
42 +
43 + @Override
44 + public String name() {
45 + return atomicValue.name();
46 + }
47 +
48 + @Override
49 + public Type primitiveType() {
50 + return DistributedPrimitive.Type.TOPIC;
51 + }
52 +
53 + @Override
54 + public CompletableFuture<Void> destroy() {
55 + return atomicValue.destroy();
56 + }
57 +
58 + @Override
59 + public CompletableFuture<Void> publish(T message) {
60 + return atomicValue.set(message);
61 + }
62 +
63 + @Override
64 + public CompletableFuture<Void> subscribe(Consumer<T> callback) {
65 + AtomicValueEventListener<T> valueListener = event -> callback.accept(event.newValue());
66 + if (callbacks.putIfAbsent(callback, valueListener) == null) {
67 + return atomicValue.addListener(valueListener);
68 + }
69 + return CompletableFuture.completedFuture(null);
70 + }
71 +
72 + @Override
73 + public CompletableFuture<Void> unsubscribe(Consumer<T> callback) {
74 + AtomicValueEventListener<T> valueListener = callbacks.remove(callback);
75 + if (valueListener != null) {
76 + return atomicValue.removeListener(valueListener);
77 + }
78 + return CompletableFuture.completedFuture(null);
79 + }
80 +}
...@@ -41,6 +41,7 @@ import org.onosproject.store.primitives.PartitionAdminService; ...@@ -41,6 +41,7 @@ import org.onosproject.store.primitives.PartitionAdminService;
41 import org.onosproject.store.primitives.PartitionService; 41 import org.onosproject.store.primitives.PartitionService;
42 import org.onosproject.store.primitives.TransactionId; 42 import org.onosproject.store.primitives.TransactionId;
43 import org.onosproject.store.serializers.KryoNamespaces; 43 import org.onosproject.store.serializers.KryoNamespaces;
44 +import org.onosproject.store.service.AsyncAtomicValue;
44 import org.onosproject.store.service.AsyncConsistentMap; 45 import org.onosproject.store.service.AsyncConsistentMap;
45 import org.onosproject.store.service.AtomicCounterBuilder; 46 import org.onosproject.store.service.AtomicCounterBuilder;
46 import org.onosproject.store.service.AtomicValueBuilder; 47 import org.onosproject.store.service.AtomicValueBuilder;
...@@ -54,6 +55,7 @@ import org.onosproject.store.service.PartitionInfo; ...@@ -54,6 +55,7 @@ import org.onosproject.store.service.PartitionInfo;
54 import org.onosproject.store.service.Serializer; 55 import org.onosproject.store.service.Serializer;
55 import org.onosproject.store.service.StorageAdminService; 56 import org.onosproject.store.service.StorageAdminService;
56 import org.onosproject.store.service.StorageService; 57 import org.onosproject.store.service.StorageService;
58 +import org.onosproject.store.service.Topic;
57 import org.onosproject.store.service.TransactionContextBuilder; 59 import org.onosproject.store.service.TransactionContextBuilder;
58 import org.onosproject.store.service.WorkQueue; 60 import org.onosproject.store.service.WorkQueue;
59 import org.onosproject.store.service.WorkQueueStats; 61 import org.onosproject.store.service.WorkQueueStats;
...@@ -217,4 +219,13 @@ public class StorageManager implements StorageService, StorageAdminService { ...@@ -217,4 +219,13 @@ public class StorageManager implements StorageService, StorageAdminService {
217 return new MapInfo(name, map.size()); 219 return new MapInfo(name, map.size());
218 }).collect(Collectors.toList()); 220 }).collect(Collectors.toList());
219 } 221 }
222 +
223 + @Override
224 + public <T> Topic<T> getTopic(String name, Serializer serializer) {
225 + AsyncAtomicValue<T> atomicValue = this.<T>atomicValueBuilder()
226 + .withName("topic-" + name)
227 + .withSerializer(serializer)
228 + .build();
229 + return new DefaultDistributedTopic<>(atomicValue);
230 + }
220 } 231 }
......