Madan Jampani
Committed by Ray Milkey

Added a AtomicValue distributed primitive.

Change-Id: I00ff165cbd9c6e4f2610af9877ff262527b7b048
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.service;
/**
* Distributed version of java.util.concurrent.atomic.AtomicReference.
*
* @param <V> value type
*/
public interface AtomicValue<V> {
/**
* Atomically sets the value to the given updated value if the current value is equal to the expected value.
* <p>
* IMPORTANT: Equality is based on the equality of the serialized byte[] representations.
* <p>
* @param expect the expected value
* @param update the new value
* @return true if successful. false return indicates that the actual value was not equal to the expected value.
*/
boolean compareAndSet(V expect, V update);
/**
* Gets the current value.
* @return current value
*/
V get();
/**
* Atomically sets to the given value and returns the old value.
* @param value the new value
* @return previous value
*/
V getAndSet(V value);
/**
* Sets to the given value.
* @param value new value
*/
void set(V value);
/**
* Registers the specified listener to be notified whenever the atomic value is updated.
*
* @param listener listener to notify about events
*/
void addListener(AtomicValueEventListener<V> listener);
/**
* Unregisters the specified listener such that it will no longer
* receive atomic value update notifications.
*
* @param listener listener to unregister
*/
void removeListener(AtomicValueEventListener<V> listener);
}
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.service;
/**
* Builder for constructing new AtomicValue instances.
*
* @param <V> atomic value type
*/
public interface AtomicValueBuilder<V> {
/**
* Sets the name for the atomic value.
* <p>
* Each atomic value is identified by a unique name.
* </p>
* <p>
* Note: This is a mandatory parameter.
* </p>
*
* @param name name of the atomic value
* @return this AtomicValueBuilder for method chaining
*/
AtomicValueBuilder<V> withName(String name);
/**
* Sets a serializer that can be used to serialize the value.
* <p>
* Note: This is a mandatory parameter.
* </p>
*
* @param serializer serializer
* @return this AtomicValueBuilder for method chaining
*/
AtomicValueBuilder<V> withSerializer(Serializer serializer);
/**
* Creates this atomic value on the partition that spans the entire cluster.
* <p>
* When partitioning is disabled, the value state will be
* ephemeral and does not survive a full cluster restart.
* </p>
* <p>
* Note: By default partitions are enabled.
* </p>
* @return this AtomicValueBuilder for method chaining
*/
AtomicValueBuilder<V> withPartitionsDisabled();
/**
* Builds a AtomicValue based on the configuration options
* supplied to this builder.
*
* @return new AtomicValue
* @throws java.lang.RuntimeException if a mandatory parameter is missing
*/
AtomicValue<V> build();
}
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.service;
import java.util.Objects;
import com.google.common.base.MoreObjects;
/**
* Representation of a AtomicValue update notification.
*
* @param <V> atomic value type
*/
public class AtomicValueEvent<V> {
/**
* AtomicValueEvent type.
*/
public enum Type {
/**
* Value was updated.
*/
UPDATE,
}
private final String name;
private final Type type;
private final V value;
/**
* Creates a new event object.
*
* @param name AtomicValue name
* @param type the type of the event
* @param value the new value
*/
public AtomicValueEvent(String name, Type type, V value) {
this.name = name;
this.type = type;
this.value = value;
}
/**
* Returns the AtomicValue name.
*
* @return name of atomic value
*/
public String name() {
return name;
}
/**
* Returns the type of the event.
*
* @return the type of the event
*/
public Type type() {
return type;
}
/**
* Returns the new updated value.
*
* @return the value
*/
public V value() {
return value;
}
@Override
public boolean equals(Object o) {
if (!(o instanceof AtomicValueEvent)) {
return false;
}
AtomicValueEvent<V> that = (AtomicValueEvent) o;
return Objects.equals(this.name, that.name) &&
Objects.equals(this.type, that.type) &&
Objects.equals(this.value, that.value);
}
@Override
public int hashCode() {
return Objects.hash(name, type, value);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("name", name)
.add("type", type)
.add("value", value)
.toString();
}
}
\ No newline at end of file
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.service;
/**
* Listener to be notified about updates to a AtomicValue.
*/
public interface AtomicValueEventListener<V> {
/**
* Reacts to the specified event.
*
* @param event the event
*/
void event(AtomicValueEvent<V> event);
}
......@@ -71,6 +71,14 @@ public interface StorageService {
AtomicCounterBuilder atomicCounterBuilder();
/**
* Creates a new AtomicValueBuilder.
*
* @param <V> atomic value type
* @return atomic value builder
*/
<V> AtomicValueBuilder<V> atomicValueBuilder();
/**
* Creates a new transaction context builder.
*
* @return a builder for a transaction context.
......
......@@ -63,6 +63,7 @@ import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.ecmap.EventuallyConsistentMapBuilderImpl;
import org.onosproject.store.service.AtomicCounterBuilder;
import org.onosproject.store.service.AtomicValueBuilder;
import org.onosproject.store.service.ConsistentMapBuilder;
import org.onosproject.store.service.ConsistentMapException;
import org.onosproject.store.service.DistributedQueueBuilder;
......@@ -383,6 +384,11 @@ public class DatabaseManager implements StorageService, StorageAdminService {
}
@Override
public <V> AtomicValueBuilder<V> atomicValueBuilder() {
return new DefaultAtomicValueBuilder<>(this);
}
@Override
public List<MapInfo> getMapInfo() {
List<MapInfo> maps = Lists.newArrayList();
maps.addAll(getMapInfo(inMemoryDatabase));
......
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.consistent.impl;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import org.onosproject.store.service.AtomicValue;
import org.onosproject.store.service.AtomicValueEvent;
import org.onosproject.store.service.AtomicValueEventListener;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.Versioned;
/**
* Default implementation of AtomicValue.
*
* @param <V> value type
*/
public class DefaultAtomicValue<V> implements AtomicValue<V> {
private final Set<AtomicValueEventListener<V>> listeners = new CopyOnWriteArraySet<>();
private final ConsistentMap<String, byte[]> valueMap;
private final String name;
private final Serializer serializer;
private final MapEventListener<String, byte[]> mapEventListener = new InternalMapEventListener();
public DefaultAtomicValue(ConsistentMap<String, byte[]> valueMap,
String name,
Serializer serializer) {
this.valueMap = valueMap;
this.name = name;
this.serializer = serializer;
}
@Override
public boolean compareAndSet(V expect, V update) {
if (expect == null) {
if (update == null) {
return true;
}
return valueMap.putIfAbsent(name, serializer.encode(update)) == null;
} else {
if (update == null) {
return valueMap.remove(name, serializer.encode(expect));
}
return valueMap.replace(name, serializer.encode(expect), serializer.encode(update));
}
}
@Override
public V get() {
Versioned<byte[]> rawValue = valueMap.get(name);
return rawValue == null ? null : serializer.decode(rawValue.value());
}
@Override
public V getAndSet(V value) {
Versioned<byte[]> previousValue = value == null ?
valueMap.remove(name) : valueMap.put(name, serializer.encode(value));
return previousValue == null ? null : serializer.decode(previousValue.value());
}
@Override
public void set(V value) {
getAndSet(value);
}
@Override
public void addListener(AtomicValueEventListener<V> listener) {
synchronized (listeners) {
if (listeners.add(listener)) {
if (listeners.size() == 1) {
valueMap.addListener(mapEventListener);
}
}
}
}
@Override
public void removeListener(AtomicValueEventListener<V> listener) {
synchronized (listeners) {
if (listeners.remove(listener)) {
if (listeners.size() == 0) {
valueMap.removeListener(mapEventListener);
}
}
}
}
private class InternalMapEventListener implements MapEventListener<String, byte[]> {
@Override
public void event(MapEvent<String, byte[]> mapEvent) {
V newValue = mapEvent.type() == MapEvent.Type.REMOVE ? null : serializer.decode(mapEvent.value().value());
AtomicValueEvent<V> atomicValueEvent = new AtomicValueEvent<>(name, AtomicValueEvent.Type.UPDATE, newValue);
listeners.forEach(l -> l.event(atomicValueEvent));
}
}
}
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.consistent.impl;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.AtomicValue;
import org.onosproject.store.service.AtomicValueBuilder;
import org.onosproject.store.service.ConsistentMapBuilder;
import org.onosproject.store.service.Serializer;
/**
* Default implementation of AtomicValueBuilder.
*
* @param <V> value type
*/
public class DefaultAtomicValueBuilder<V> implements AtomicValueBuilder<V> {
private Serializer serializer;
private String name;
private ConsistentMapBuilder<String, byte[]> mapBuilder;
public DefaultAtomicValueBuilder(DatabaseManager manager) {
mapBuilder = manager.<String, byte[]>consistentMapBuilder()
.withName("onos-atomic-values")
.withSerializer(Serializer.using(KryoNamespaces.BASIC));
}
@Override
public AtomicValueBuilder<V> withName(String name) {
this.name = name;
return this;
}
@Override
public AtomicValueBuilder<V> withSerializer(Serializer serializer) {
this.serializer = serializer;
return this;
}
@Override
public AtomicValueBuilder<V> withPartitionsDisabled() {
mapBuilder.withPartitionsDisabled();
return this;
}
@Override
public AtomicValue<V> build() {
return new DefaultAtomicValue<>(mapBuilder.build(), name, serializer);
}
}
\ No newline at end of file