Madan Jampani
Committed by Gerrit Code Review

ONOS-2440: Simplify DistributedQueue implementation by leveraging state change notification support

Change-Id: Id0a48f07535d8b7e1d0f964bd1c0623ca81d4605
......@@ -16,7 +16,6 @@
package org.onosproject.store.consistent.impl;
import com.google.common.base.Charsets;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
......@@ -49,8 +48,6 @@ import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.ReferencePolicy;
import org.apache.felix.scr.annotations.Service;
import static org.onlab.util.Tools.groupedThreads;
import org.onosproject.app.ApplicationEvent;
import org.onosproject.app.ApplicationListener;
import org.onosproject.app.ApplicationService;
......@@ -61,7 +58,6 @@ import org.onosproject.core.IdGenerator;
import org.onosproject.store.cluster.impl.ClusterDefinitionManager;
import org.onosproject.store.cluster.impl.NodeInfo;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.ecmap.EventuallyConsistentMapBuilderImpl;
import org.onosproject.store.service.AtomicCounterBuilder;
import org.onosproject.store.service.AtomicValueBuilder;
......@@ -86,7 +82,6 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
......@@ -112,8 +107,6 @@ public class DatabaseManager implements StorageService, StorageAdminService {
private static final int RAFT_ELECTION_TIMEOUT_MILLIS = 3000;
private static final int DATABASE_OPERATION_TIMEOUT_MILLIS = 5000;
protected static final MessageSubject QUEUE_UPDATED_TOPIC = new MessageSubject("distributed-queue-updated");
private ClusterCoordinator coordinator;
protected PartitionedDatabase partitionedDatabase;
protected Database inMemoryDatabase;
......@@ -122,15 +115,12 @@ public class DatabaseManager implements StorageService, StorageAdminService {
private TransactionManager transactionManager;
private final IdGenerator transactionIdGenerator = () -> RandomUtils.nextLong();
private ExecutorService eventDispatcher;
private ExecutorService queuePollExecutor;
private ApplicationListener appListener = new InternalApplicationListener();
private final Multimap<String, DefaultAsyncConsistentMap> maps =
Multimaps.synchronizedMultimap(ArrayListMultimap.create());
private final Multimap<ApplicationId, DefaultAsyncConsistentMap> mapsByApplication =
Multimaps.synchronizedMultimap(ArrayListMultimap.create());
private final Map<String, DefaultDistributedQueue> queues = Maps.newConcurrentMap();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
......@@ -237,21 +227,6 @@ public class DatabaseManager implements StorageService, StorageAdminService {
transactionManager = new TransactionManager(partitionedDatabase, consistentMapBuilder());
partitionedDatabase.setTransactionManager(transactionManager);
eventDispatcher = Executors.newSingleThreadExecutor(
groupedThreads("onos/store/manager", "map-event-dispatcher"));
queuePollExecutor = Executors.newFixedThreadPool(4,
groupedThreads("onos/store/manager", "queue-poll-handler"));
clusterCommunicator.<String>addSubscriber(QUEUE_UPDATED_TOPIC,
data -> new String(data, Charsets.UTF_8),
name -> {
DefaultDistributedQueue q = queues.get(name);
if (q != null) {
q.tryPoll();
}
},
queuePollExecutor);
log.info("Started");
}
......@@ -277,13 +252,10 @@ public class DatabaseManager implements StorageService, StorageAdminService {
log.info("Successfully closed databases.");
}
});
clusterCommunicator.removeSubscriber(QUEUE_UPDATED_TOPIC);
maps.values().forEach(this::unregisterMap);
if (applicationService != null) {
applicationService.removeListener(appListener);
}
eventDispatcher.shutdown();
queuePollExecutor.shutdown();
log.info("Stopped");
}
......@@ -467,13 +439,6 @@ public class DatabaseManager implements StorageService, StorageAdminService {
}
}
protected <E> void registerQueue(DefaultDistributedQueue<E> queue) {
// TODO: Support multiple local instances of the same queue.
if (queues.putIfAbsent(queue.name(), queue) != null) {
throw new IllegalStateException("Queue by name " + queue.name() + " already exists");
}
}
private class InternalApplicationListener implements ApplicationListener {
@Override
public void event(ApplicationEvent event) {
......
......@@ -21,7 +21,6 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.onosproject.cluster.NodeId;
import org.onosproject.store.service.Transaction;
import org.onosproject.store.service.Versioned;
......@@ -168,17 +167,16 @@ public interface DatabaseProxy<K, V> {
* Inserts an entry into the queue.
* @param queueName queue name
* @param entry queue entry
* @return set of nodes to notify about the queue update
* @return void future
*/
CompletableFuture<Set<NodeId>> queuePush(String queueName, byte[] entry);
CompletableFuture<Void> queuePush(String queueName, byte[] entry);
/**
* Removes an entry from the queue if the queue is non-empty.
* @param queueName queue name
* @param nodeId If the queue is empty the identifier of node to notify when an entry becomes available
* @return entry. Can be null if queue is empty
* @return entry future. Can be completed with null if queue is empty
*/
CompletableFuture<byte[]> queuePop(String queueName, NodeId nodeId);
CompletableFuture<byte[]> queuePop(String queueName);
/**
* Returns but does not remove an entry from the queue.
......
......@@ -21,7 +21,6 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.onosproject.cluster.NodeId;
import org.onosproject.store.service.Transaction;
import org.onosproject.store.service.Versioned;
......@@ -93,10 +92,10 @@ public interface DatabaseState<K, V> {
byte[] queuePeek(String queueName);
@Command
byte[] queuePop(String queueName, NodeId requestor);
byte[] queuePop(String queueName);
@Command
Set<NodeId> queuePush(String queueName, byte[] entry);
void queuePush(String queueName, byte[] entry);
@Query
Long counterGet(String counterName);
......
......@@ -47,7 +47,7 @@ import java.util.function.Predicate;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.onosproject.store.consistent.impl.StateMachineUpdate.Target.MAP;
import static org.onosproject.store.consistent.impl.StateMachineUpdate.Target.MAP_UPDATE;
import static org.onosproject.store.consistent.impl.StateMachineUpdate.Target.TX_COMMIT;
import static org.slf4j.LoggerFactory.getLogger;
......@@ -122,7 +122,7 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V
this.purgeOnUninstall = purgeOnUninstall;
this.database.registerConsumer(update -> {
SharedExecutors.getSingleThreadExecutor().execute(() -> {
if (update.target() == MAP) {
if (update.target() == MAP_UPDATE) {
Result<UpdateResult<String, byte[]>> result = update.output();
if (result.success() && result.value().mapName().equals(name)) {
MapEvent<K, V> mapEvent = result.value().<K, V>map(this::dK, serializer::decode).toMapEvent();
......
......@@ -30,7 +30,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.onosproject.cluster.NodeId;
import org.onosproject.store.service.Transaction;
import org.onosproject.store.service.Versioned;
......@@ -159,13 +158,13 @@ public class DefaultDatabase extends AbstractResource<Database> implements Datab
}
@Override
public CompletableFuture<Set<NodeId>> queuePush(String queueName, byte[] entry) {
public CompletableFuture<Void> queuePush(String queueName, byte[] entry) {
return checkOpen(() -> proxy.queuePush(queueName, entry));
}
@Override
public CompletableFuture<byte[]> queuePop(String queueName, NodeId nodeId) {
return checkOpen(() -> proxy.queuePop(queueName, nodeId));
public CompletableFuture<byte[]> queuePop(String queueName) {
return checkOpen(() -> proxy.queuePop(queueName));
}
@Override
......
......@@ -18,7 +18,6 @@ package org.onosproject.store.consistent.impl;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Map;
import java.util.Map.Entry;
......@@ -27,7 +26,6 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.Set;
import org.onosproject.cluster.NodeId;
import org.onosproject.store.service.DatabaseUpdate;
import org.onosproject.store.service.Transaction;
import org.onosproject.store.service.Versioned;
......@@ -48,7 +46,6 @@ public class DefaultDatabaseState implements DatabaseState<String, byte[]> {
private Map<String, AtomicLong> counters;
private Map<String, Map<String, Versioned<byte[]>>> maps;
private Map<String, Queue<byte[]>> queues;
private Map<String, Set<NodeId>> queueUpdateNotificationTargets;
/**
* This locks map has a structure similar to the "tables" map above and
......@@ -85,11 +82,6 @@ public class DefaultDatabaseState implements DatabaseState<String, byte[]> {
queues = Maps.newConcurrentMap();
context.put("queues", queues);
}
queueUpdateNotificationTargets = context.get("queueUpdateNotificationTargets");
if (queueUpdateNotificationTargets == null) {
queueUpdateNotificationTargets = Maps.newConcurrentMap();
context.put("queueUpdateNotificationTargets", queueUpdateNotificationTargets);
}
nextVersion = context.get("nextVersion");
if (nextVersion == null) {
nextVersion = new Long(0);
......@@ -214,27 +206,17 @@ public class DefaultDatabaseState implements DatabaseState<String, byte[]> {
@Override
public byte[] queuePeek(String queueName) {
Queue<byte[]> queue = getQueue(queueName);
return queue.peek();
return getQueue(queueName).peek();
}
@Override
public byte[] queuePop(String queueName, NodeId requestor) {
Queue<byte[]> queue = getQueue(queueName);
if (queue.size() == 0 && requestor != null) {
getQueueUpdateNotificationTargets(queueName).add(requestor);
return null;
} else {
return queue.remove();
}
public byte[] queuePop(String queueName) {
return getQueue(queueName).poll();
}
@Override
public Set<NodeId> queuePush(String queueName, byte[] entry) {
getQueue(queueName).add(entry);
Set<NodeId> notifyList = ImmutableSet.copyOf(getQueueUpdateNotificationTargets(queueName));
getQueueUpdateNotificationTargets(queueName).clear();
return notifyList;
public void queuePush(String queueName, byte[] entry) {
getQueue(queueName).offer(entry);
}
@Override
......@@ -289,10 +271,6 @@ public class DefaultDatabaseState implements DatabaseState<String, byte[]> {
return queues.computeIfAbsent(queueName, name -> new LinkedList<>());
}
private Set<NodeId> getQueueUpdateNotificationTargets(String queueName) {
return queueUpdateNotificationTargets.computeIfAbsent(queueName, name -> new HashSet<>());
}
private boolean isUpdatePossible(DatabaseUpdate update) {
Versioned<byte[]> existingEntry = mapGet(update.mapName(), update.key());
switch (update.type()) {
......
......@@ -17,15 +17,16 @@ package org.onosproject.store.consistent.impl;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Futures;
import org.onosproject.cluster.NodeId;
import org.onlab.util.SharedExecutors;
import org.onosproject.store.service.DistributedQueue;
import org.onosproject.store.service.Serializer;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.onosproject.store.consistent.impl.StateMachineUpdate.Target.QUEUE_PUSH;
/**
* DistributedQueue implementation that provides FIFO ordering semantics.
......@@ -37,9 +38,7 @@ public class DefaultDistributedQueue<E> implements DistributedQueue<E> {
private final String name;
private final Database database;
private final Serializer serializer;
private final NodeId localNodeId;
private final Set<CompletableFuture<E>> pendingFutures = Sets.newIdentityHashSet();
private final Consumer<Set<NodeId>> notifyConsumers;
private static final String PRIMITIVE_NAME = "distributedQueue";
private static final String SIZE = "size";
......@@ -53,66 +52,59 @@ public class DefaultDistributedQueue<E> implements DistributedQueue<E> {
public DefaultDistributedQueue(String name,
Database database,
Serializer serializer,
NodeId localNodeId,
boolean meteringEnabled,
Consumer<Set<NodeId>> notifyConsumers) {
boolean meteringEnabled) {
this.name = checkNotNull(name, "queue name cannot be null");
this.database = checkNotNull(database, "database cannot be null");
this.serializer = checkNotNull(serializer, "serializer cannot be null");
this.localNodeId = localNodeId;
this.notifyConsumers = notifyConsumers;
this.monitor = new MeteringAgent(PRIMITIVE_NAME, name, meteringEnabled);
this.database.registerConsumer(update -> {
SharedExecutors.getSingleThreadExecutor().execute(() -> {
if (update.target() == QUEUE_PUSH) {
List<Object> input = update.input();
String queueName = (String) input.get(0);
if (queueName.equals(name)) {
tryPoll();
}
}
});
});
}
@Override
public long size() {
final MeteringAgent.Context timer = monitor.startTimer(SIZE);
try {
return Futures.getUnchecked(database.queueSize(name));
} finally {
timer.stop();
}
return Futures.getUnchecked(database.queueSize(name).whenComplete((r, e) -> timer.stop()));
}
@Override
public void push(E entry) {
final MeteringAgent.Context timer = monitor.startTimer(PUSH);
try {
checkNotNull(entry, ERROR_NULL_ENTRY);
final MeteringAgent.Context timer = monitor.startTimer(PUSH);
Futures.getUnchecked(database.queuePush(name, serializer.encode(entry))
.thenAccept(notifyConsumers)
.thenApply(v -> null));
} finally {
timer.stop();
}
.whenComplete((r, e) -> timer.stop()));
}
@Override
public CompletableFuture<E> pop() {
final MeteringAgent.Context timer = monitor.startTimer(POP);
return database.queuePop(name, localNodeId)
return database.queuePop(name)
.whenComplete((r, e) -> timer.stop())
.thenCompose(v -> {
if (v != null) {
return CompletableFuture.completedFuture(serializer.decode(v));
} else {
}
CompletableFuture<E> newPendingFuture = new CompletableFuture<>();
pendingFutures.add(newPendingFuture);
return newPendingFuture;
}
})
.whenComplete((r, e) -> timer.stop());
});
}
@Override
public E peek() {
final MeteringAgent.Context timer = monitor.startTimer(PEEK);
try {
return Futures.getUnchecked(database.queuePeek(name)
.thenApply(v -> v != null ? serializer.decode(v) : null));
} finally {
timer.stop();
}
.thenApply(v -> v != null ? serializer.<E>decode(v) : null)
.whenComplete((r, e) -> timer.stop()));
}
public String name() {
......@@ -122,7 +114,7 @@ public class DefaultDistributedQueue<E> implements DistributedQueue<E> {
protected void tryPoll() {
Set<CompletableFuture<E>> completedFutures = Sets.newHashSet();
for (CompletableFuture<E> future : pendingFutures) {
E entry = Futures.getUnchecked(database.queuePop(name, localNodeId)
E entry = Futures.getUnchecked(database.queuePop(name)
.thenApply(v -> v != null ? serializer.decode(v) : null));
if (entry != null) {
future.complete(entry);
......
......@@ -15,15 +15,10 @@
*/
package org.onosproject.store.consistent.impl;
import com.google.common.base.Charsets;
import org.onosproject.cluster.NodeId;
import org.onosproject.store.service.DistributedQueue;
import org.onosproject.store.service.DistributedQueueBuilder;
import org.onosproject.store.service.Serializer;
import java.util.Set;
import java.util.function.Consumer;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
......@@ -40,8 +35,7 @@ public class DefaultDistributedQueueBuilder<E> implements DistributedQueueBuilde
private final DatabaseManager databaseManager;
private boolean metering = true;
public DefaultDistributedQueueBuilder(
DatabaseManager databaseManager) {
public DefaultDistributedQueueBuilder(DatabaseManager databaseManager) {
this.databaseManager = databaseManager;
}
......@@ -78,18 +72,10 @@ public class DefaultDistributedQueueBuilder<E> implements DistributedQueueBuilde
@Override
public DistributedQueue<E> build() {
checkState(validInputs());
Consumer<Set<NodeId>> notifyOthers = nodes -> databaseManager.clusterCommunicator.multicast(name,
DatabaseManager.QUEUE_UPDATED_TOPIC,
s -> s.getBytes(Charsets.UTF_8),
nodes);
DefaultDistributedQueue<E> queue = new DefaultDistributedQueue<>(
return new DefaultDistributedQueue<>(
name,
persistenceEnabled ? databaseManager.partitionedDatabase : databaseManager.inMemoryDatabase,
serializer,
databaseManager.localNodeId,
metering,
notifyOthers);
databaseManager.registerQueue(queue);
return queue;
metering);
}
}
......
......@@ -28,7 +28,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.onosproject.cluster.NodeId;
import org.onosproject.store.service.DatabaseUpdate;
import org.onosproject.store.service.Transaction;
import org.onosproject.store.service.Versioned;
......@@ -229,15 +228,15 @@ public class PartitionedDatabase implements Database {
}
@Override
public CompletableFuture<Set<NodeId>> queuePush(String queueName, byte[] entry) {
public CompletableFuture<Void> queuePush(String queueName, byte[] entry) {
checkState(isOpen.get(), DB_NOT_OPEN);
return partitioner.getPartition(queueName, queueName).queuePush(queueName, entry);
}
@Override
public CompletableFuture<byte[]> queuePop(String queueName, NodeId nodeId) {
public CompletableFuture<byte[]> queuePop(String queueName) {
checkState(isOpen.get(), DB_NOT_OPEN);
return partitioner.getPartition(queueName, queueName).queuePop(queueName, nodeId);
return partitioner.getPartition(queueName, queueName).queuePop(queueName);
}
@Override
......
......@@ -29,7 +29,7 @@ public class StateMachineUpdate {
/**
* Update is for a map.
*/
MAP,
MAP_UPDATE,
/**
* Update is a transaction commit.
......@@ -37,7 +37,12 @@ public class StateMachineUpdate {
TX_COMMIT,
/**
* Update is for a non-map data structure.
* Update is a queue push.
*/
QUEUE_PUSH,
/**
* Update is for some other operation.
*/
OTHER
}
......@@ -55,9 +60,11 @@ public class StateMachineUpdate {
public Target target() {
// FIXME: This check is brittle
if (operationName.contains("mapUpdate")) {
return Target.MAP;
return Target.MAP_UPDATE;
} else if (operationName.contains("commit") || operationName.contains("prepareAndCommit")) {
return Target.TX_COMMIT;
} else if (operationName.contains("queuePush")) {
return Target.QUEUE_PUSH;
} else {
return Target.OTHER;
}
......