Madan Jampani
Committed by Brian O'Connor

Notification support for Consistent datastructures (ConsitentMap and DistributedSet)

Change-Id: If74cdc411c79c42c7643420e6369cf656849bb6a
Showing 21 changed files with 585 additions and 93 deletions
......@@ -288,4 +288,19 @@ public interface AsyncConsistentMap<K, V> {
* @return optional updated value. Will be empty if update did not happen.
*/
CompletableFuture<Optional<Versioned<V>>> replaceAndGet(K key, long oldVersion, V newValue);
/**
* Registers the specified listener to be notified whenever the map is updated.
*
* @param listener listener to notify about map events
*/
void addListener(MapEventListener<K, V> listener);
/**
* Unregisters the specified listener such that it will no longer
* receive map change notifications.
*
* @param listener listener to unregister
*/
void removeListener(MapEventListener<K, V> listener);
}
......
......@@ -289,4 +289,19 @@ public interface ConsistentMap<K, V> {
* @return optional new value. Will be empty if replace did not happen
*/
Optional<Versioned<V>> replaceAndGet(K key, long oldVersion, V newValue);
/**
* Registers the specified listener to be notified whenever the map is updated.
*
* @param listener listener to notify about map events
*/
void addListener(MapEventListener<K, V> listener);
/**
* Unregisters the specified listener such that it will no longer
* receive map change notifications.
*
* @param listener listener to unregister
*/
void removeListener(MapEventListener<K, V> listener);
}
......
package org.onosproject.store.service;
import java.util.Set;
/**
* A distributed collection designed for holding unique elements.
*
* @param <E> set entry type
*/
public interface DistributedSet<E> extends Set<E> {
/**
* Registers the specified listener to be notified whenever
* the set is updated.
*
* @param listener listener to notify about set update events
*/
void addListener(SetEventListener<E> listener);
/**
* Unregisters the specified listener.
*
* @param listener listener to unregister.
*/
void removeListener(SetEventListener<E> listener);
}
......@@ -15,14 +15,12 @@
*/
package org.onosproject.store.service;
import java.util.Set;
/**
* Builder for distributed set.
*
* @param <E> type set elements.
*/
public interface SetBuilder<E> {
public interface DistributedSetBuilder<E> {
/**
* Sets the name of the set.
......@@ -34,9 +32,9 @@ public interface SetBuilder<E> {
* </p>
*
* @param name name of the set
* @return this SetBuilder
* @return this DistributedSetBuilder
*/
SetBuilder<E> withName(String name);
DistributedSetBuilder<E> withName(String name);
/**
* Sets a serializer that can be used to serialize
......@@ -48,18 +46,36 @@ public interface SetBuilder<E> {
* </p>
*
* @param serializer serializer
* @return this SetBuilder
* @return this DistributedSetBuilder
*/
SetBuilder<E> withSerializer(Serializer serializer);
DistributedSetBuilder<E> withSerializer(Serializer serializer);
/**
* Disables set updates.
* <p>
* Attempt to update the built set will throw {@code UnsupportedOperationException}.
*
* @return this SetBuilder
* @return this DistributedSetBuilder
*/
DistributedSetBuilder<E> withUpdatesDisabled();
/**
* Disables distribution of set entries across multiple database partitions.
* <p>
* When partitioning is disabled, the returned set will have a single partition
* that spans the entire cluster. Furthermore, the changes made to the set are
* ephemeral and do not survive a full cluster restart.
* </p>
* <p>
* Disabling partitions is more appropriate when the returned set is used for
* simple coordination activities and not for long term data persistence.
* </p>
* <p>
* Note: By default partitions are enabled and entries in the set are durable.
* </p>
* @return this DistributedSetBuilder
*/
SetBuilder<E> withUpdatesDisabled();
DistributedSetBuilder<E> withPartitionsDisabled();
/**
* Builds an set based on the configuration options
......@@ -68,5 +84,5 @@ public interface SetBuilder<E> {
* @return new set
* @throws java.lang.RuntimeException if a mandatory parameter is missing
*/
Set<E> build();
DistributedSet<E> build();
}
......
package org.onosproject.store.service;
import java.util.Objects;
import com.google.common.base.MoreObjects;
/**
* Representation of a ConsistentMap update notification.
*
* @param <K> key type
* @param <V> value type
*/
public class MapEvent<K, V> {
/**
* MapEvent type.
*/
public enum Type {
/**
* Entry inserted into the map.
*/
INSERT,
/**
* Existing map entry updated.
*/
UPDATE,
/**
* Entry removed from map.
*/
REMOVE
}
private final String name;
private final Type type;
private final K key;
private final Versioned<V> value;
/**
* Creates a new event object.
*
* @param name map name
* @param type the type of the event
* @param key the key the event concerns
* @param value the value related to the key, or null for remove events
*/
public MapEvent(String name, Type type, K key, Versioned<V> value) {
this.name = name;
this.type = type;
this.key = key;
this.value = value;
}
/**
* Returns the map name.
*
* @return name of map
*/
public String name() {
return name;
}
/**
* Returns the type of the event.
*
* @return the type of the event
*/
public Type type() {
return type;
}
/**
* Returns the key this event concerns.
*
* @return the key
*/
public K key() {
return key;
}
/**
* Returns the value associated with this event. If type is REMOVE,
* this is the value that was removed. If type is INSERT/UPDATE, this is
* the new value.
*
* @return the value
*/
public Versioned<V> value() {
return value;
}
@Override
public boolean equals(Object o) {
if (!(o instanceof MapEvent)) {
return false;
}
MapEvent<K, V> that = (MapEvent) o;
return Objects.equals(this.name, that.name) &&
Objects.equals(this.type, that.type) &&
Objects.equals(this.key, that.key) &&
Objects.equals(this.value, that.value);
}
@Override
public int hashCode() {
return Objects.hash(type, key, value);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("name", name)
.add("type", type)
.add("key", key)
.add("value", value)
.toString();
}
}
package org.onosproject.store.service;
/**
* Listener to be notified about updates to a ConsitentMap.
*/
public interface MapEventListener<K, V> {
/**
* Reacts to the specified event.
*
* @param event the event
*/
void event(MapEvent<K, V> event);
}
......@@ -16,8 +16,13 @@
package org.onosproject.store.service;
import java.util.Arrays;
import java.util.List;
import org.onlab.util.KryoNamespace;
import com.google.common.collect.Lists;
/**
* Interface for serialization for store artifacts.
*/
......@@ -45,16 +50,32 @@ public interface Serializer {
* @return Serializer instance
*/
static Serializer using(KryoNamespace kryo) {
return using(Arrays.asList(kryo));
}
static Serializer using(List<KryoNamespace> namespaces, Class<?>... classes) {
KryoNamespace.Builder builder = new KryoNamespace.Builder();
namespaces.forEach(builder::register);
Lists.newArrayList(classes).forEach(builder::register);
builder.register(MapEvent.class, MapEvent.Type.class);
KryoNamespace namespace = builder.build();
return new Serializer() {
@Override
public <T> byte[] encode(T object) {
return kryo.serialize(object);
return namespace.serialize(object);
}
@Override
public <T> T decode(byte[] bytes) {
return kryo.deserialize(bytes);
return namespace.deserialize(bytes);
}
};
}
static Serializer forTypes(Class<?>... classes) {
return using(KryoNamespace.newBuilder()
.register(classes)
.register(MapEvent.class, MapEvent.Type.class)
.build());
}
}
......
package org.onosproject.store.service;
import java.util.Objects;
import com.google.common.base.MoreObjects;
/**
* Representation of a DistributedSet update notification.
*
* @param <E> element type
*/
public class SetEvent<E> {
/**
* SetEvent type.
*/
public enum Type {
/**
* Entry added to the set.
*/
ADD,
/**
* Entry removed from the set.
*/
REMOVE
}
private final String name;
private final Type type;
private final E entry;
/**
* Creates a new event object.
*
* @param name set name
* @param type the type of the event
* @param entry the entry the event concerns
*/
public SetEvent(String name, Type type, E entry) {
this.name = name;
this.type = type;
this.entry = entry;
}
/**
* Returns the set name.
*
* @return name of set
*/
public String name() {
return name;
}
/**
* Returns the type of the event.
*
* @return the type of the event
*/
public Type type() {
return type;
}
/**
* Returns the entry this event concerns.
*
* @return the entry
*/
public E entry() {
return entry;
}
@Override
public boolean equals(Object o) {
if (!(o instanceof SetEvent)) {
return false;
}
SetEvent<E> that = (SetEvent) o;
return Objects.equals(this.name, that.name) &&
Objects.equals(this.type, that.type) &&
Objects.equals(this.entry, that.entry);
}
@Override
public int hashCode() {
return Objects.hash(name, type, entry);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("name", name)
.add("type", type)
.add("entry", entry)
.toString();
}
}
package org.onosproject.store.service;
/**
* Listener to be notified about updates to a DistributedSet.
*/
public interface SetEventListener<E> {
/**
* Reacts to the specified event.
*
* @param event the event
*/
void event(SetEvent<E> event);
}
......@@ -53,7 +53,7 @@ public interface StorageService {
* @param <E> set element type
* @return builder for an distributed set
*/
<E> SetBuilder<E> setBuilder();
<E> DistributedSetBuilder<E> setBuilder();
/**
* Creates a new AtomicCounterBuilder.
......
......@@ -42,19 +42,24 @@ import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import static org.onlab.util.Tools.groupedThreads;
import org.onosproject.cluster.ClusterService;
import org.onosproject.core.IdGenerator;
import org.onosproject.store.cluster.impl.ClusterDefinitionManager;
import org.onosproject.store.cluster.impl.NodeInfo;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.ecmap.EventuallyConsistentMapBuilderImpl;
import org.onosproject.store.service.AtomicCounterBuilder;
import org.onosproject.store.service.ConsistentMapBuilder;
import org.onosproject.store.service.ConsistentMapException;
import org.onosproject.store.service.EventuallyConsistentMapBuilder;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapInfo;
import org.onosproject.store.service.PartitionInfo;
import org.onosproject.store.service.SetBuilder;
import org.onosproject.store.service.DistributedSetBuilder;
import org.onosproject.store.service.StorageAdminService;
import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.Transaction;
......@@ -69,6 +74,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
......@@ -93,12 +99,16 @@ public class DatabaseManager implements StorageService, StorageAdminService {
private static final int DATABASE_OPERATION_TIMEOUT_MILLIS = 5000;
private ClusterCoordinator coordinator;
private PartitionedDatabase partitionedDatabase;
private Database inMemoryDatabase;
protected PartitionedDatabase partitionedDatabase;
protected Database inMemoryDatabase;
private TransactionManager transactionManager;
private final IdGenerator transactionIdGenerator = () -> RandomUtils.nextLong();
private ExecutorService eventDispatcher;
private final Set<DefaultAsyncConsistentMap> maps = Sets.newCopyOnWriteArraySet();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
......@@ -187,7 +197,10 @@ public class DatabaseManager implements StorageService, StorageAdminService {
Futures.getUnchecked(status);
transactionManager = new TransactionManager(partitionedDatabase);
transactionManager = new TransactionManager(partitionedDatabase, consistentMapBuilder());
eventDispatcher = Executors.newSingleThreadExecutor(
groupedThreads("onos/store/manager", "map-event-dispatcher"));
log.info("Started");
}
......@@ -213,13 +226,14 @@ public class DatabaseManager implements StorageService, StorageAdminService {
log.info("Successfully closed databases.");
}
});
maps.forEach(map -> clusterCommunicator.removeSubscriber(mapUpdatesSubject(map.name())));
eventDispatcher.shutdown();
log.info("Stopped");
}
@Override
public TransactionContextBuilder transactionContextBuilder() {
return new DefaultTransactionContextBuilder(
inMemoryDatabase, partitionedDatabase, transactionIdGenerator.getNewId());
return new DefaultTransactionContextBuilder(this, transactionIdGenerator.getNewId());
}
@Override
......@@ -296,12 +310,12 @@ public class DatabaseManager implements StorageService, StorageAdminService {
@Override
public <K, V> ConsistentMapBuilder<K, V> consistentMapBuilder() {
return new DefaultConsistentMapBuilder<>(inMemoryDatabase, partitionedDatabase);
return new DefaultConsistentMapBuilder<>(this);
}
@Override
public <E> SetBuilder<E> setBuilder() {
return new DefaultSetBuilder<>(partitionedDatabase);
public <E> DistributedSetBuilder<E> setBuilder() {
return new DefaultDistributedSetBuilder<>(this);
}
@Override
......@@ -370,4 +384,20 @@ public class DatabaseManager implements StorageService, StorageAdminService {
public void redriveTransactions() {
getTransactions().stream().forEach(transactionManager::execute);
}
protected <K, V> void registerMap(DefaultAsyncConsistentMap<K, V> map) {
// TODO: Support different local instances of the same map.
if (!maps.add(map)) {
throw new IllegalStateException("Map by name " + map.name() + " already exists");
}
clusterCommunicator.<MapEvent<K, V>>addSubscriber(mapUpdatesSubject(map.name()),
map.serializer()::decode,
map::notifyLocalListeners,
eventDispatcher);
}
protected static MessageSubject mapUpdatesSubject(String mapName) {
return new MessageSubject(mapName + "-map-updates");
}
}
\ No newline at end of file
......
......@@ -17,6 +17,7 @@
package org.onosproject.store.consistent.impl;
import static com.google.common.base.Preconditions.*;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.Collection;
import java.util.Map;
......@@ -24,8 +25,10 @@ import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
......@@ -36,8 +39,11 @@ import org.onlab.util.HexString;
import org.onlab.util.Tools;
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.ConsistentMapException;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.Versioned;
import org.slf4j.Logger;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
......@@ -56,6 +62,11 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V>
private final Database database;
private final Serializer serializer;
private final boolean readOnly;
private final Consumer<MapEvent<K, V>> eventPublisher;
private final Set<MapEventListener<K, V>> listeners = new CopyOnWriteArraySet<>();
private final Logger log = getLogger(getClass());
private static final String ERROR_NULL_KEY = "Key cannot be null";
private static final String ERROR_NULL_VALUE = "Null values are not allowed";
......@@ -77,11 +88,29 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V>
public DefaultAsyncConsistentMap(String name,
Database database,
Serializer serializer,
boolean readOnly) {
boolean readOnly,
Consumer<MapEvent<K, V>> eventPublisher) {
this.name = checkNotNull(name, "map name cannot be null");
this.database = checkNotNull(database, "database cannot be null");
this.serializer = checkNotNull(serializer, "serializer cannot be null");
this.readOnly = readOnly;
this.eventPublisher = eventPublisher;
}
/**
* Returns this map name.
* @return map name
*/
public String name() {
return name;
}
/**
* Returns the serializer for map entries.
* @return map entry serializer
*/
public Serializer serializer() {
return serializer;
}
@Override
......@@ -139,6 +168,7 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V>
checkNotNull(key, ERROR_NULL_KEY);
checkNotNull(condition, "predicate function cannot be null");
checkNotNull(remappingFunction, "Remapping function cannot be null");
AtomicReference<MapEvent<K, V>> mapEvent = new AtomicReference<>();
return get(key).thenCompose(r1 -> {
V existingValue = r1 == null ? null : r1.value();
// if the condition evaluates to false, return existing value.
......@@ -160,6 +190,7 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V>
if (r1 != null) {
return remove(key, r1.version()).thenApply(result -> {
if (result) {
mapEvent.set(new MapEvent<>(name, MapEvent.Type.REMOVE, key, r1));
return null;
} else {
throw new ConsistentMapException.ConcurrentModification();
......@@ -174,6 +205,7 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V>
return replaceAndGet(key, r1.version(), computedValue.get())
.thenApply(v -> {
if (v.isPresent()) {
mapEvent.set(new MapEvent<>(name, MapEvent.Type.UPDATE, key, v.get()));
return v.get();
} else {
throw new ConsistentMapException.ConcurrentModification();
......@@ -184,12 +216,13 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V>
if (!result.isPresent()) {
throw new ConsistentMapException.ConcurrentModification();
} else {
mapEvent.set(new MapEvent<>(name, MapEvent.Type.INSERT, key, result.get()));
return result.get();
}
});
}
}
});
}).whenComplete((result, error) -> notifyListeners(mapEvent.get()));
}
@Override
......@@ -370,4 +403,35 @@ public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V>
throw new UnsupportedOperationException();
}
}
@Override
public void addListener(MapEventListener<K, V> listener) {
listeners.add(listener);
}
@Override
public void removeListener(MapEventListener<K, V> listener) {
listeners.remove(listener);
}
protected void notifyListeners(MapEvent<K, V> event) {
try {
if (event != null) {
notifyLocalListeners(event);
notifyRemoteListeners(event);
}
} catch (Exception e) {
log.warn("Failure notifying listeners about {}", event, e);
}
}
protected void notifyLocalListeners(MapEvent<K, V> event) {
listeners.forEach(listener -> listener.event(event));
}
protected void notifyRemoteListeners(MapEvent<K, V> event) {
if (eventPublisher != null) {
eventPublisher.accept(event);
}
}
}
\ No newline at end of file
......
......@@ -28,10 +28,9 @@ import java.util.function.Function;
import java.util.function.Predicate;
import java.util.Set;
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.ConsistentMapException;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.Versioned;
/**
......@@ -45,13 +44,14 @@ public class DefaultConsistentMap<K, V> implements ConsistentMap<K, V> {
private static final int OPERATION_TIMEOUT_MILLIS = 5000;
private final AsyncConsistentMap<K, V> asyncMap;
private final DefaultAsyncConsistentMap<K, V> asyncMap;
public DefaultConsistentMap(String name,
Database database,
Serializer serializer,
boolean readOnly) {
asyncMap = new DefaultAsyncConsistentMap<>(name, database, serializer, readOnly);
public String name() {
return asyncMap.name();
}
public DefaultConsistentMap(DefaultAsyncConsistentMap<K, V> asyncMap) {
this.asyncMap = asyncMap;
}
@Override
......@@ -190,4 +190,14 @@ public class DefaultConsistentMap<K, V> implements ConsistentMap<K, V> {
}
}
}
@Override
public void addListener(MapEventListener<K, V> listener) {
asyncMap.addListener(listener);
}
@Override
public void removeListener(MapEventListener<K, V> listener) {
asyncMap.addListener(listener);
}
}
\ No newline at end of file
......
......@@ -6,6 +6,7 @@ import static com.google.common.base.Preconditions.checkState;
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.ConsistentMapBuilder;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.Serializer;
/**
......@@ -20,12 +21,10 @@ public class DefaultConsistentMapBuilder<K, V> implements ConsistentMapBuilder<K
private String name;
private boolean partitionsEnabled = true;
private boolean readOnly = false;
private final Database partitionedDatabase;
private final Database inMemoryDatabase;
private final DatabaseManager manager;
public DefaultConsistentMapBuilder(Database inMemoryDatabase, Database partitionedDatabase) {
this.inMemoryDatabase = inMemoryDatabase;
this.partitionedDatabase = partitionedDatabase;
public DefaultConsistentMapBuilder(DatabaseManager manager) {
this.manager = manager;
}
@Override
......@@ -60,21 +59,25 @@ public class DefaultConsistentMapBuilder<K, V> implements ConsistentMapBuilder<K
@Override
public ConsistentMap<K, V> build() {
checkState(validInputs());
return new DefaultConsistentMap<>(
name,
partitionsEnabled ? partitionedDatabase : inMemoryDatabase,
serializer,
readOnly);
return new DefaultConsistentMap<>(buildAndRegisterMap());
}
@Override
public AsyncConsistentMap<K, V> buildAsyncMap() {
return buildAndRegisterMap();
}
private DefaultAsyncConsistentMap<K, V> buildAndRegisterMap() {
checkState(validInputs());
return new DefaultAsyncConsistentMap<>(
DefaultAsyncConsistentMap<K, V> asyncMap = new DefaultAsyncConsistentMap<>(
name,
partitionsEnabled ? partitionedDatabase : inMemoryDatabase,
partitionsEnabled ? manager.partitionedDatabase : manager.inMemoryDatabase,
serializer,
readOnly);
readOnly,
event -> manager.clusterCommunicator.<MapEvent<K, V>>broadcast(event,
DatabaseManager.mapUpdatesSubject(name),
serializer::encode));
manager.registerMap(asyncMap);
return asyncMap;
}
}
\ No newline at end of file
......
......@@ -17,11 +17,17 @@ package org.onosproject.store.consistent.impl;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.DistributedSet;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.SetEvent;
import org.onosproject.store.service.SetEventListener;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
/**
......@@ -29,12 +35,15 @@ import com.google.common.collect.Sets;
* @param <E> set element type
*/
public class DefaultDistributedSet<E> implements Set<E> {
public class DefaultDistributedSet<E> implements DistributedSet<E> {
private final String name;
private final ConsistentMap<E, Boolean> backingMap;
private final Map<SetEventListener<E>, MapEventListener<E, Boolean>> listenerMapping = Maps.newIdentityHashMap();
public DefaultDistributedSet(String name, Database database, Serializer serializer, boolean readOnly) {
backingMap = new DefaultConsistentMap<>(name, database, serializer, readOnly);
public DefaultDistributedSet(String name, ConsistentMap<E, Boolean> backingMap) {
this.name = name;
this.backingMap = backingMap;
}
@Override
......@@ -76,7 +85,7 @@ public class DefaultDistributedSet<E> implements Set<E> {
@SuppressWarnings("unchecked")
@Override
public boolean remove(Object o) {
return backingMap.remove((E) o, true);
return backingMap.remove((E) o) != null;
}
@Override
......@@ -119,4 +128,26 @@ public class DefaultDistributedSet<E> implements Set<E> {
public void clear() {
backingMap.clear();
}
@Override
public void addListener(SetEventListener<E> listener) {
MapEventListener<E, Boolean> mapEventListener = mapEvent -> {
if (mapEvent.type() == MapEvent.Type.INSERT) {
listener.event(new SetEvent<>(name, SetEvent.Type.ADD, mapEvent.key()));
} else if (mapEvent.type() == MapEvent.Type.REMOVE) {
listener.event(new SetEvent<>(name, SetEvent.Type.REMOVE, mapEvent.key()));
}
};
if (listenerMapping.putIfAbsent(listener, mapEventListener) == null) {
backingMap.addListener(mapEventListener);
}
}
@Override
public void removeListener(SetEventListener<E> listener) {
MapEventListener<E, Boolean> mapEventListener = listenerMapping.remove(listener);
if (mapEventListener != null) {
backingMap.removeListener(mapEventListener);
}
}
}
......
......@@ -15,58 +15,52 @@
*/
package org.onosproject.store.consistent.impl;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import java.util.Set;
import org.onosproject.store.service.ConsistentMapBuilder;
import org.onosproject.store.service.DistributedSet;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.SetBuilder;
import org.onosproject.store.service.DistributedSetBuilder;
/**
* Default Set builder.
* Default distributed set builder.
*
* @param <E> type for set elements
*/
public class DefaultSetBuilder<E> implements SetBuilder<E> {
public class DefaultDistributedSetBuilder<E> implements DistributedSetBuilder<E> {
private Serializer serializer;
private String name;
private final Database database;
private boolean readOnly;
private ConsistentMapBuilder<E, Boolean> mapBuilder;
public DefaultSetBuilder(Database database) {
this.database = checkNotNull(database);
public DefaultDistributedSetBuilder(DatabaseManager manager) {
this.mapBuilder = manager.consistentMapBuilder();
}
@Override
public SetBuilder<E> withName(String name) {
checkArgument(name != null && !name.isEmpty());
public DistributedSetBuilder<E> withName(String name) {
mapBuilder.withName(name);
this.name = name;
return this;
}
@Override
public SetBuilder<E> withSerializer(Serializer serializer) {
checkArgument(serializer != null);
this.serializer = serializer;
public DistributedSetBuilder<E> withSerializer(Serializer serializer) {
mapBuilder.withSerializer(serializer);
return this;
}
@Override
public SetBuilder<E> withUpdatesDisabled() {
readOnly = true;
public DistributedSetBuilder<E> withUpdatesDisabled() {
mapBuilder.withUpdatesDisabled();
return this;
}
private boolean validInputs() {
return name != null && serializer != null;
@Override
public DistributedSetBuilder<E> withPartitionsDisabled() {
mapBuilder.withPartitionsDisabled();
return this;
}
@Override
public Set<E> build() {
checkState(validInputs());
return new DefaultDistributedSet<>(name, database, serializer, readOnly);
public DistributedSet<E> build() {
return new DefaultDistributedSet<E>(name, mapBuilder.build());
}
}
......
......@@ -18,9 +18,11 @@ package org.onosproject.store.consistent.impl;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import static com.google.common.base.Preconditions.*;
import org.onosproject.store.service.ConsistentMapBuilder;
import org.onosproject.store.service.DatabaseUpdate;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.TransactionContext;
......@@ -41,10 +43,14 @@ public class DefaultTransactionContext implements TransactionContext {
private boolean isOpen = false;
private final Database database;
private final long transactionId;
private final Supplier<ConsistentMapBuilder> mapBuilderSupplier;
public DefaultTransactionContext(Database database, long transactionId) {
this.database = checkNotNull(database);
public DefaultTransactionContext(long transactionId,
Database database,
Supplier<ConsistentMapBuilder> mapBuilderSupplier) {
this.transactionId = transactionId;
this.database = checkNotNull(database);
this.mapBuilderSupplier = checkNotNull(mapBuilderSupplier);
}
@Override
......@@ -72,7 +78,7 @@ public class DefaultTransactionContext implements TransactionContext {
checkNotNull(serializer);
return txMaps.computeIfAbsent(mapName, name -> new DefaultTransactionalMap<>(
name,
new DefaultConsistentMap<>(name, database, serializer, false),
mapBuilderSupplier.get().withName(name).withSerializer(serializer).build(),
this,
serializer));
}
......@@ -85,6 +91,7 @@ public class DefaultTransactionContext implements TransactionContext {
List<DatabaseUpdate> updates = Lists.newLinkedList();
txMaps.values()
.forEach(m -> { updates.addAll(m.prepareDatabaseUpdates()); });
// FIXME: Updates made via transactional context currently do not result in notifications. (ONOS-2097)
database.prepareAndCommit(new DefaultTransaction(transactionId, updates));
} catch (Exception e) {
abort();
......
......@@ -10,14 +10,11 @@ import org.onosproject.store.service.TransactionContextBuilder;
public class DefaultTransactionContextBuilder implements TransactionContextBuilder {
private boolean partitionsEnabled = true;
private final Database partitionedDatabase;
private final Database inMemoryDatabase;
private final DatabaseManager manager;
private final long transactionId;
public DefaultTransactionContextBuilder(
Database inMemoryDatabase, Database partitionedDatabase, long transactionId) {
this.partitionedDatabase = partitionedDatabase;
this.inMemoryDatabase = inMemoryDatabase;
public DefaultTransactionContextBuilder(DatabaseManager manager, long transactionId) {
this.manager = manager;
this.transactionId = transactionId;
}
......@@ -30,8 +27,9 @@ public class DefaultTransactionContextBuilder implements TransactionContextBuild
@Override
public TransactionContext build() {
return new DefaultTransactionContext(
partitionsEnabled ? partitionedDatabase : inMemoryDatabase,
transactionId);
transactionId,
partitionsEnabled ? manager.partitionedDatabase : manager.inMemoryDatabase,
() -> partitionsEnabled ? manager.consistentMapBuilder()
: manager.consistentMapBuilder().withPartitionsDisabled());
}
}
......
......@@ -50,6 +50,7 @@ public class PartitionedDatabase implements Database {
private final List<Database> partitions;
private final AtomicBoolean isOpen = new AtomicBoolean(false);
private static final String DB_NOT_OPEN = "Partitioned Database is not open";
private TransactionManager transactionManager;
public PartitionedDatabase(
String name,
......@@ -285,7 +286,10 @@ public class PartitionedDatabase implements Database {
subTransactions.entrySet().iterator().next();
return entry.getKey().prepareAndCommit(entry.getValue());
} else {
return new TransactionManager(this).execute(transaction);
if (transactionManager != null) {
throw new IllegalStateException("TransactionManager is not initialized");
}
return transactionManager.execute(transaction);
}
}
......@@ -387,4 +391,8 @@ public class PartitionedDatabase implements Database {
perPartitionUpdates.forEach((k, v) -> subTransactions.put(k, new DefaultTransaction(transaction.id(), v)));
return subTransactions;
}
protected void setTransactionManager(TransactionManager tranasactionManager) {
this.transactionManager = transactionManager;
}
}
......
......@@ -17,6 +17,7 @@ package org.onosproject.store.consistent.impl;
import static com.google.common.base.Preconditions.checkNotNull;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
......@@ -26,6 +27,7 @@ import org.apache.commons.lang3.tuple.Pair;
import org.onlab.util.KryoNamespace;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.ConsistentMapBuilder;
import org.onosproject.store.service.DatabaseUpdate;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.Transaction;
......@@ -49,7 +51,7 @@ public class TransactionManager {
.register(ImmutablePair.class)
.build();
private final Serializer serializer = Serializer.using(KRYO_NAMESPACE);
private final Serializer serializer = Serializer.using(Arrays.asList(KRYO_NAMESPACE));
private final Database database;
private final AsyncConsistentMap<Long, Transaction> transactions;
......@@ -58,9 +60,11 @@ public class TransactionManager {
*
* @param database database
*/
public TransactionManager(Database database) {
public TransactionManager(Database database, ConsistentMapBuilder<Long, Transaction> mapBuilder) {
this.database = checkNotNull(database, "database cannot be null");
this.transactions = new DefaultAsyncConsistentMap<>("onos-transactions", this.database, serializer, false);
this.transactions = mapBuilder.withName("onos-transactions")
.withSerializer(serializer)
.buildAsyncMap();
}
/**
......
......@@ -170,6 +170,8 @@ import org.onosproject.net.resource.link.MplsLabel;
import org.onosproject.net.resource.link.MplsLabelResourceAllocation;
import org.onosproject.net.resource.link.MplsLabelResourceRequest;
import org.onosproject.store.Timestamp;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.SetEvent;
import org.onosproject.store.service.Versioned;
import java.net.URI;
......@@ -408,6 +410,10 @@ public final class KryoNamespaces {
.register(new HostLocationSerializer(), HostLocation.class)
.register(new DefaultOutboundPacketSerializer(), DefaultOutboundPacket.class)
.register(Versioned.class)
.register(MapEvent.class)
.register(MapEvent.Type.class)
.register(SetEvent.class)
.register(SetEvent.Type.class)
.register(DefaultGroupId.class)
.register(Annotations.class)
.register(OmsPort.class)
......