Madan Jampani

Update TestTopic to confirm to the new Topic API

Change-Id: Ieade9340ac32d50f4e8304f3a347687a77ef49e3
...@@ -16,10 +16,11 @@ ...@@ -16,10 +16,11 @@
16 16
17 package org.onosproject.store.service; 17 package org.onosproject.store.service;
18 18
19 -import com.google.common.collect.Sets; 19 +import com.google.common.collect.Maps;
20 20
21 -import java.util.Set; 21 +import java.util.Map;
22 import java.util.concurrent.CompletableFuture; 22 import java.util.concurrent.CompletableFuture;
23 +import java.util.concurrent.Executor;
23 import java.util.function.Consumer; 24 import java.util.function.Consumer;
24 25
25 /** 26 /**
...@@ -27,7 +28,7 @@ import java.util.function.Consumer; ...@@ -27,7 +28,7 @@ import java.util.function.Consumer;
27 */ 28 */
28 public class TestTopic<T> implements Topic<T> { 29 public class TestTopic<T> implements Topic<T> {
29 private final String name; 30 private final String name;
30 - private final Set<Consumer<T>> callbacks = Sets.newConcurrentHashSet(); 31 + private final Map<Consumer<T>, Executor> callbacks = Maps.newIdentityHashMap();
31 32
32 public TestTopic(String name) { 33 public TestTopic(String name) {
33 this.name = name; 34 this.name = name;
...@@ -35,13 +36,17 @@ public class TestTopic<T> implements Topic<T> { ...@@ -35,13 +36,17 @@ public class TestTopic<T> implements Topic<T> {
35 36
36 @Override 37 @Override
37 public CompletableFuture<Void> publish(T message) { 38 public CompletableFuture<Void> publish(T message) {
38 - callbacks.forEach(c -> c.accept(message)); 39 + callbacks.forEach((k, v) -> {
40 + v.execute(() -> {
41 + k.accept(message);
42 + });
43 + });
39 return CompletableFuture.completedFuture(null); 44 return CompletableFuture.completedFuture(null);
40 } 45 }
41 46
42 @Override 47 @Override
43 - public CompletableFuture<Void> subscribe(Consumer<T> callback) { 48 + public CompletableFuture<Void> subscribe(Consumer<T> callback, Executor executor) {
44 - callbacks.add(callback); 49 + callbacks.put(callback, executor);
45 return CompletableFuture.completedFuture(null); 50 return CompletableFuture.completedFuture(null);
46 } 51 }
47 52
......