Madan Jampani
Committed by Gerrit Code Review

New API for specifying an executor when registering a map listener

Change-Id: I1fc92e0a3da576d88d5ece4a666af8ad1c1fb9d8
......@@ -22,6 +22,7 @@ import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiFunction;
......@@ -178,8 +179,8 @@ public class DefaultConsistentMap<K, V> extends Synchronous<AsyncConsistentMap<K
}
@Override
public void addListener(MapEventListener<K, V> listener) {
complete(asyncMap.addListener(listener));
public void addListener(MapEventListener<K, V> listener, Executor executor) {
complete(asyncMap.addListener(listener, executor));
}
@Override
......
......@@ -17,6 +17,7 @@
package org.onosproject.store.primitives;
import com.google.common.base.Throwables;
import org.onosproject.store.service.ConsistentMapException;
import org.onosproject.store.service.AsyncConsistentTreeMap;
import org.onosproject.store.service.MapEventListener;
......@@ -30,6 +31,7 @@ import java.util.NavigableSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiFunction;
......@@ -258,8 +260,8 @@ public class DefaultConsistentTreeMap<K, V> extends Synchronous<AsyncConsistentT
}
@Override
public void addListener(MapEventListener<K, V> listener) {
complete(treeMap.addListener(listener));
public void addListener(MapEventListener<K, V> listener, Executor executor) {
complete(treeMap.addListener(listener, executor));
}
@Override
......
......@@ -21,6 +21,7 @@ import java.util.Objects;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
......@@ -28,6 +29,8 @@ import java.util.function.Predicate;
import org.onosproject.store.primitives.DefaultConsistentMap;
import org.onosproject.store.primitives.TransactionId;
import com.google.common.util.concurrent.MoreExecutors;
/**
* A distributed, strongly consistent map whose methods are all executed asynchronously.
* <p>
......@@ -308,7 +311,18 @@ public interface AsyncConsistentMap<K, V> extends DistributedPrimitive {
* @param listener listener to notify about map events
* @return future that will be completed when the operation finishes
*/
CompletableFuture<Void> addListener(MapEventListener<K, V> listener);
default CompletableFuture<Void> addListener(MapEventListener<K, V> listener) {
return addListener(listener, MoreExecutors.directExecutor());
}
/**
* Registers the specified listener to be notified whenever the map is updated.
*
* @param listener listener to notify about map events
* @param executor executor to use for handling incoming map events
* @return future that will be completed when the operation finishes
*/
CompletableFuture<Void> addListener(MapEventListener<K, V> listener, Executor executor);
/**
* Unregisters the specified listener such that it will no longer
......
......@@ -20,10 +20,13 @@ import java.util.Collection;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
import com.google.common.util.concurrent.MoreExecutors;
/**
* {@code ConsistentMap} provides the same functionality as {@link AsyncConsistentMap} with
* the only difference that all its methods block until the corresponding operation completes.
......@@ -270,7 +273,17 @@ public interface ConsistentMap<K, V> extends DistributedPrimitive {
*
* @param listener listener to notify about map events
*/
void addListener(MapEventListener<K, V> listener);
default void addListener(MapEventListener<K, V> listener) {
addListener(listener, MoreExecutors.directExecutor());
}
/**
* Registers the specified listener to be notified whenever the map is updated.
*
* @param listener listener to notify about map events
* @param executor executor to use for handling incoming map events
*/
void addListener(MapEventListener<K, V> listener, Executor executor);
/**
* Unregisters the specified listener such that it will no longer
......
......@@ -18,6 +18,7 @@ package org.onosproject.store.service;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
......@@ -149,7 +150,7 @@ public class ConsistentMapAdapter<K, V> implements ConsistentMap<K, V> {
}
@Override
public void addListener(MapEventListener<K, V> listener) {
public void addListener(MapEventListener<K, V> listener, Executor executor) {
}
......
......@@ -23,6 +23,7 @@ import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Predicate;
......@@ -33,6 +34,7 @@ import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.MapTransaction;
import org.onosproject.store.service.Versioned;
import com.google.common.base.MoreObjects;
/**
......@@ -153,8 +155,8 @@ public class DelegatingAsyncConsistentMap<K, V> implements AsyncConsistentMap<K,
}
@Override
public CompletableFuture<Void> addListener(MapEventListener<K, V> listener) {
return delegateMap.addListener(listener);
public CompletableFuture<Void> addListener(MapEventListener<K, V> listener, Executor executor) {
return delegateMap.addListener(listener, executor);
}
@Override
......
......@@ -24,6 +24,7 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
......@@ -39,6 +40,7 @@ import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.MapTransaction;
import org.onosproject.store.service.Versioned;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
......@@ -190,9 +192,9 @@ public class PartitionedAsyncConsistentMap<K, V> implements AsyncConsistentMap<K
}
@Override
public CompletableFuture<Void> addListener(MapEventListener<K, V> listener) {
public CompletableFuture<Void> addListener(MapEventListener<K, V> listener, Executor executor) {
return CompletableFuture.allOf(getMaps().stream()
.map(map -> map.addListener(listener))
.map(map -> map.addListener(listener, executor))
.toArray(CompletableFuture[]::new));
}
......
......@@ -21,6 +21,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
......@@ -34,6 +35,7 @@ import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.MapTransaction;
import org.onosproject.store.service.Versioned;
import com.google.common.collect.Maps;
/**
......@@ -235,11 +237,11 @@ public class TranscodingAsyncConsistentMap<K1, V1, K2, V2> implements AsyncConsi
}
@Override
public CompletableFuture<Void> addListener(MapEventListener<K1, V1> listener) {
public CompletableFuture<Void> addListener(MapEventListener<K1, V1> listener, Executor executor) {
synchronized (listeners) {
InternalBackingMapEventListener backingMapListener =
listeners.computeIfAbsent(listener, k -> new InternalBackingMapEventListener(listener));
return backingMap.addListener(backingMapListener);
return backingMap.addListener(backingMapListener, executor);
}
}
......
......@@ -22,10 +22,12 @@ import io.atomix.resource.ResourceTypeInfo;
import java.util.Collection;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Consumer;
......@@ -54,7 +56,9 @@ import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.MapTransaction;
import org.onosproject.store.service.Versioned;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
/**
......@@ -65,7 +69,7 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap>
implements AsyncConsistentMap<String, byte[]> {
private final Set<Consumer<Status>> statusChangeListeners = Sets.newCopyOnWriteArraySet();
private final Set<MapEventListener<String, byte[]>> mapEventListeners = Sets.newCopyOnWriteArraySet();
private final Map<MapEventListener<String, byte[]>, Executor> mapEventListeners = Maps.newIdentityHashMap();
public static final String CHANGE_SUBJECT = "changeEvents";
......@@ -87,7 +91,8 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap>
}
private void handleEvent(List<MapEvent<String, byte[]>> events) {
events.forEach(event -> mapEventListeners.forEach(listener -> listener.event(event)));
events.forEach(event ->
mapEventListeners.forEach((listener, executor) -> executor.execute(() -> listener.event(event))));
}
@Override
......@@ -250,18 +255,19 @@ public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap>
}
@Override
public synchronized CompletableFuture<Void> addListener(MapEventListener<String, byte[]> listener) {
public synchronized CompletableFuture<Void> addListener(MapEventListener<String, byte[]> listener,
Executor executor) {
if (mapEventListeners.isEmpty()) {
return submit(new Listen()).thenRun(() -> mapEventListeners.add(listener));
return submit(new Listen()).thenRun(() -> mapEventListeners.putIfAbsent(listener, executor));
} else {
mapEventListeners.add(listener);
mapEventListeners.put(listener, executor);
return CompletableFuture.completedFuture(null);
}
}
@Override
public synchronized CompletableFuture<Void> removeListener(MapEventListener<String, byte[]> listener) {
if (mapEventListeners.remove(listener) && mapEventListeners.isEmpty()) {
if (mapEventListeners.remove(listener) != null && mapEventListeners.isEmpty()) {
return submit(new Unlisten()).thenApply(v -> null);
}
return CompletableFuture.completedFuture(null);
......