Madan Jampani
Committed by Brian O'Connor

Support for a distributed queue primitive.

Change-Id: I13abb93ec1703105ff0137e137738483a5b6a143
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.service;
import java.util.concurrent.CompletableFuture;
/**
* A distributed collection designed for holding elements prior to processing.
* A queue provides insertion, extraction and inspection operations. The extraction operation
* is designed to be non-blocking.
*
* @param <E> queue entry type
*/
public interface DistributedQueue<E> {
/**
* Returns total number of entries in the queue.
* @return queue size
*/
long size();
/**
* Returns true if queue has elements in it.
* @return true is queue has elements, false otherwise
*/
default boolean isEmpty() {
return size() == 0;
}
/**
* Inserts an entry into the queue.
* @param entry entry to insert
*/
void push(E entry);
/**
* If the queue is non-empty, an entry will be removed from the queue and the returned future
* will be immediately completed with it. If queue is empty when this call is made, the returned
* future will be eventually completed when an entry is added to the queue.
* @return queue entry
*/
CompletableFuture<E> pop();
/**
* Returns an entry from the queue without removing it. If the queue is empty returns null.
* @return queue entry or null if queue is empty
*/
E peek();
}
\ No newline at end of file
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.service;
/**
* Builder for distributed queue.
*
* @param <E> type queue elements.
*/
public interface DistributedQueueBuilder<E> {
/**
* Sets the name of the queue.
* <p>
* Each queue is identified by a unique name.
* </p>
* <p>
* Note: This is a mandatory parameter.
* </p>
*
* @param name name of the queue
* @return this DistributedQueueBuilder for method chaining
*/
DistributedQueueBuilder<E> withName(String name);
/**
* Sets a serializer that can be used to serialize
* the elements pushed into the queue. The serializer
* builder should be pre-populated with any classes that will be
* put into the queue.
* <p>
* Note: This is a mandatory parameter.
* </p>
*
* @param serializer serializer
* @return this DistributedQueueBuilder for method chaining
*/
DistributedQueueBuilder<E> withSerializer(Serializer serializer);
/**
* Disables persistence of queues entries.
* <p>
* When persistence is disabled, a full cluster restart will wipe out all
* queue entries.
* </p>
* @return this DistributedQueueBuilder for method chaining
*/
DistributedQueueBuilder<E> withPersistenceDisabled();
/**
* Builds a queue based on the configuration options
* supplied to this builder.
*
* @return new distributed queue
* @throws java.lang.RuntimeException if a mandatory parameter is missing
*/
DistributedQueue<E> build();
}
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.service;
import java.util.Set;
......
......@@ -56,6 +56,14 @@ public interface StorageService {
<E> DistributedSetBuilder<E> setBuilder();
/**
* Creates a new distributed queue builder.
*
* @param <E> queue entry type
* @return builder for an distributed queue
*/
<E> DistributedQueueBuilder<E> queueBuilder();
/**
* Creates a new AtomicCounterBuilder.
*
* @return atomic counter builder
......
......@@ -16,6 +16,7 @@
package org.onosproject.store.consistent.impl;
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
......@@ -46,6 +47,7 @@ import org.apache.felix.scr.annotations.Service;
import static org.onlab.util.Tools.groupedThreads;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
import org.onosproject.core.IdGenerator;
import org.onosproject.store.cluster.impl.ClusterDefinitionManager;
import org.onosproject.store.cluster.impl.NodeInfo;
......@@ -55,6 +57,7 @@ import org.onosproject.store.ecmap.EventuallyConsistentMapBuilderImpl;
import org.onosproject.store.service.AtomicCounterBuilder;
import org.onosproject.store.service.ConsistentMapBuilder;
import org.onosproject.store.service.ConsistentMapException;
import org.onosproject.store.service.DistributedQueueBuilder;
import org.onosproject.store.service.EventuallyConsistentMapBuilder;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapInfo;
......@@ -98,16 +101,21 @@ public class DatabaseManager implements StorageService, StorageAdminService {
private static final int RAFT_ELECTION_TIMEOUT_MILLIS = 3000;
private static final int DATABASE_OPERATION_TIMEOUT_MILLIS = 5000;
protected static final MessageSubject QUEUE_UPDATED_TOPIC = new MessageSubject("distributed-queue-updated");
private ClusterCoordinator coordinator;
protected PartitionedDatabase partitionedDatabase;
protected Database inMemoryDatabase;
protected NodeId localNodeId;
private TransactionManager transactionManager;
private final IdGenerator transactionIdGenerator = () -> RandomUtils.nextLong();
private ExecutorService eventDispatcher;
private ExecutorService queuePollExecutor;
private final Set<DefaultAsyncConsistentMap> maps = Sets.newCopyOnWriteArraySet();
private final Map<String, DefaultAsyncConsistentMap> maps = Maps.newConcurrentMap();
private final Map<String, DefaultDistributedQueue> queues = Maps.newConcurrentMap();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
......@@ -121,6 +129,7 @@ public class DatabaseManager implements StorageService, StorageAdminService {
@Activate
public void activate() {
localNodeId = clusterService.getLocalNode().id();
// load database configuration
File databaseDefFile = new File(PARTITION_DEFINITION_FILE);
log.info("Loading database definition: {}", databaseDefFile.getAbsolutePath());
......@@ -201,6 +210,19 @@ public class DatabaseManager implements StorageService, StorageAdminService {
eventDispatcher = Executors.newSingleThreadExecutor(
groupedThreads("onos/store/manager", "map-event-dispatcher"));
queuePollExecutor = Executors.newFixedThreadPool(4,
groupedThreads("onos/store/manager", "queue-poll-handler"));
clusterCommunicator.<String>addSubscriber(QUEUE_UPDATED_TOPIC,
data -> new String(data, Charsets.UTF_8),
name -> {
DefaultDistributedQueue q = queues.get(name);
if (q != null) {
q.tryPoll();
}
},
queuePollExecutor);
log.info("Started");
}
......@@ -226,8 +248,10 @@ public class DatabaseManager implements StorageService, StorageAdminService {
log.info("Successfully closed databases.");
}
});
maps.forEach(map -> clusterCommunicator.removeSubscriber(mapUpdatesSubject(map.name())));
clusterCommunicator.removeSubscriber(QUEUE_UPDATED_TOPIC);
maps.values().forEach(this::unregisterMap);
eventDispatcher.shutdown();
queuePollExecutor.shutdown();
log.info("Stopped");
}
......@@ -318,6 +342,12 @@ public class DatabaseManager implements StorageService, StorageAdminService {
return new DefaultDistributedSetBuilder<>(this);
}
@Override
public <E> DistributedQueueBuilder<E> queueBuilder() {
return new DefaultDistributedQueueBuilder<>(this);
}
@Override
public AtomicCounterBuilder atomicCounterBuilder() {
return new DefaultAtomicCounterBuilder(inMemoryDatabase, partitionedDatabase);
......@@ -386,8 +416,8 @@ public class DatabaseManager implements StorageService, StorageAdminService {
}
protected <K, V> void registerMap(DefaultAsyncConsistentMap<K, V> map) {
// TODO: Support different local instances of the same map.
if (!maps.add(map)) {
// TODO: Support multiple local instances of the same map.
if (maps.putIfAbsent(map.name(), map) != null) {
throw new IllegalStateException("Map by name " + map.name() + " already exists");
}
......@@ -397,6 +427,19 @@ public class DatabaseManager implements StorageService, StorageAdminService {
eventDispatcher);
}
protected <K, V> void unregisterMap(DefaultAsyncConsistentMap<K, V> map) {
if (maps.remove(map.name()) != null) {
clusterCommunicator.removeSubscriber(mapUpdatesSubject(map.name()));
}
}
protected <E> void registerQueue(DefaultDistributedQueue<E> queue) {
// TODO: Support multiple local instances of the same queue.
if (queues.putIfAbsent(queue.name(), queue) != null) {
throw new IllegalStateException("Queue by name " + queue.name() + " already exists");
}
}
protected static MessageSubject mapUpdatesSubject(String mapName) {
return new MessageSubject(mapName + "-map-updates");
}
......
......@@ -21,6 +21,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.onosproject.cluster.NodeId;
import org.onosproject.store.service.Transaction;
import org.onosproject.store.service.Versioned;
......@@ -249,6 +250,36 @@ public interface DatabaseProxy<K, V> {
CompletableFuture<Long> counterGet(String counterName);
/**
* Returns the size of queue.
* @param queueName queue name
* @return queue size
*/
CompletableFuture<Long> queueSize(String queueName);
/**
* Inserts an entry into the queue.
* @param queueName queue name
* @param entry queue entry
* @return set of nodes to notify about the queue update
*/
CompletableFuture<Set<NodeId>> queuePush(String queueName, byte[] entry);
/**
* Removes an entry from the queue if the queue is non-empty.
* @param queueName queue name
* @param nodeId If the queue is empty the identifier of node to notify when an entry becomes available
* @return entry. Can be null if queue is empty
*/
CompletableFuture<byte[]> queuePop(String queueName, NodeId nodeId);
/**
* Returns but does not remove an entry from the queue.
* @param queueName queue name
* @return entry. Can be null if queue is empty
*/
CompletableFuture<byte[]> queuePeek(String queueName);
/**
* Prepare and commit the specified transaction.
*
* @param transaction transaction to commit (after preparation)
......
......@@ -21,6 +21,7 @@ import java.nio.ByteBuffer;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.onlab.util.KryoNamespace;
import org.onosproject.cluster.NodeId;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
import org.onosproject.store.service.DatabaseUpdate;
......@@ -78,6 +79,7 @@ public class DatabaseSerializer extends SerializerConfig {
.register(Result.Status.class)
.register(DefaultTransaction.class)
.register(Transaction.State.class)
.register(NodeId.class)
.build();
private static final KryoSerializer SERIALIZER = new KryoSerializer() {
......
......@@ -21,6 +21,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.onosproject.cluster.NodeId;
import org.onosproject.store.service.Transaction;
import org.onosproject.store.service.Versioned;
......@@ -113,6 +114,18 @@ public interface DatabaseState<K, V> {
Long counterGetAndAdd(String counterName, long delta);
@Query
Long queueSize(String queueName);
@Query
byte[] queuePeek(String queueName);
@Command
byte[] queuePop(String queueName, NodeId requestor);
@Command
Set<NodeId> queuePush(String queueName, byte[] entry);
@Query
Long counterGet(String counterName);
@Command
......
......@@ -28,6 +28,7 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import org.onosproject.cluster.NodeId;
import org.onosproject.store.service.Transaction;
import org.onosproject.store.service.Versioned;
......@@ -187,6 +188,26 @@ public class DefaultDatabase extends AbstractResource<Database> implements Datab
}
@Override
public CompletableFuture<Long> queueSize(String queueName) {
return checkOpen(() -> proxy.queueSize(queueName));
}
@Override
public CompletableFuture<Set<NodeId>> queuePush(String queueName, byte[] entry) {
return checkOpen(() -> proxy.queuePush(queueName, entry));
}
@Override
public CompletableFuture<byte[]> queuePop(String queueName, NodeId nodeId) {
return checkOpen(() -> proxy.queuePop(queueName, nodeId));
}
@Override
public CompletableFuture<byte[]> queuePeek(String queueName) {
return checkOpen(() -> proxy.queuePeek(queueName));
}
@Override
public CompletableFuture<Boolean> prepareAndCommit(Transaction transaction) {
return checkOpen(() -> proxy.prepareAndCommit(transaction));
}
......
......@@ -19,13 +19,16 @@ package org.onosproject.store.consistent.impl;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.Set;
import org.apache.commons.lang3.tuple.Pair;
import org.onosproject.cluster.NodeId;
import org.onosproject.store.service.DatabaseUpdate;
import org.onosproject.store.service.Transaction;
import org.onosproject.store.service.Versioned;
......@@ -46,6 +49,8 @@ public class DefaultDatabaseState implements DatabaseState<String, byte[]> {
private Long nextVersion;
private Map<String, AtomicLong> counters;
private Map<String, Map<String, Versioned<byte[]>>> tables;
private Map<String, Queue<byte[]>> queues;
private Map<String, Set<NodeId>> queueUpdateNotificationTargets;
/**
* This locks map has a structure similar to the "tables" map above and
......@@ -77,6 +82,16 @@ public class DefaultDatabaseState implements DatabaseState<String, byte[]> {
locks = Maps.newConcurrentMap();
context.put("locks", locks);
}
queues = context.get("queues");
if (queues == null) {
queues = Maps.newConcurrentMap();
context.put("queues", queues);
}
queueUpdateNotificationTargets = context.get("queueUpdateNotificationTargets");
if (queueUpdateNotificationTargets == null) {
queueUpdateNotificationTargets = Maps.newConcurrentMap();
context.put("queueUpdateNotificationTargets", queueUpdateNotificationTargets);
}
nextVersion = context.get("nextVersion");
if (nextVersion == null) {
nextVersion = new Long(0);
......@@ -288,6 +303,36 @@ public class DefaultDatabaseState implements DatabaseState<String, byte[]> {
}
@Override
public Long queueSize(String queueName) {
return Long.valueOf(getQueue(queueName).size());
}
@Override
public byte[] queuePeek(String queueName) {
Queue<byte[]> queue = getQueue(queueName);
return queue.peek();
}
@Override
public byte[] queuePop(String queueName, NodeId requestor) {
Queue<byte[]> queue = getQueue(queueName);
if (queue.size() == 0 && requestor != null) {
getQueueUpdateNotificationTargets(queueName).add(requestor);
return null;
} else {
return queue.remove();
}
}
@Override
public Set<NodeId> queuePush(String queueName, byte[] entry) {
getQueue(queueName).add(entry);
Set<NodeId> notifyList = ImmutableSet.copyOf(getQueueUpdateNotificationTargets(queueName));
getQueueUpdateNotificationTargets(queueName).clear();
return notifyList;
}
@Override
public boolean prepareAndCommit(Transaction transaction) {
if (prepare(transaction)) {
return commit(transaction);
......@@ -335,6 +380,14 @@ public class DefaultDatabaseState implements DatabaseState<String, byte[]> {
return counters.computeIfAbsent(counterName, name -> new AtomicLong(0));
}
private Queue<byte[]> getQueue(String queueName) {
return queues.computeIfAbsent(queueName, name -> new LinkedList<>());
}
private Set<NodeId> getQueueUpdateNotificationTargets(String queueName) {
return queueUpdateNotificationTargets.computeIfAbsent(queueName, name -> new HashSet<>());
}
private boolean isUpdatePossible(DatabaseUpdate update) {
Versioned<byte[]> existingEntry = get(update.tableName(), update.key());
switch (update.type()) {
......
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.consistent.impl;
import static com.google.common.base.Preconditions.checkNotNull;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import org.onosproject.cluster.NodeId;
import org.onosproject.store.service.DistributedQueue;
import org.onosproject.store.service.Serializer;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Futures;
/**
* DistributedQueue implementation that provides FIFO ordering semantics.
*
* @param <E> queue entry type
*/
public class DefaultDistributedQueue<E> implements DistributedQueue<E> {
private final String name;
private final Database database;
private final Serializer serializer;
private final NodeId localNodeId;
private final Set<CompletableFuture<E>> pendingFutures = Sets.newIdentityHashSet();
private final Consumer<Set<NodeId>> notifyConsumers;
private static final String ERROR_NULL_ENTRY = "Null entries are not allowed";
public DefaultDistributedQueue(String name,
Database database,
Serializer serializer,
NodeId localNodeId,
Consumer<Set<NodeId>> notifyConsumers) {
this.name = checkNotNull(name, "queue name cannot be null");
this.database = checkNotNull(database, "database cannot be null");
this.serializer = checkNotNull(serializer, "serializer cannot be null");
this.localNodeId = localNodeId;
this.notifyConsumers = notifyConsumers;
}
@Override
public long size() {
return Futures.getUnchecked(database.queueSize(name));
}
@Override
public void push(E entry) {
checkNotNull(entry, ERROR_NULL_ENTRY);
Futures.getUnchecked(database.queuePush(name, serializer.encode(entry))
.thenAccept(notifyConsumers)
.thenApply(v -> null));
}
@Override
public CompletableFuture<E> pop() {
return database.queuePop(name, localNodeId)
.thenCompose(v -> {
if (v != null) {
return CompletableFuture.completedFuture(serializer.decode(v));
} else {
CompletableFuture<E> newPendingFuture = new CompletableFuture<>();
pendingFutures.add(newPendingFuture);
return newPendingFuture;
}
});
}
@Override
public E peek() {
return Futures.getUnchecked(database.queuePeek(name)
.thenApply(v -> v != null ? serializer.decode(v) : null));
}
public String name() {
return name;
}
protected void tryPoll() {
Set<CompletableFuture<E>> completedFutures = Sets.newHashSet();
for (CompletableFuture<E> future : pendingFutures) {
E entry = Futures.getUnchecked(database.queuePop(name, localNodeId)
.thenApply(v -> v != null ? serializer.decode(v) : null));
if (entry != null) {
future.complete(entry);
completedFutures.add(future);
} else {
break;
}
}
pendingFutures.removeAll(completedFutures);
}
}
\ No newline at end of file
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.consistent.impl;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import java.util.Set;
import java.util.function.Consumer;
import org.onosproject.cluster.NodeId;
import org.onosproject.store.service.DistributedQueue;
import org.onosproject.store.service.DistributedQueueBuilder;
import org.onosproject.store.service.Serializer;
import com.google.common.base.Charsets;
/**
* Default implementation of a {@code DistributedQueueBuilder}.
*
* @param <E> queue entry type
*/
public class DefaultDistributedQueueBuilder<E> implements DistributedQueueBuilder<E> {
private Serializer serializer;
private String name;
private boolean persistenceEnabled = true;
private final DatabaseManager databaseManager;
public DefaultDistributedQueueBuilder(
DatabaseManager databaseManager) {
this.databaseManager = databaseManager;
}
@Override
public DistributedQueueBuilder<E> withName(String name) {
checkArgument(name != null && !name.isEmpty());
this.name = name;
return this;
}
@Override
public DistributedQueueBuilder<E> withSerializer(Serializer serializer) {
checkArgument(serializer != null);
this.serializer = serializer;
return this;
}
@Override
public DistributedQueueBuilder<E> withPersistenceDisabled() {
persistenceEnabled = false;
return this;
}
private boolean validInputs() {
return name != null && serializer != null;
}
@Override
public DistributedQueue<E> build() {
checkState(validInputs());
Consumer<Set<NodeId>> notifyOthers = nodes -> databaseManager.clusterCommunicator.multicast(name,
DatabaseManager.QUEUE_UPDATED_TOPIC,
s -> s.getBytes(Charsets.UTF_8),
nodes);
DefaultDistributedQueue<E> queue = new DefaultDistributedQueue<>(
name,
persistenceEnabled ? databaseManager.partitionedDatabase : databaseManager.inMemoryDatabase,
serializer,
databaseManager.localNodeId,
notifyOthers);
databaseManager.registerQueue(queue);
return queue;
}
}
......@@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.onosproject.cluster.NodeId;
import org.onosproject.store.service.DatabaseUpdate;
import org.onosproject.store.service.Transaction;
import org.onosproject.store.service.Versioned;
......@@ -276,6 +277,31 @@ public class PartitionedDatabase implements Database {
return partitioner.getPartition(counterName, counterName).counterGetAndAdd(counterName, delta);
}
@Override
public CompletableFuture<Long> queueSize(String queueName) {
checkState(isOpen.get(), DB_NOT_OPEN);
return partitioner.getPartition(queueName, queueName).queueSize(queueName);
}
@Override
public CompletableFuture<Set<NodeId>> queuePush(String queueName, byte[] entry) {
checkState(isOpen.get(), DB_NOT_OPEN);
return partitioner.getPartition(queueName, queueName).queuePush(queueName, entry);
}
@Override
public CompletableFuture<byte[]> queuePop(String queueName, NodeId nodeId) {
checkState(isOpen.get(), DB_NOT_OPEN);
return partitioner.getPartition(queueName, queueName).queuePop(queueName, nodeId);
}
@Override
public CompletableFuture<byte[]> queuePeek(String queueName) {
checkState(isOpen.get(), DB_NOT_OPEN);
return partitioner.getPartition(queueName, queueName).queuePeek(queueName);
}
@Override
public CompletableFuture<Boolean> prepareAndCommit(Transaction transaction) {
Map<Database, Transaction> subTransactions = createSubTransactions(transaction);
......