Madan Jampani
Committed by Gerrit Code Review

Distributed work queue primitive

Change-Id: Ia8e531e6611ec502399edec376ccc00522e47994
Showing 23 changed files with 1204 additions and 15 deletions
......@@ -15,6 +15,7 @@
*/
package org.onosproject.vtnrsc.util;
import org.onosproject.store.service.WorkQueue;
import org.onosproject.store.service.EventuallyConsistentMapBuilder;
import org.onosproject.store.service.ConsistentMapBuilder;
import org.onosproject.store.service.DistributedSetBuilder;
......@@ -22,6 +23,7 @@ import org.onosproject.store.service.DistributedQueueBuilder;
import org.onosproject.store.service.AtomicCounterBuilder;
import org.onosproject.store.service.AtomicValueBuilder;
import org.onosproject.store.service.LeaderElectorBuilder;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.TransactionContextBuilder;
import org.onosproject.store.service.StorageService;
......@@ -68,4 +70,9 @@ public class VtnStorageServiceAdapter implements StorageService {
public LeaderElectorBuilder leaderElectorBuilder() {
return null;
}
@Override
public <E> WorkQueue<E> getWorkQueue(String name, Serializer serializer) {
return null;
}
}
......
/*
* Copyright 2015-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.cli.net;
import java.util.Map;
import org.apache.karaf.shell.commands.Command;
import org.onosproject.cli.AbstractShellCommand;
import org.onosproject.store.service.StorageAdminService;
import org.onosproject.store.service.WorkQueueStats;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
/**
* Command to list stats for all work queues in the system.
*/
@Command(scope = "onos", name = "queues",
description = "Lists information about work queues in the system")
public class QueuesListCommand extends AbstractShellCommand {
private static final String FMT = "name=%s pending=%d inProgress=%d, completed=%d";
@Override
protected void execute() {
StorageAdminService storageAdminService = get(StorageAdminService.class);
Map<String, WorkQueueStats> queueStats = storageAdminService.getQueueStats();
if (outputJson()) {
ObjectMapper mapper = new ObjectMapper();
ObjectNode jsonQueues = mapper.createObjectNode();
queueStats.forEach((k, v) -> {
ObjectNode jsonStats = jsonQueues.putObject(k);
jsonStats.put("pending", v.totalPending());
jsonStats.put("inProgress", v.totalInProgress());
jsonStats.put("completed", v.totalCompleted());
});
print("%s", jsonQueues);
} else {
queueStats.forEach((name, stats) ->
print(FMT, name, stats.totalPending(), stats.totalInProgress(), stats.totalCompleted()));
}
}
}
......@@ -400,6 +400,9 @@
<action class="org.onosproject.cli.net.CountersListCommand"/>
</command>
<command>
<action class="org.onosproject.cli.net.QueuesListCommand"/>
</command>
<command>
<action class="org.onosproject.cli.net.TransactionsCommand"/>
</command>
<command>
......
......@@ -23,6 +23,7 @@ import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.AsyncDistributedSet;
import org.onosproject.store.service.AsyncLeaderElector;
import org.onosproject.store.service.DistributedQueue;
import org.onosproject.store.service.WorkQueue;
import org.onosproject.store.service.Serializer;
/**
......@@ -88,6 +89,15 @@ public interface DistributedPrimitiveCreator {
AsyncLeaderElector newAsyncLeaderElector(String name);
/**
* Creates a new {@code WorkQueue}.
*
* @param name work queue name
* @param serializer serializer
* @return work queue
*/
<E> WorkQueue<E> newWorkQueue(String name, Serializer serializer);
/**
* Returns the names of all created {@code AsyncConsistentMap} instances.
* @return set of {@code AsyncConsistentMap} names
*/
......@@ -98,4 +108,10 @@ public interface DistributedPrimitiveCreator {
* @return set of {@code AsyncAtomicCounter} names
*/
Set<String> getAsyncAtomicCounterNames();
/**
* Returns the names of all created {@code WorkQueue} instances.
* @return set of {@code WorkQueue} names
*/
Set<String> getWorkQueueNames();
}
\ No newline at end of file
......
......@@ -52,6 +52,13 @@ public interface StorageAdminService {
Map<String, Long> getCounters();
/**
* Returns statistics for all the work queues in the system.
*
* @return mapping from queue name to that queue's stats
*/
Map<String, WorkQueueStats> getQueueStats();
/**
* Returns all pending transactions.
*
* @return collection of pending transaction identifiers.
......
......@@ -107,4 +107,13 @@ public interface StorageService {
default AtomicCounter getAtomicCounter(String name) {
return getAsyncAtomicCounter(name).asAtomicCounter();
}
/**
* Returns an instance of {@code WorkQueue} with specified name.
* @param name work queue name
* @param serializer serializer
*
* @return WorkQueue instance
*/
<E> WorkQueue<E> getWorkQueue(String name, Serializer serializer);
}
......
/*
* Copyright 2016-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.service;
import java.util.function.Function;
import com.google.common.base.MoreObjects;
/**
* {@link WorkQueue} task.
*
* @param <E> task payload type.
*/
public class Task<E> {
private final E payload;
private final String taskId;
private Task() {
payload = null;
taskId = null;
}
/**
* Constructs a new task instance.
* @param taskId task identifier
* @param payload task payload
*/
public Task(String taskId, E payload) {
this.taskId = taskId;
this.payload = payload;
}
/**
* Returns the task identifier.
* @return task id
*/
public String taskId() {
return taskId;
}
/**
* Returns the task payload.
* @return task payload
*/
public E payload() {
return payload;
}
/**
* Maps task from one payload type to another.
* @param mapper type mapper.
* @return mapped task.
*/
public <F> Task<F> map(Function<E, F> mapper) {
return new Task<>(taskId, mapper.apply(payload));
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("taskId", taskId)
.add("payload", payload)
.toString();
}
}
/*
* Copyright 2016-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.service;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import com.google.common.collect.ImmutableList;
/**
* Distributed Work Queue primitive.
* <p>
* Work queue serves as a buffer allowing producers to {@link #add(Collection) add} tasks and consumers
* to {@link #take() take} tasks to process.
* <p>
* In the system each task is tracked via its unique task identifier which is returned when a task is taken.
* Work queue guarantees that a task can be taken by only one consumer at a time. Once it finishes processing a
* consumer must invoke the {@link #complete(Collection) complete} method to mark the task(s) as completed.
* Tasks thus completed are removed from the queue. If a consumer unexpectedly terminates before it can complete
* all its tasks are returned back to the queue so that other consumers can pick them up. Since there is a distinct
* possibility that tasks could be processed more than once (under failure conditions), care should be taken to ensure
* task processing logic is idempotent.
*
* @param <E> task payload type.
*/
public interface WorkQueue<E> {
/**
* Adds a collection of tasks to the work queue.
* @param items collection of task items
* @return future that is completed when the operation completes
*/
CompletableFuture<Void> addMultiple(Collection<E> items);
/**
* Picks up multiple tasks from the work queue to work on.
* <p>
* Tasks that are taken remain invisible to other consumers as long as the consumer stays alive.
* If a consumer unexpectedly terminates before {@link #complete(String...) completing} the task,
* the task becomes visible again to other consumers to process.
* @param maxItems maximum number of items to take from the queue. The actual number of tasks returned
* can be at the max this number
* @return future for the tasks. The future can be completed with an empty collection if there are no
* unassigned tasks in the work queue
*/
CompletableFuture<Collection<Task<E>>> take(int maxItems);
/**
* Completes a collection of tasks.
* @param taskIds ids of tasks to complete
* @return future that is completed when the operation completes
*/
CompletableFuture<Void> complete(Collection<String> taskIds);
/**
* Registers a task processing callback to be automatically invoked when new tasks are
* added to the work queue.
* @param taskProcessor task processing callback
* @param parallelism max tasks that can be processed in parallel
* @param executor executor to use for processing the tasks
* @return future that is completed when the operation completes
*/
CompletableFuture<Void> registerTaskProcessor(Consumer<E> taskProcessor,
int parallelism,
Executor executor);
/**
* Stops automatically processing tasks from work queue. This call nullifies the effect of a
* previous {@link #registerTaskProcessor registerTaskProcessor} call.
* @return future that is completed when the operation completes
*/
CompletableFuture<Void> stopProcessing();
/**
* Returns work queue statistics.
* @return future that is completed with work queue stats when the operation completes
*/
CompletableFuture<WorkQueueStats> stats();
/**
* Completes a collection of tasks.
* @param taskIds var arg list of task ids
* @return future that is completed when the operation completes
*/
default CompletableFuture<Void> complete(String... taskIds) {
return complete(Arrays.asList(taskIds));
}
/**
* Adds a single task to the work queue.
* @param item task item
* @return future that is completed when the operation completes
*/
default CompletableFuture<Void> addOne(E item) {
return addMultiple(ImmutableList.of(item));
}
/**
* Picks up a single task from the work queue to work on.
* <p>
* Tasks that are taken remain invisible to other consumers as long as the consumer stays alive.
* If a consumer unexpectedly terminates before {@link #complete(String...) completing} the task,
* the task becomes visible again to other consumers to process.
* @return future for the task. The future can be completed with null, if there are no
* unassigned tasks in the work queue
*/
default CompletableFuture<Task<E>> take() {
return this.take(1).thenApply(tasks -> tasks.isEmpty() ? null : tasks.iterator().next());
}
}
\ No newline at end of file
/*
* Copyright 2016-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.service;
import com.google.common.base.MoreObjects;
/**
* Statistics for a {@link WorkQueue}.
*/
public final class WorkQueueStats {
private long totalPending;
private long totalInProgress;
private long totalCompleted;
/**
* Returns a {@code WorkQueueStats} builder.
* @return builder
*/
public static Builder builder() {
return new Builder();
}
private WorkQueueStats() {
}
public static class Builder {
WorkQueueStats workQueueStats = new WorkQueueStats();
public Builder withTotalPending(long value) {
workQueueStats.totalPending = value;
return this;
}
public Builder withTotalInProgress(long value) {
workQueueStats.totalInProgress = value;
return this;
}
public Builder withTotalCompleted(long value) {
workQueueStats.totalCompleted = value;
return this;
}
public WorkQueueStats build() {
return workQueueStats;
}
}
/**
* Returns the total pending tasks. These are the tasks that are added but not yet picked up.
* @return total pending tasks.
*/
public long totalPending() {
return this.totalPending;
}
/**
* Returns the total in progress tasks. These are the tasks that are currently being worked on.
* @return total in progress tasks.
*/
public long totalInProgress() {
return this.totalInProgress;
}
/**
* Returns the total completed tasks.
* @return total completed tasks.
*/
public long totalCompleted() {
return this.totalCompleted;
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("totalPending", totalPending)
.add("totalInProgress", totalInProgress)
.add("totalCompleted", totalCompleted)
.toString();
}
}
......@@ -58,4 +58,9 @@ public class StorageServiceAdapter implements StorageService {
public LeaderElectorBuilder leaderElectorBuilder() {
return null;
}
@Override
public <E> WorkQueue<E> getWorkQueue(String name, Serializer serializer) {
return null;
}
}
......
......@@ -33,6 +33,8 @@ import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapComman
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapFactory;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorCommands;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElectorFactory;
import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands;
import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueFactory;
import org.onosproject.store.primitives.resources.impl.CommitResult;
import org.onosproject.store.primitives.resources.impl.MapEntryUpdateResult;
import org.onosproject.store.primitives.resources.impl.PrepareResult;
......@@ -40,8 +42,11 @@ import org.onosproject.store.primitives.resources.impl.RollbackResult;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapTransaction;
import org.onosproject.store.service.Task;
import org.onosproject.store.service.Versioned;
import org.onosproject.store.service.WorkQueueStats;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
/**
......@@ -81,15 +86,20 @@ public final class CatalystSerializers {
serializer.register(MapTransaction.class, factory);
serializer.register(Versioned.class, factory);
serializer.register(MapEvent.class, factory);
serializer.register(Task.class, factory);
serializer.register(WorkQueueStats.class, factory);
serializer.register(Maps.immutableEntry("a", "b").getClass(), factory);
serializer.register(ImmutableList.of().getClass(), factory);
serializer.resolve(new LongCommands.TypeResolver());
serializer.resolve(new AtomixConsistentMapCommands.TypeResolver());
serializer.resolve(new AtomixLeaderElectorCommands.TypeResolver());
serializer.resolve(new AtomixWorkQueueCommands.TypeResolver());
serializer.resolve(new ResourceManagerTypeResolver());
serializer.registerClassLoader(AtomixConsistentMapFactory.class)
.registerClassLoader(AtomixLeaderElectorFactory.class);
.registerClassLoader(AtomixLeaderElectorFactory.class)
.registerClassLoader(AtomixWorkQueueFactory.class);
return serializer;
}
......
......@@ -54,20 +54,19 @@ public class DefaultCatalystTypeSerializerFactory implements TypeSerializerFacto
}
@Override
public void write(T object, BufferOutput buffer,
io.atomix.catalyst.serializer.Serializer serializer) {
public void write(T object, BufferOutput buffer, io.atomix.catalyst.serializer.Serializer serializer) {
try {
byte[] payload = this.serializer.encode(object);
buffer.writeInt(payload.length);
buffer.write(payload);
} catch (Exception e) {
log.warn("Failed to serialize {}", object, e);
throw Throwables.propagate(e);
}
}
@Override
public T read(Class<T> type, BufferInput buffer,
io.atomix.catalyst.serializer.Serializer serializer) {
public T read(Class<T> type, BufferInput buffer, io.atomix.catalyst.serializer.Serializer serializer) {
int size = buffer.readInt();
try {
byte[] payload = new byte[size];
......@@ -75,8 +74,7 @@ public class DefaultCatalystTypeSerializerFactory implements TypeSerializerFacto
return this.serializer.decode(payload);
} catch (Exception e) {
log.warn("Failed to deserialize as type {}. Payload size: {}", type, size, e);
Throwables.propagate(e);
return null;
throw Throwables.propagate(e);
}
}
}
......
package org.onosproject.store.primitives.impl;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.onosproject.store.service.WorkQueue;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.Task;
import org.onosproject.store.service.WorkQueueStats;
import com.google.common.collect.Collections2;
/**
* Default implementation of {@link WorkQueue}.
*
* @param <E> task payload type.
*/
public class DefaultDistributedWorkQueue<E> implements WorkQueue<E> {
private final WorkQueue<byte[]> backingQueue;
private final Serializer serializer;
public DefaultDistributedWorkQueue(WorkQueue<byte[]> backingQueue, Serializer serializer) {
this.backingQueue = backingQueue;
this.serializer = serializer;
}
@Override
public CompletableFuture<Void> addMultiple(Collection<E> items) {
return backingQueue.addMultiple(items.stream()
.map(serializer::encode)
.collect(Collectors.toCollection(ArrayList::new)));
}
@Override
public CompletableFuture<Collection<Task<E>>> take(int maxTasks) {
return backingQueue.take(maxTasks)
.thenApply(tasks -> Collections2.transform(tasks, task -> task.<E>map(serializer::decode)));
}
@Override
public CompletableFuture<Void> complete(Collection<String> ids) {
return backingQueue.complete(ids);
}
@Override
public CompletableFuture<WorkQueueStats> stats() {
return backingQueue.stats();
}
@Override
public CompletableFuture<Void> registerTaskProcessor(Consumer<E> callback,
int parallelism,
Executor executor) {
Consumer<byte[]> backingQueueCallback = payload -> callback.accept(serializer.decode(payload));
return backingQueue.registerTaskProcessor(backingQueueCallback, parallelism, executor);
}
@Override
public CompletableFuture<Void> stopProcessing() {
return backingQueue.stopProcessing();
}
}
......@@ -30,6 +30,7 @@ import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.AsyncDistributedSet;
import org.onosproject.store.service.AsyncLeaderElector;
import org.onosproject.store.service.DistributedQueue;
import org.onosproject.store.service.WorkQueue;
import org.onosproject.store.service.Serializer;
import com.google.common.base.Charsets;
......@@ -101,6 +102,11 @@ public class FederatedDistributedPrimitiveCreator implements DistributedPrimitiv
}
@Override
public <E> WorkQueue<E> newWorkQueue(String name, Serializer serializer) {
return getCreator(name).newWorkQueue(name, serializer);
}
@Override
public Set<String> getAsyncConsistentMapNames() {
return members.values()
.stream()
......@@ -118,6 +124,15 @@ public class FederatedDistributedPrimitiveCreator implements DistributedPrimitiv
.orElse(ImmutableSet.of());
}
@Override
public Set<String> getWorkQueueNames() {
return members.values()
.stream()
.map(DistributedPrimitiveCreator::getWorkQueueNames)
.reduce(Sets::union)
.orElse(ImmutableSet.of());
}
/**
* Returns the {@code DistributedPrimitiveCreator} to use for hosting a primitive.
* @param name primitive name
......
......@@ -46,6 +46,7 @@ import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.ConsistentMapBuilder;
import org.onosproject.store.service.DistributedQueueBuilder;
import org.onosproject.store.service.DistributedSetBuilder;
import org.onosproject.store.service.WorkQueue;
import org.onosproject.store.service.EventuallyConsistentMapBuilder;
import org.onosproject.store.service.LeaderElectorBuilder;
import org.onosproject.store.service.MapInfo;
......@@ -54,6 +55,7 @@ import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageAdminService;
import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.TransactionContextBuilder;
import org.onosproject.store.service.WorkQueueStats;
import org.slf4j.Logger;
import com.google.common.collect.Maps;
......@@ -171,6 +173,12 @@ public class StorageManager implements StorageService, StorageAdminService {
}
@Override
public <E> WorkQueue<E> getWorkQueue(String name, Serializer serializer) {
checkPermission(STORAGE_WRITE);
return federatedPrimitiveCreator.newWorkQueue(name, serializer);
}
@Override
public List<MapInfo> getMapInfo() {
return listMapInfo(federatedPrimitiveCreator);
}
......@@ -185,6 +193,18 @@ public class StorageManager implements StorageService, StorageAdminService {
}
@Override
public Map<String, WorkQueueStats> getQueueStats() {
Map<String, WorkQueueStats> workQueueStats = Maps.newConcurrentMap();
federatedPrimitiveCreator.getWorkQueueNames()
.forEach(name -> workQueueStats.put(name,
federatedPrimitiveCreator.newWorkQueue(name,
Serializer.using(KryoNamespaces.BASIC))
.stats()
.join()));
return workQueueStats;
}
@Override
public List<PartitionInfo> getPartitionInfo() {
return partitionAdminService.partitionInfo();
}
......
......@@ -41,6 +41,7 @@ import org.onosproject.store.primitives.DistributedPrimitiveCreator;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMap;
import org.onosproject.store.primitives.resources.impl.AtomixCounter;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElector;
import org.onosproject.store.primitives.resources.impl.AtomixWorkQueue;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.AsyncAtomicCounter;
import org.onosproject.store.service.AsyncAtomicValue;
......@@ -49,6 +50,7 @@ import org.onosproject.store.service.AsyncDistributedSet;
import org.onosproject.store.service.AsyncLeaderElector;
import org.onosproject.store.service.DistributedPrimitive.Status;
import org.onosproject.store.service.DistributedQueue;
import org.onosproject.store.service.WorkQueue;
import org.onosproject.store.service.PartitionClientInfo;
import org.onosproject.store.service.Serializer;
import org.slf4j.Logger;
......@@ -159,11 +161,16 @@ public class StoragePartitionClient implements DistributedPrimitiveCreator, Mana
@Override
public <E> DistributedQueue<E> newDistributedQueue(String name, Serializer serializer) {
// TODO: Implement
throw new UnsupportedOperationException();
}
@Override
public <E> WorkQueue<E> newWorkQueue(String name, Serializer serializer) {
AtomixWorkQueue workQueue = client.getResource(name, AtomixWorkQueue.class).join();
return new DefaultDistributedWorkQueue<>(workQueue, serializer);
}
@Override
public AsyncLeaderElector newAsyncLeaderElector(String name) {
AtomixLeaderElector leaderElector = client.getResource(name, AtomixLeaderElector.class)
.thenCompose(AtomixLeaderElector::setupCache)
......@@ -187,6 +194,11 @@ public class StoragePartitionClient implements DistributedPrimitiveCreator, Mana
}
@Override
public Set<String> getWorkQueueNames() {
return client.keys(AtomixWorkQueue.class).join();
}
@Override
public boolean isOpen() {
return resourceClient.client().state() != State.CLOSED;
}
......
/*
* Copyright 2016-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.resources.impl;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import java.util.Timer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.onlab.util.AbstractAccumulator;
import org.onlab.util.Accumulator;
import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Add;
import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Complete;
import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Register;
import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Stats;
import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Take;
import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Unregister;
import org.onosproject.store.service.WorkQueue;
import org.onosproject.store.service.Task;
import org.onosproject.store.service.WorkQueueStats;
import org.slf4j.Logger;
import com.google.common.collect.ImmutableList;
import io.atomix.copycat.client.CopycatClient;
import io.atomix.resource.AbstractResource;
import io.atomix.resource.ResourceTypeInfo;
/**
* Distributed resource providing the {@link WorkQueue} primitive.
*/
@ResourceTypeInfo(id = -154, factory = AtomixWorkQueueFactory.class)
public class AtomixWorkQueue extends AbstractResource<AtomixWorkQueue>
implements WorkQueue<byte[]> {
private final Logger log = getLogger(getClass());
public static final String TASK_AVAILABLE = "task-available";
private final ExecutorService executor = Executors.newSingleThreadExecutor();
private final AtomicReference<TaskProcessor> taskProcessor = new AtomicReference<>();
private final Timer timer = new Timer("atomix-work-queue-completer");
private final AtomicBoolean isRegistered = new AtomicBoolean(false);
protected AtomixWorkQueue(CopycatClient client, Properties options) {
super(client, options);
}
@Override
public CompletableFuture<AtomixWorkQueue> open() {
return super.open().thenApply(result -> {
client.onStateChange(state -> {
if (state == CopycatClient.State.CONNECTED && isRegistered.get()) {
client.submit(new Register());
}
});
client.onEvent(TASK_AVAILABLE, this::resumeWork);
return result;
});
}
@Override
public CompletableFuture<Void> addMultiple(Collection<byte[]> items) {
if (items.isEmpty()) {
return CompletableFuture.completedFuture(null);
}
return client.submit(new Add(items));
}
@Override
public CompletableFuture<Collection<Task<byte[]>>> take(int maxTasks) {
if (maxTasks <= 0) {
return CompletableFuture.completedFuture(ImmutableList.of());
}
return client.submit(new Take(maxTasks));
}
@Override
public CompletableFuture<Void> complete(Collection<String> taskIds) {
if (taskIds.isEmpty()) {
return CompletableFuture.completedFuture(null);
}
return client.submit(new Complete(taskIds));
}
@Override
public CompletableFuture<Void> registerTaskProcessor(Consumer<byte[]> callback,
int parallelism,
Executor executor) {
Accumulator<String> completedTaskAccumulator =
new CompletedTaskAccumulator(timer, 50, 50); // TODO: make configurable
taskProcessor.set(new TaskProcessor(callback,
parallelism,
executor,
completedTaskAccumulator));
return register().thenCompose(v -> take(parallelism))
.thenAccept(taskProcessor.get()::accept);
}
@Override
public CompletableFuture<Void> stopProcessing() {
return unregister();
}
@Override
public CompletableFuture<WorkQueueStats> stats() {
return client.submit(new Stats());
}
private void resumeWork() {
TaskProcessor activeProcessor = taskProcessor.get();
if (activeProcessor == null) {
return;
}
this.take(activeProcessor.headRoom())
.whenCompleteAsync((tasks, e) -> activeProcessor.accept(tasks), executor);
}
private CompletableFuture<Void> register() {
return client.submit(new Register()).thenRun(() -> isRegistered.set(true));
}
private CompletableFuture<Void> unregister() {
return client.submit(new Unregister()).thenRun(() -> isRegistered.set(false));
}
// TaskId accumulator for paced triggering of task completion calls.
private class CompletedTaskAccumulator extends AbstractAccumulator<String> {
CompletedTaskAccumulator(Timer timer, int maxTasksToBatch, int maxBatchMillis) {
super(timer, maxTasksToBatch, maxBatchMillis, Integer.MAX_VALUE);
}
@Override
public void processItems(List<String> items) {
complete(items);
}
}
private class TaskProcessor implements Consumer<Collection<Task<byte[]>>> {
private final AtomicInteger headRoom;
private final Consumer<byte[]> backingConsumer;
private final Executor executor;
private final Accumulator<String> taskCompleter;
public TaskProcessor(Consumer<byte[]> backingConsumer,
int parallelism,
Executor executor,
Accumulator<String> taskCompleter) {
this.backingConsumer = backingConsumer;
this.headRoom = new AtomicInteger(parallelism);
this.executor = executor;
this.taskCompleter = taskCompleter;
}
public int headRoom() {
return headRoom.get();
}
@Override
public void accept(Collection<Task<byte[]>> tasks) {
if (tasks == null) {
return;
}
headRoom.addAndGet(-1 * tasks.size());
tasks.forEach(task ->
executor.execute(() -> {
try {
backingConsumer.accept(task.payload());
taskCompleter.add(task.taskId());
} catch (Exception e) {
log.debug("Task execution failed", e);
} finally {
headRoom.incrementAndGet();
resumeWork();
}
}));
}
}
}
/*
* Copyright 2016-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.resources.impl;
import java.util.ArrayList;
import java.util.Collection;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.onosproject.store.service.Task;
import org.onosproject.store.service.WorkQueueStats;
import com.google.common.base.MoreObjects;
import io.atomix.catalyst.buffer.BufferInput;
import io.atomix.catalyst.buffer.BufferOutput;
import io.atomix.catalyst.serializer.CatalystSerializable;
import io.atomix.catalyst.serializer.SerializableTypeResolver;
import io.atomix.catalyst.serializer.Serializer;
import io.atomix.catalyst.serializer.SerializerRegistry;
import io.atomix.copycat.Command;
/**
* {@link AtomixWorkQueue} resource state machine operations.
*/
public final class AtomixWorkQueueCommands {
private AtomixWorkQueueCommands() {
}
/**
* Command to add a collection of tasks to the queue.
*/
@SuppressWarnings("serial")
public static class Add implements Command<Void>, CatalystSerializable {
private Collection<byte[]> items;
private Add() {
}
public Add(Collection<byte[]> items) {
this.items = items;
}
@Override
public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
buffer.writeInt(items.size());
items.forEach(task -> serializer.writeObject(task, buffer));
}
@Override
public void readObject(BufferInput<?> buffer, Serializer serializer) {
items = IntStream.range(0, buffer.readInt())
.mapToObj(i -> serializer.<byte[]>readObject(buffer))
.collect(Collectors.toCollection(ArrayList::new));
}
public Collection<byte[]> items() {
return items;
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("items", items)
.toString();
}
}
/**
* Command to take a task from the queue.
*/
@SuppressWarnings("serial")
public static class Take implements Command<Collection<Task<byte[]>>>, CatalystSerializable {
private int maxTasks;
private Take() {
}
public Take(int maxTasks) {
this.maxTasks = maxTasks;
}
@Override
public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
buffer.writeInt(maxTasks);
}
@Override
public void readObject(BufferInput<?> buffer, Serializer serializer) {
maxTasks = buffer.readInt();
}
public int maxTasks() {
return maxTasks;
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("maxTasks", maxTasks)
.toString();
}
}
@SuppressWarnings("serial")
public static class Stats implements Command<WorkQueueStats>, CatalystSerializable {
@Override
public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
}
@Override
public void readObject(BufferInput<?> buffer, Serializer serializer) {
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.toString();
}
}
@SuppressWarnings("serial")
public static class Register implements Command<Void>, CatalystSerializable {
@Override
public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
}
@Override
public void readObject(BufferInput<?> buffer, Serializer serializer) {
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.toString();
}
}
@SuppressWarnings("serial")
public static class Unregister implements Command<Void>, CatalystSerializable {
@Override
public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
}
@Override
public void readObject(BufferInput<?> buffer, Serializer serializer) {
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.toString();
}
}
@SuppressWarnings("serial")
public static class Complete implements Command<Void>, CatalystSerializable {
private Collection<String> taskIds;
private Complete() {
}
public Complete(Collection<String> taskIds) {
this.taskIds = taskIds;
}
@Override
public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
serializer.writeObject(taskIds, buffer);
}
@Override
public void readObject(BufferInput<?> buffer, Serializer serializer) {
taskIds = serializer.readObject(buffer);
}
public Collection<String> taskIds() {
return taskIds;
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("taskIds", taskIds)
.toString();
}
}
/**
* Work queue command type resolver.
*/
public static class TypeResolver implements SerializableTypeResolver {
@Override
public void resolve(SerializerRegistry registry) {
registry.register(Register.class, -960);
registry.register(Unregister.class, -961);
registry.register(Take.class, -962);
registry.register(Add.class, -963);
registry.register(Complete.class, -964);
registry.register(Stats.class, -965);
}
}
}
/*
* Copyright 2016-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.resources.impl;
import io.atomix.catalyst.serializer.SerializableTypeResolver;
import io.atomix.copycat.client.CopycatClient;
import io.atomix.resource.ResourceFactory;
import io.atomix.resource.ResourceStateMachine;
import java.util.Properties;
/**
* {@link AtomixWorkQueue} resource factory.
*/
public class AtomixWorkQueueFactory implements ResourceFactory<AtomixWorkQueue> {
@Override
public SerializableTypeResolver createSerializableTypeResolver() {
return new AtomixWorkQueueCommands.TypeResolver();
}
@Override
public ResourceStateMachine createStateMachine(Properties config) {
return new AtomixWorkQueueState(config);
}
@Override
public AtomixWorkQueue createInstance(CopycatClient client, Properties properties) {
return new AtomixWorkQueue(client, properties);
}
}
\ No newline at end of file
......@@ -16,6 +16,7 @@
package org.onosproject.store.primitives.resources.impl;
import com.google.common.util.concurrent.Uninterruptibles;
import io.atomix.AtomixClient;
import io.atomix.catalyst.serializer.Serializer;
import io.atomix.catalyst.transport.Address;
......@@ -116,16 +117,16 @@ public abstract class AtomixTestBase {
CompletableFuture.allOf(atomixClients.stream()
.map(AtomixClient::close)
.toArray(CompletableFuture[]::new));
closeClients.join();
closeClients
.thenCompose(v -> CompletableFuture
.allOf(copycatServers.stream()
CompletableFuture<Void> closeServers =
CompletableFuture.allOf(copycatServers.stream()
.map(CopycatServer::shutdown)
.toArray(CompletableFuture[]::new))).join();
atomixClients = new ArrayList<>();
.toArray(CompletableFuture[]::new));
closeServers.join();
copycatServers = new ArrayList<>();
atomixClients.clear();
copycatServers.clear();
}
......
/*
* Copyright 2016-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.resources.impl;
import java.time.Duration;
import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import io.atomix.Atomix;
import io.atomix.AtomixClient;
import io.atomix.resource.ResourceType;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.onlab.util.Tools;
import org.onosproject.store.service.Task;
import org.onosproject.store.service.WorkQueueStats;
import com.google.common.util.concurrent.Uninterruptibles;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
/**
* Unit tests for {@link AtomixWorkQueue}.
*/
public class AtomixWorkQueueTest extends AtomixTestBase {
private static final Duration DEFAULT_PROCESSING_TIME = Duration.ofMillis(100);
private static final byte[] DEFAULT_PAYLOAD = "hello world".getBytes();
@BeforeClass
public static void preTestSetup() throws Throwable {
createCopycatServers(1);
}
@AfterClass
public static void postTestCleanup() throws Exception {
clearTests();
}
@Override
protected ResourceType resourceType() {
return new ResourceType(AtomixWorkQueue.class);
}
@Test
public void testAdd() throws Throwable {
String queueName = UUID.randomUUID().toString();
Atomix atomix1 = createAtomixClient();
AtomixWorkQueue queue1 = atomix1.getResource(queueName, AtomixWorkQueue.class).join();
byte[] item = DEFAULT_PAYLOAD;
queue1.addOne(item).join();
Atomix atomix2 = createAtomixClient();
AtomixWorkQueue queue2 = atomix2.getResource(queueName, AtomixWorkQueue.class).join();
byte[] task2 = DEFAULT_PAYLOAD;
queue2.addOne(task2).join();
WorkQueueStats stats = queue1.stats().join();
assertEquals(stats.totalPending(), 2);
assertEquals(stats.totalInProgress(), 0);
assertEquals(stats.totalCompleted(), 0);
}
@Test
public void testAddMultiple() throws Throwable {
String queueName = UUID.randomUUID().toString();
Atomix atomix1 = createAtomixClient();
AtomixWorkQueue queue1 = atomix1.getResource(queueName, AtomixWorkQueue.class).join();
byte[] item1 = DEFAULT_PAYLOAD;
byte[] item2 = DEFAULT_PAYLOAD;
queue1.addMultiple(Arrays.asList(item1, item2)).join();
WorkQueueStats stats = queue1.stats().join();
assertEquals(stats.totalPending(), 2);
assertEquals(stats.totalInProgress(), 0);
assertEquals(stats.totalCompleted(), 0);
}
@Test
public void testTakeAndComplete() throws Throwable {
String queueName = UUID.randomUUID().toString();
Atomix atomix1 = createAtomixClient();
AtomixWorkQueue queue1 = atomix1.getResource(queueName, AtomixWorkQueue.class).join();
byte[] item1 = DEFAULT_PAYLOAD;
queue1.addOne(item1).join();
Atomix atomix2 = createAtomixClient();
AtomixWorkQueue queue2 = atomix2.getResource(queueName, AtomixWorkQueue.class).join();
Task<byte[]> removedTask = queue2.take().join();
WorkQueueStats stats = queue2.stats().join();
assertEquals(stats.totalPending(), 0);
assertEquals(stats.totalInProgress(), 1);
assertEquals(stats.totalCompleted(), 0);
assertTrue(Arrays.equals(removedTask.payload(), item1));
queue2.complete(Arrays.asList(removedTask.taskId())).join();
stats = queue1.stats().join();
assertEquals(stats.totalPending(), 0);
assertEquals(stats.totalInProgress(), 0);
assertEquals(stats.totalCompleted(), 1);
// Another take should return null
assertNull(queue2.take().join());
}
@Test
public void testUnexpectedClientClose() throws Throwable {
String queueName = UUID.randomUUID().toString();
Atomix atomix1 = createAtomixClient();
AtomixWorkQueue queue1 = atomix1.getResource(queueName, AtomixWorkQueue.class).join();
byte[] item1 = DEFAULT_PAYLOAD;
queue1.addOne(item1).join();
AtomixClient atomix2 = createAtomixClient();
AtomixWorkQueue queue2 = atomix2.getResource(queueName, AtomixWorkQueue.class).join();
queue2.take().join();
WorkQueueStats stats = queue1.stats().join();
assertEquals(0, stats.totalPending());
assertEquals(1, stats.totalInProgress());
assertEquals(0, stats.totalCompleted());
atomix2.close().join();
stats = queue1.stats().join();
assertEquals(1, stats.totalPending());
assertEquals(0, stats.totalInProgress());
assertEquals(0, stats.totalCompleted());
}
@Test
public void testAutomaticTaskProcessing() throws Throwable {
String queueName = UUID.randomUUID().toString();
Atomix atomix1 = createAtomixClient();
AtomixWorkQueue queue1 = atomix1.getResource(queueName, AtomixWorkQueue.class).join();
Executor executor = Executors.newSingleThreadExecutor();
CountDownLatch latch1 = new CountDownLatch(1);
queue1.registerTaskProcessor(s -> latch1.countDown(), 2, executor);
AtomixClient atomix2 = createAtomixClient();
AtomixWorkQueue queue2 = atomix2.getResource(queueName, AtomixWorkQueue.class).join();
byte[] item1 = DEFAULT_PAYLOAD;
queue2.addOne(item1).join();
Uninterruptibles.awaitUninterruptibly(latch1, 500, TimeUnit.MILLISECONDS);
queue1.stopProcessing();
byte[] item2 = DEFAULT_PAYLOAD;
byte[] item3 = DEFAULT_PAYLOAD;
Tools.delay((int) DEFAULT_PROCESSING_TIME.toMillis());
queue2.addMultiple(Arrays.asList(item2, item3)).join();
WorkQueueStats stats = queue1.stats().join();
assertEquals(2, stats.totalPending());
assertEquals(0, stats.totalInProgress());
assertEquals(1, stats.totalCompleted());
CountDownLatch latch2 = new CountDownLatch(2);
queue1.registerTaskProcessor(s -> latch2.countDown(), 2, executor);
Uninterruptibles.awaitUninterruptibly(latch2, 500, TimeUnit.MILLISECONDS);
}
}
......@@ -19,6 +19,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import org.onlab.packet.ChassisId;
import org.onlab.packet.EthType;
import org.onlab.packet.Ip4Address;
......@@ -208,7 +209,9 @@ import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapTransaction;
import org.onosproject.store.service.SetEvent;
import org.onosproject.store.service.Task;
import org.onosproject.store.service.Versioned;
import org.onosproject.store.service.WorkQueueStats;
import java.net.URI;
import java.time.Duration;
......@@ -338,6 +341,8 @@ public final class KryoNamespaces {
Leadership.class,
LeadershipEvent.class,
LeadershipEvent.Type.class,
Task.class,
WorkQueueStats.class,
HostId.class,
HostDescription.class,
DefaultHostDescription.class,
......