Madan Jampani

Work queue improvements

- Fixed logic to ensure only session to which task is currently assigned can complete it
- Support destroy method to reset work queue state
- Removed deprecated DistributedQueue primitive

Change-Id: I4e1d5be4eb142115130acf15ff34035cb9319a1a
Showing 18 changed files with 141 additions and 281 deletions
......@@ -19,7 +19,6 @@ import org.onosproject.store.service.WorkQueue;
import org.onosproject.store.service.EventuallyConsistentMapBuilder;
import org.onosproject.store.service.ConsistentMapBuilder;
import org.onosproject.store.service.DistributedSetBuilder;
import org.onosproject.store.service.DistributedQueueBuilder;
import org.onosproject.store.service.AtomicCounterBuilder;
import org.onosproject.store.service.AtomicValueBuilder;
import org.onosproject.store.service.LeaderElectorBuilder;
......@@ -47,11 +46,6 @@ public class VtnStorageServiceAdapter implements StorageService {
}
@Override
public <E> DistributedQueueBuilder<E> queueBuilder() {
return null;
}
@Override
public AtomicCounterBuilder atomicCounterBuilder() {
return null;
}
......
......@@ -22,7 +22,6 @@ import org.onosproject.store.service.AsyncAtomicValue;
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.AsyncDistributedSet;
import org.onosproject.store.service.AsyncLeaderElector;
import org.onosproject.store.service.DistributedQueue;
import org.onosproject.store.service.WorkQueue;
import org.onosproject.store.service.Serializer;
......@@ -61,16 +60,6 @@ public interface DistributedPrimitiveCreator {
<V> AsyncAtomicValue<V> newAsyncAtomicValue(String name, Serializer serializer);
/**
* Creates a new {@code DistributedQueue}.
*
* @param name queue name
* @param serializer serializer to use for serializing/deserializing queue entries
* @param <E> queue entry type
* @return queue
*/
<E> DistributedQueue<E> newDistributedQueue(String name, Serializer serializer);
/**
* Creates a new {@code AsyncDistributedSet}.
*
* @param name set name
......
......@@ -62,9 +62,9 @@ public interface DistributedPrimitive {
VALUE,
/**
* Distributed queue.
* Distributed work queue.
*/
QUEUE,
WORK_QUEUE,
/**
* Leader elector.
......
/*
* Copyright 2015-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.service;
import java.util.concurrent.CompletableFuture;
/**
* A distributed collection designed for holding elements prior to processing.
* A queue provides insertion, extraction and inspection operations. The extraction operation
* is designed to be non-blocking.
*
* @param <E> queue entry type
*/
public interface DistributedQueue<E> extends DistributedPrimitive {
/**
* Returns total number of entries in the queue.
* @return queue size
*/
long size();
/**
* Returns true if queue has elements in it.
* @return true is queue has elements, false otherwise
*/
default boolean isEmpty() {
return size() == 0;
}
/**
* Inserts an entry into the queue.
* @param entry entry to insert
*/
void push(E entry);
/**
* If the queue is non-empty, an entry will be removed from the queue and the returned future
* will be immediately completed with it. If queue is empty when this call is made, the returned
* future will be eventually completed when an entry is added to the queue.
* @return queue entry
*/
CompletableFuture<E> pop();
/**
* Returns an entry from the queue without removing it. If the queue is empty returns null.
* @return queue entry or null if queue is empty
*/
E peek();
}
\ No newline at end of file
/*
* Copyright 2015-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.service;
/**
* Builder for distributed queue.
*
* @param <E> type queue elements.
*/
public interface DistributedQueueBuilder<E> {
/**
* Sets the name of the queue.
* <p>
* Each queue is identified by a unique name.
* </p>
* <p>
* Note: This is a mandatory parameter.
* </p>
*
* @param name name of the queue
* @return this DistributedQueueBuilder for method chaining
*/
DistributedQueueBuilder<E> withName(String name);
/**
* Sets a serializer that can be used to serialize
* the elements pushed into the queue. The serializer
* builder should be pre-populated with any classes that will be
* put into the queue.
* <p>
* Note: This is a mandatory parameter.
* </p>
*
* @param serializer serializer
* @return this DistributedQueueBuilder for method chaining
*/
DistributedQueueBuilder<E> withSerializer(Serializer serializer);
/**
* Builds a queue based on the configuration options
* supplied to this builder.
*
* @return new distributed queue
* @throws java.lang.RuntimeException if a mandatory parameter is missing
*/
DistributedQueue<E> build();
}
......@@ -52,14 +52,6 @@ public interface StorageService {
<E> DistributedSetBuilder<E> setBuilder();
/**
* Creates a new DistributedQueueBuilder.
*
* @param <E> queue entry type
* @return builder for an distributed queue
*/
<E> DistributedQueueBuilder<E> queueBuilder();
/**
* Creates a new AtomicCounterBuilder.
*
* @return atomic counter builder
......
......@@ -39,7 +39,12 @@ import com.google.common.collect.ImmutableList;
*
* @param <E> task payload type.
*/
public interface WorkQueue<E> {
public interface WorkQueue<E> extends DistributedPrimitive {
@Override
default DistributedPrimitive.Type primitiveType() {
return DistributedPrimitive.Type.WORK_QUEUE;
}
/**
* Adds a collection of tasks to the work queue.
......
......@@ -35,11 +35,6 @@ public class StorageServiceAdapter implements StorageService {
}
@Override
public <E> DistributedQueueBuilder<E> queueBuilder() {
return null;
}
@Override
public AtomicCounterBuilder atomicCounterBuilder() {
return null;
}
......
......@@ -34,11 +34,6 @@ public class TestStorageService extends StorageServiceAdapter {
}
@Override
public <E> DistributedQueueBuilder<E> queueBuilder() {
throw new UnsupportedOperationException("queueBuilder");
}
@Override
public AtomicCounterBuilder atomicCounterBuilder() {
return TestAtomicCounter.builder();
}
......
/*
* Copyright 2016-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.impl;
import org.onosproject.store.primitives.DistributedPrimitiveCreator;
import org.onosproject.store.service.DistributedQueue;
import org.onosproject.store.service.DistributedQueueBuilder;
import org.onosproject.store.service.Serializer;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
/**
* Default implementation of a {@code DistributedQueueBuilder}.
*
* @param <E> queue entry type
*/
public class DefaultDistributedQueueBuilder<E> implements DistributedQueueBuilder<E> {
private final DistributedPrimitiveCreator primitiveCreator;
private String name;
private Serializer serializer;
public DefaultDistributedQueueBuilder(DistributedPrimitiveCreator primitiveCreator) {
this.primitiveCreator = primitiveCreator;
}
@Override
public DistributedQueueBuilder<E> withName(String name) {
checkArgument(name != null && !name.isEmpty());
this.name = name;
return this;
}
@Override
public DistributedQueueBuilder<E> withSerializer(Serializer serializer) {
checkArgument(serializer != null);
this.serializer = serializer;
return this;
}
private boolean validInputs() {
return name != null && serializer != null;
}
@Override
public DistributedQueue<E> build() {
checkState(validInputs());
return primitiveCreator.newDistributedQueue(name, serializer);
}
}
......@@ -30,6 +30,11 @@ public class DefaultDistributedWorkQueue<E> implements WorkQueue<E> {
}
@Override
public String name() {
return backingQueue.name();
}
@Override
public CompletableFuture<Void> addMultiple(Collection<E> items) {
return backingQueue.addMultiple(items.stream()
.map(serializer::encode)
......
......@@ -29,9 +29,8 @@ import org.onosproject.store.service.AsyncAtomicValue;
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.AsyncDistributedSet;
import org.onosproject.store.service.AsyncLeaderElector;
import org.onosproject.store.service.DistributedQueue;
import org.onosproject.store.service.WorkQueue;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.WorkQueue;
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableSet;
......@@ -84,11 +83,6 @@ public class FederatedDistributedPrimitiveCreator implements DistributedPrimitiv
}
@Override
public <E> DistributedQueue<E> newDistributedQueue(String name, Serializer serializer) {
return getCreator(name).newDistributedQueue(name, serializer);
}
@Override
public AsyncLeaderElector newAsyncLeaderElector(String name) {
checkNotNull(name);
Map<PartitionId, AsyncLeaderElector> leaderElectors =
......
......@@ -15,6 +15,8 @@
*/
package org.onosproject.store.primitives.impl;
import static org.onosproject.security.AppGuard.checkPermission;
import static org.onosproject.security.AppPermission.Type.STORAGE_WRITE;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.Collection;
......@@ -44,9 +46,7 @@ import org.onosproject.store.service.AtomicCounterBuilder;
import org.onosproject.store.service.AtomicValueBuilder;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.ConsistentMapBuilder;
import org.onosproject.store.service.DistributedQueueBuilder;
import org.onosproject.store.service.DistributedSetBuilder;
import org.onosproject.store.service.WorkQueue;
import org.onosproject.store.service.EventuallyConsistentMapBuilder;
import org.onosproject.store.service.LeaderElectorBuilder;
import org.onosproject.store.service.MapInfo;
......@@ -55,15 +55,13 @@ import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageAdminService;
import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.TransactionContextBuilder;
import org.onosproject.store.service.WorkQueue;
import org.onosproject.store.service.WorkQueueStats;
import org.slf4j.Logger;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.Futures;
import static org.onosproject.security.AppGuard.checkPermission;
import static org.onosproject.security.AppPermission.Type.*;
/**
* Implementation for {@code StorageService} and {@code StorageAdminService}.
*/
......@@ -137,12 +135,6 @@ public class StorageManager implements StorageService, StorageAdminService {
}
@Override
public <E> DistributedQueueBuilder<E> queueBuilder() {
checkPermission(STORAGE_WRITE);
return new DefaultDistributedQueueBuilder<>(federatedPrimitiveCreator);
}
@Override
public AtomicCounterBuilder atomicCounterBuilder() {
checkPermission(STORAGE_WRITE);
return new DefaultAtomicCounterBuilder(federatedPrimitiveCreator);
......
......@@ -49,10 +49,9 @@ import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.AsyncDistributedSet;
import org.onosproject.store.service.AsyncLeaderElector;
import org.onosproject.store.service.DistributedPrimitive.Status;
import org.onosproject.store.service.DistributedQueue;
import org.onosproject.store.service.WorkQueue;
import org.onosproject.store.service.PartitionClientInfo;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.WorkQueue;
import org.slf4j.Logger;
import com.google.common.base.Supplier;
......@@ -160,11 +159,6 @@ public class StoragePartitionClient implements DistributedPrimitiveCreator, Mana
}
@Override
public <E> DistributedQueue<E> newDistributedQueue(String name, Serializer serializer) {
throw new UnsupportedOperationException();
}
@Override
public <E> WorkQueue<E> newWorkQueue(String name, Serializer serializer) {
AtomixWorkQueue workQueue = client.getResource(name, AtomixWorkQueue.class).join();
return new DefaultDistributedWorkQueue<>(workQueue, serializer);
......
......@@ -18,6 +18,9 @@ package org.onosproject.store.primitives.resources.impl;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static org.onlab.util.Tools.groupedThreads;
import static org.slf4j.LoggerFactory.getLogger;
import io.atomix.copycat.client.CopycatClient;
import io.atomix.resource.AbstractResource;
import io.atomix.resource.ResourceTypeInfo;
import java.util.Collection;
import java.util.List;
......@@ -34,22 +37,19 @@ import java.util.function.Consumer;
import org.onlab.util.AbstractAccumulator;
import org.onlab.util.Accumulator;
import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Add;
import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Clear;
import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Complete;
import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Register;
import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Stats;
import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Take;
import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Unregister;
import org.onosproject.store.service.WorkQueue;
import org.onosproject.store.service.Task;
import org.onosproject.store.service.WorkQueue;
import org.onosproject.store.service.WorkQueueStats;
import org.slf4j.Logger;
import com.google.common.collect.ImmutableList;
import io.atomix.copycat.client.CopycatClient;
import io.atomix.resource.AbstractResource;
import io.atomix.resource.ResourceTypeInfo;
/**
* Distributed resource providing the {@link WorkQueue} primitive.
*/
......@@ -69,6 +69,18 @@ public class AtomixWorkQueue extends AbstractResource<AtomixWorkQueue>
}
@Override
public String name() {
return null;
}
@Override
public CompletableFuture<Void> destroy() {
executor.shutdown();
timer.cancel();
return client.submit(new Clear());
}
@Override
public CompletableFuture<AtomixWorkQueue> open() {
return super.open().thenApply(result -> {
client.onStateChange(state -> {
......
......@@ -15,6 +15,14 @@
*/
package org.onosproject.store.primitives.resources.impl;
import io.atomix.catalyst.buffer.BufferInput;
import io.atomix.catalyst.buffer.BufferOutput;
import io.atomix.catalyst.serializer.CatalystSerializable;
import io.atomix.catalyst.serializer.SerializableTypeResolver;
import io.atomix.catalyst.serializer.Serializer;
import io.atomix.catalyst.serializer.SerializerRegistry;
import io.atomix.copycat.Command;
import java.util.ArrayList;
import java.util.Collection;
import java.util.stream.Collectors;
......@@ -25,14 +33,6 @@ import org.onosproject.store.service.WorkQueueStats;
import com.google.common.base.MoreObjects;
import io.atomix.catalyst.buffer.BufferInput;
import io.atomix.catalyst.buffer.BufferOutput;
import io.atomix.catalyst.serializer.CatalystSerializable;
import io.atomix.catalyst.serializer.SerializableTypeResolver;
import io.atomix.catalyst.serializer.Serializer;
import io.atomix.catalyst.serializer.SerializerRegistry;
import io.atomix.copycat.Command;
/**
* {@link AtomixWorkQueue} resource state machine operations.
*/
......@@ -207,6 +207,24 @@ public final class AtomixWorkQueueCommands {
}
}
@SuppressWarnings("serial")
public static class Clear implements Command<Void>, CatalystSerializable {
@Override
public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
}
@Override
public void readObject(BufferInput<?> buffer, Serializer serializer) {
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.toString();
}
}
/**
* Work queue command type resolver.
*/
......@@ -219,6 +237,7 @@ public final class AtomixWorkQueueCommands {
registry.register(Add.class, -963);
registry.register(Complete.class, -964);
registry.register(Stats.class, -965);
registry.register(Clear.class, -966);
}
}
}
......
......@@ -16,6 +16,14 @@
package org.onosproject.store.primitives.resources.impl;
import static org.slf4j.LoggerFactory.getLogger;
import io.atomix.copycat.server.Commit;
import io.atomix.copycat.server.Snapshottable;
import io.atomix.copycat.server.StateMachineExecutor;
import io.atomix.copycat.server.session.ServerSession;
import io.atomix.copycat.server.session.SessionListener;
import io.atomix.copycat.server.storage.snapshot.SnapshotReader;
import io.atomix.copycat.server.storage.snapshot.SnapshotWriter;
import io.atomix.resource.ResourceStateMachine;
import java.util.ArrayList;
import java.util.Collection;
......@@ -31,6 +39,7 @@ import java.util.stream.IntStream;
import org.onlab.util.CountDownCompleter;
import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Add;
import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Clear;
import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Complete;
import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Register;
import org.onosproject.store.primitives.resources.impl.AtomixWorkQueueCommands.Stats;
......@@ -47,15 +56,6 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
import com.google.common.util.concurrent.AtomicLongMap;
import io.atomix.copycat.server.Commit;
import io.atomix.copycat.server.Snapshottable;
import io.atomix.copycat.server.StateMachineExecutor;
import io.atomix.copycat.server.session.ServerSession;
import io.atomix.copycat.server.session.SessionListener;
import io.atomix.copycat.server.storage.snapshot.SnapshotReader;
import io.atomix.copycat.server.storage.snapshot.SnapshotWriter;
import io.atomix.resource.ResourceStateMachine;
/**
* State machine for {@link AtomixWorkQueue} resource.
*/
......@@ -82,6 +82,7 @@ public class AtomixWorkQueueState extends ResourceStateMachine implements Sessi
executor.register(Add.class, (Consumer<Commit<Add>>) this::add);
executor.register(Take.class, this::take);
executor.register(Complete.class, (Consumer<Commit<Complete>>) this::complete);
executor.register(Clear.class, (Consumer<Commit<Clear>>) this::clear);
}
protected WorkQueueStats stats(Commit<? extends Stats> commit) {
......@@ -96,6 +97,17 @@ public class AtomixWorkQueueState extends ResourceStateMachine implements Sessi
}
}
protected void clear(Commit<? extends Clear> commit) {
unassignedTasks.forEach(TaskHolder::complete);
unassignedTasks.clear();
assignments.values().forEach(TaskAssignment::markComplete);
assignments.clear();
registeredWorkers.values().forEach(Commit::close);
registeredWorkers.clear();
activeTasksPerSession.clear();
totalCompleted.set(0);
}
protected void register(Commit<? extends Register> commit) {
long sessionId = commit.session().id();
if (registeredWorkers.putIfAbsent(sessionId, commit) != null) {
......@@ -172,7 +184,7 @@ public class AtomixWorkQueueState extends ResourceStateMachine implements Sessi
try {
commit.operation().taskIds().forEach(taskId -> {
TaskAssignment assignment = assignments.get(taskId);
if (assignment != null) {
if (assignment != null && assignment.sessionId() == sessionId) {
assignments.remove(taskId).markComplete();
// bookkeeping
totalCompleted.incrementAndGet();
......
......@@ -15,6 +15,13 @@
*/
package org.onosproject.store.primitives.resources.impl;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import io.atomix.Atomix;
import io.atomix.AtomixClient;
import io.atomix.resource.ResourceType;
import java.time.Duration;
import java.util.Arrays;
import java.util.UUID;
......@@ -23,10 +30,6 @@ import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import io.atomix.Atomix;
import io.atomix.AtomixClient;
import io.atomix.resource.ResourceType;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
......@@ -36,10 +39,6 @@ import org.onosproject.store.service.WorkQueueStats;
import com.google.common.util.concurrent.Uninterruptibles;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
/**
* Unit tests for {@link AtomixWorkQueue}.
*/
......@@ -186,4 +185,54 @@ public class AtomixWorkQueueTest extends AtomixTestBase {
Uninterruptibles.awaitUninterruptibly(latch2, 500, TimeUnit.MILLISECONDS);
}
@Test
public void testDestroy() {
String queueName = UUID.randomUUID().toString();
Atomix atomix1 = createAtomixClient();
AtomixWorkQueue queue1 = atomix1.getResource(queueName, AtomixWorkQueue.class).join();
byte[] item = DEFAULT_PAYLOAD;
queue1.addOne(item).join();
Atomix atomix2 = createAtomixClient();
AtomixWorkQueue queue2 = atomix2.getResource(queueName, AtomixWorkQueue.class).join();
byte[] task2 = DEFAULT_PAYLOAD;
queue2.addOne(task2).join();
WorkQueueStats stats = queue1.stats().join();
assertEquals(stats.totalPending(), 2);
assertEquals(stats.totalInProgress(), 0);
assertEquals(stats.totalCompleted(), 0);
queue2.destroy().join();
stats = queue1.stats().join();
assertEquals(stats.totalPending(), 0);
assertEquals(stats.totalInProgress(), 0);
assertEquals(stats.totalCompleted(), 0);
}
@Test
public void testCompleteAttemptWithIncorrectSession() {
String queueName = UUID.randomUUID().toString();
Atomix atomix1 = createAtomixClient();
AtomixWorkQueue queue1 = atomix1.getResource(queueName, AtomixWorkQueue.class).join();
byte[] item = DEFAULT_PAYLOAD;
queue1.addOne(item).join();
Task<byte[]> task = queue1.take().join();
String taskId = task.taskId();
// Create another client and get a handle to the same queue.
Atomix atomix2 = createAtomixClient();
AtomixWorkQueue queue2 = atomix2.getResource(queueName, AtomixWorkQueue.class).join();
// Attempt completing the task with new client and verify task is not completed
queue2.complete(taskId).join();
WorkQueueStats stats = queue1.stats().join();
assertEquals(stats.totalPending(), 0);
assertEquals(stats.totalInProgress(), 1);
assertEquals(stats.totalCompleted(), 0);
}
}
......