Madan Jampani
Committed by Gerrit Code Review

StorageService API for creating AsyncDocumentTree primitive

Change-Id: Ib7c3f19beb7b26a5b69161cf972c3c64d0be94b3
......@@ -15,6 +15,7 @@
*/
package org.onosproject.vtnrsc.util;
import org.onosproject.store.service.AsyncDocumentTree;
import org.onosproject.store.service.Topic;
import org.onosproject.store.service.ConsistentTreeMapBuilder;
import org.onosproject.store.service.WorkQueue;
......@@ -81,4 +82,9 @@ public class VtnStorageServiceAdapter implements StorageService {
public <T> Topic<T> getTopic(String name, Serializer serializer) {
return null;
}
@Override
public <V> AsyncDocumentTree<V> getDocumentTree(String name, Serializer serializer) {
return null;
}
}
......
......@@ -22,9 +22,10 @@ import org.onosproject.store.service.AsyncAtomicValue;
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.AsyncConsistentTreeMap;
import org.onosproject.store.service.AsyncDistributedSet;
import org.onosproject.store.service.AsyncDocumentTree;
import org.onosproject.store.service.AsyncLeaderElector;
import org.onosproject.store.service.WorkQueue;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.WorkQueue;
/**
* Interface for entity that can create instances of different distributed primitives.
......@@ -99,6 +100,16 @@ public interface DistributedPrimitiveCreator {
<E> WorkQueue<E> newWorkQueue(String name, Serializer serializer);
/**
* Creates a new {@code AsyncDocumentTree}.
*
* @param <V> document tree node value type
* @param name tree name
* @param serializer serializer
* @return document tree
*/
<V> AsyncDocumentTree<V> newAsyncDocumentTree(String name, Serializer serializer);
/**
* Returns the names of all created {@code AsyncConsistentMap} instances.
* @return set of {@code AsyncConsistentMap} names
*/
......
......@@ -114,12 +114,21 @@ public interface StorageService {
* @param <E> work element type
* @param name work queue name
* @param serializer serializer
*
* @return WorkQueue instance
*/
<E> WorkQueue<E> getWorkQueue(String name, Serializer serializer);
/**
* Returns an instance of {@code AsyncDocumentTree} with specified name.
*
* @param <V> tree node value type
* @param name document tree name
* @param serializer serializer
* @return AsyncDocumentTree instance
*/
<V> AsyncDocumentTree<V> getDocumentTree(String name, Serializer serializer);
/**
* Returns an instance of {@code Topic} with specified name.
*
* @param <T> topic message type
......
......@@ -64,7 +64,13 @@ public class StorageServiceAdapter implements StorageService {
return null;
}
@Override
public <V> ConsistentTreeMapBuilder<V> consistentTreeMapBuilder() {
return null;
}
@Override
public <V> AsyncDocumentTree<V> getDocumentTree(String name, Serializer serializer) {
return null;
}
}
......
......@@ -106,7 +106,6 @@ public final class CatalystSerializers {
serializer.register(DocumentTreeUpdateResult.class, factory);
serializer.register(DocumentTreeUpdateResult.Status.class, factory);
serializer.register(DocumentTreeEvent.class, factory);
serializer.register(DocumentTreeEvent.Type.class, factory);
serializer.register(Maps.immutableEntry("a", "b").getClass(), factory);
serializer.register(ImmutableList.of().getClass(), factory);
......
/*
* Copyright 2016-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.primitives.impl;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.onosproject.store.service.AsyncDocumentTree;
import org.onosproject.store.service.DocumentPath;
import org.onosproject.store.service.DocumentTreeEvent;
import org.onosproject.store.service.DocumentTreeListener;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.Versioned;
import com.google.common.collect.Maps;
/**
* Default implementation of {@link AsyncDocumentTree}.
* <p>
* This implementation delegates execution to a backing tree implemented on top of Atomix framework.
*
* @See AtomixDocumentTree
*
* @param <V> tree node value type.
*/
public class DefaultDistributedDocumentTree<V> implements AsyncDocumentTree<V> {
private final String name;
private final AsyncDocumentTree<byte[]> backingTree;
private final Serializer serializer;
private final Map<DocumentTreeListener<V>, InternalBackingDocumentTreeListener> listeners =
Maps.newIdentityHashMap();
DefaultDistributedDocumentTree(String name, AsyncDocumentTree<byte[]> backingTree, Serializer serializer) {
this.name = name;
this.backingTree = backingTree;
this.serializer = serializer;
}
@Override
public String name() {
return name;
}
@Override
public Type primitiveType() {
return backingTree.primitiveType();
}
@Override
public DocumentPath root() {
return backingTree.root();
}
@Override
public CompletableFuture<Map<String, Versioned<V>>> getChildren(DocumentPath path) {
return backingTree.getChildren(path)
.thenApply(map -> Maps.transformValues(map, v -> v.map(serializer::decode)));
}
@Override
public CompletableFuture<Versioned<V>> get(DocumentPath path) {
return backingTree.get(path)
.thenApply(v -> v == null ? null : v.map(serializer::decode));
}
@Override
public CompletableFuture<Versioned<V>> set(DocumentPath path, V value) {
return backingTree.set(path, serializer.encode(value))
.thenApply(v -> v == null ? null : v.map(serializer::decode));
}
@Override
public CompletableFuture<Boolean> create(DocumentPath path, V value) {
return backingTree.create(path, serializer.encode(value));
}
@Override
public CompletableFuture<Boolean> replace(DocumentPath path, V newValue, long version) {
return backingTree.replace(path, serializer.encode(newValue), version);
}
@Override
public CompletableFuture<Boolean> replace(DocumentPath path, V newValue, V currentValue) {
return backingTree.replace(path, serializer.encode(newValue), serializer.encode(currentValue));
}
@Override
public CompletableFuture<Versioned<V>> removeNode(DocumentPath path) {
return backingTree.removeNode(path)
.thenApply(v -> v == null ? null : v.map(serializer::decode));
}
@Override
public CompletableFuture<Void> addListener(DocumentPath path, DocumentTreeListener<V> listener) {
synchronized (listeners) {
InternalBackingDocumentTreeListener backingListener =
listeners.computeIfAbsent(listener, k -> new InternalBackingDocumentTreeListener(listener));
return backingTree.addListener(path, backingListener);
}
}
@Override
public CompletableFuture<Void> removeListener(DocumentTreeListener<V> listener) {
synchronized (listeners) {
InternalBackingDocumentTreeListener backingListener = listeners.remove(listener);
if (backingListener != null) {
return backingTree.removeListener(backingListener);
} else {
return CompletableFuture.completedFuture(null);
}
}
}
private class InternalBackingDocumentTreeListener implements DocumentTreeListener<byte[]> {
private final DocumentTreeListener<V> listener;
InternalBackingDocumentTreeListener(DocumentTreeListener<V> listener) {
this.listener = listener;
}
@Override
public void event(DocumentTreeEvent<byte[]> event) {
listener.event(new DocumentTreeEvent<V>(event.path(),
event.type(),
event.newValue().map(v -> v.map(serializer::decode)),
event.oldValue().map(v -> v.map(serializer::decode))));
}
}
}
......@@ -29,6 +29,7 @@ import org.onosproject.store.service.AsyncAtomicValue;
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.AsyncConsistentTreeMap;
import org.onosproject.store.service.AsyncDistributedSet;
import org.onosproject.store.service.AsyncDocumentTree;
import org.onosproject.store.service.AsyncLeaderElector;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.WorkQueue;
......@@ -108,6 +109,11 @@ public class FederatedDistributedPrimitiveCreator implements DistributedPrimitiv
}
@Override
public <V> AsyncDocumentTree<V> newAsyncDocumentTree(String name, Serializer serializer) {
return getCreator(name).newAsyncDocumentTree(name, serializer);
}
@Override
public Set<String> getAsyncConsistentMapNames() {
return members.values()
.stream()
......
......@@ -43,6 +43,7 @@ import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.AsyncAtomicValue;
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.AsyncDocumentTree;
import org.onosproject.store.service.AtomicCounterBuilder;
import org.onosproject.store.service.AtomicValueBuilder;
import org.onosproject.store.service.ConsistentMap;
......@@ -180,6 +181,12 @@ public class StorageManager implements StorageService, StorageAdminService {
}
@Override
public <V> AsyncDocumentTree<V> getDocumentTree(String name, Serializer serializer) {
checkPermission(STORAGE_WRITE);
return federatedPrimitiveCreator.newAsyncDocumentTree(name, serializer);
}
@Override
public List<MapInfo> getMapInfo() {
return listMapInfo(federatedPrimitiveCreator);
}
......
......@@ -41,6 +41,7 @@ import org.onosproject.store.primitives.DistributedPrimitiveCreator;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMap;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMap;
import org.onosproject.store.primitives.resources.impl.AtomixCounter;
import org.onosproject.store.primitives.resources.impl.AtomixDocumentTree;
import org.onosproject.store.primitives.resources.impl.AtomixLeaderElector;
import org.onosproject.store.primitives.resources.impl.AtomixWorkQueue;
import org.onosproject.store.serializers.KryoNamespaces;
......@@ -49,6 +50,7 @@ import org.onosproject.store.service.AsyncAtomicValue;
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.AsyncConsistentTreeMap;
import org.onosproject.store.service.AsyncDistributedSet;
import org.onosproject.store.service.AsyncDocumentTree;
import org.onosproject.store.service.AsyncLeaderElector;
import org.onosproject.store.service.DistributedPrimitive.Status;
import org.onosproject.store.service.PartitionClientInfo;
......@@ -191,6 +193,12 @@ public class StoragePartitionClient implements DistributedPrimitiveCreator, Mana
}
@Override
public <V> AsyncDocumentTree<V> newAsyncDocumentTree(String name, Serializer serializer) {
AtomixDocumentTree atomixDocumentTree = client.getResource(name, AtomixDocumentTree.class).join();
return new DefaultDistributedDocumentTree<>(name, atomixDocumentTree, serializer);
}
@Override
public AsyncLeaderElector newAsyncLeaderElector(String name) {
AtomixLeaderElector leaderElector = client.getResource(name, AtomixLeaderElector.class)
.thenCompose(AtomixLeaderElector::setupCache)
......
......@@ -65,7 +65,7 @@ public class AtomixDocumentTreeState
private final Logger log = getLogger(getClass());
private final Map<Long, Commit<? extends Listen>> listeners = new HashMap<>();
private final AtomicLong versionCounter = new AtomicLong(0);
private AtomicLong versionCounter = new AtomicLong(0);
private final DocumentTree<TreeNodeValue> docTree = new DefaultDocumentTree<>(versionCounter::incrementAndGet);
public AtomixDocumentTreeState(Properties properties) {
......@@ -79,7 +79,7 @@ public class AtomixDocumentTreeState
@Override
public void install(SnapshotReader reader) {
versionCounter.set(reader.readLong());
versionCounter = new AtomicLong(reader.readLong());
}
@Override
......@@ -101,8 +101,7 @@ public class AtomixDocumentTreeState
commit.close();
return;
}
commit.session()
.onStateChange(
commit.session().onStateChange(
state -> {
if (state == ServerSession.State.CLOSED
|| state == ServerSession.State.EXPIRED) {
......@@ -262,11 +261,10 @@ public class AtomixDocumentTreeState
result.created() ? Type.CREATED : result.newValue() == null ? Type.DELETED : Type.UPDATED,
Optional.ofNullable(result.newValue()),
Optional.ofNullable(result.oldValue()));
Object message = ImmutableList.of(event);
listeners.values().forEach(commit -> {
commit.session().publish(AtomixDocumentTree.CHANGE_SUBJECT, message);
System.out.println("Sent " + message + " to " + commit.session().id());
});
listeners.values()
.forEach(commit -> commit.session()
.publish(AtomixDocumentTree.CHANGE_SUBJECT,
ImmutableList.of(event)));
}
@Override
......
......@@ -320,10 +320,6 @@ public class AtomixDocumentTreeTest extends AtomixTestBase {
}
}
public boolean eventReceived() {
return !queue.isEmpty();
}
public DocumentTreeEvent<byte[]> event() throws InterruptedException {
return queue.take();
}
......
......@@ -15,6 +15,7 @@
*/
package org.onosproject.pcelabelstore.util;
import org.onosproject.store.service.AsyncDocumentTree;
import org.onosproject.store.service.AtomicCounterBuilder;
import org.onosproject.store.service.AtomicValueBuilder;
import org.onosproject.store.service.ConsistentMapBuilder;
......@@ -83,4 +84,9 @@ public class StorageServiceAdapter implements StorageService {
// TODO Auto-generated method stub
return null;
}
@Override
public <V> AsyncDocumentTree<V> getDocumentTree(String name, Serializer serializer) {
return null;
}
}
......