Jonathan Hart

Initial implementation of EventuallyConsistentMap.

The map uses gossip schemes to replicate data between instances. It seems
to work for basic add and remove use cases right now; there is no anti-entropy yet.

ONOS-844.

Change-Id: I7d05a7b532e40c95ab14e2c8911f18514bd0a8ca
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.impl;
import org.onosproject.store.Timestamp;
/**
 * Source of timestamps that are issued on a per-object basis.
 *
 * @param <T> type of the objects that timestamps are issued for
 */
public interface ClockService<T> {

    /**
     * Issues a fresh timestamp for the supplied object.
     *
     * @param object the object a timestamp is requested for
     * @return the newly issued timestamp
     */
    Timestamp getTimestamp(T object);
}
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.impl;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
/**
 * A map that is distributed across instances and is eventually consistent.
 * <p>
 * Read-after-write consistency is not provided. Operations are ordered by
 * the timestamps handed out by the clock service; when two updates
 * conflict, the one carrying the more recent timestamp wins.
 * </p>
 * <p>
 * The API largely mirrors {@link java.util.Map}, with a few small semantic
 * differences and an added listener framework (needed because the map can
 * be mutated by clients on other instances, not just through the local Java
 * API).
 * </p>
 * <p>
 * Clients that want to be told about updates to the map are expected to
 * register an
 * {@link org.onosproject.store.impl.EventuallyConsistentMapListener}.
 * </p>
 *
 * @param <K> type of the keys
 * @param <V> type of the values
 */
public interface EventuallyConsistentMap<K, V> {

    /**
     * Reports how many key-value mappings this map holds.
     *
     * @return number of key-value mappings
     */
    int size();

    /**
     * Reports whether this map holds no mappings.
     *
     * @return true if this map is empty, otherwise false
     */
    boolean isEmpty();

    /**
     * Reports whether a mapping exists for the given key.
     *
     * @param key the key to look for
     * @return true if this map has a mapping for the key, otherwise false
     */
    boolean containsKey(K key);

    /**
     * Reports whether at least one key is mapped to the given value.
     *
     * @param value the value to look for
     * @return true if this map has a mapping to this value, otherwise false
     */
    boolean containsValue(V value);

    /**
     * Looks up the value mapped to the given key.
     *
     * @param key the key to look up
     * @return the value mapped to the key, or null if no mapping is found
     */
    V get(K key);

    /**
     * Maps the given key to the given value.
     * <p>
     * Note: unlike {@link java.util.Map}, the previous value associated
     * with the key is not returned. Clients that want to be notified of
     * updates to the map are expected to register an
     * {@link org.onosproject.store.impl.EventuallyConsistentMapListener}.
     * </p>
     *
     * @param key   the key to add a mapping for
     * @param value the value to associate with the key
     */
    void put(K key, V value);

    /**
     * Drops the mapping for the given key.
     * <p>
     * Note: unlike {@link java.util.Map}, the previous value associated
     * with the key is not returned. Clients that want to be notified of
     * updates to the map are expected to register an
     * {@link org.onosproject.store.impl.EventuallyConsistentMapListener}.
     * </p>
     *
     * @param key the key whose mapping is removed
     */
    void remove(K key);

    /**
     * Copies every key-value pair of the given map into this map.
     * <p>
     * This is more efficient in communication than issuing the puts one by
     * one.
     * </p>
     *
     * @param m the mappings to add to this map
     */
    void putAll(Map<? extends K, ? extends V> m);

    /**
     * Removes every mapping from this map.
     */
    void clear();

    /**
     * Returns the keys of this map as a set. Changes made to the returned
     * set are not reflected back to the map.
     *
     * @return set of keys in the map
     */
    Set<K> keySet();

    /**
     * Returns the values of this map as a collection. Changes made to the
     * returned collection are not reflected back to the map.
     *
     * @return collection of values in the map
     */
    Collection<V> values();

    /**
     * Returns the mappings of this map as a set. Changes made to the
     * returned set are not reflected back to the map.
     *
     * @return set of key-value mappings in this map
     */
    Set<Map.Entry<K, V>> entrySet();

    /**
     * Registers a listener that is notified whenever the mappings in this
     * map change.
     *
     * @param listener listener to register for events
     */
    void addListener(EventuallyConsistentMapListener listener);

    /**
     * Deregisters a listener so that it no longer receives change
     * notifications.
     *
     * @param listener listener to deregister for events
     */
    void removeListener(EventuallyConsistentMapListener listener);

    /**
     * Shuts the map down and cuts communication between instances, which
     * allows the map objects to be cleaned up and garbage collected.
     * Any method called on the map after destroy() throws a
     * {@link java.lang.RuntimeException}.
     */
    void destroy();
}
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.impl;
/**
 * Event object signalling that the map was modified.
 * <p>
 * Instances are immutable. Two events are equal when their type, key and
 * value are equal, which makes events straightforward to compare and log.
 * </p>
 *
 * @param <K> type of the map's keys
 * @param <V> type of the map's values
 */
public class EventuallyConsistentMapEvent<K, V> {

    /**
     * Kind of modification an event describes.
     */
    public enum Type {
        /** A mapping was added or updated. */
        PUT,
        /** A mapping was removed. */
        REMOVE
    }

    private final Type type;
    private final K key;
    private final V value;

    /**
     * Creates a new event object.
     *
     * @param type  the type of the event
     * @param key   the key the event concerns
     * @param value the value related to the key, or null for remove events
     */
    public EventuallyConsistentMapEvent(Type type, K key, V value) {
        this.type = type;
        this.key = key;
        this.value = value;
    }

    /**
     * Returns the type of the event.
     *
     * @return the type of the event
     */
    public Type type() {
        return type;
    }

    /**
     * Returns the key this event concerns.
     *
     * @return the key
     */
    public K key() {
        return key;
    }

    /**
     * Returns the value associated with this event.
     *
     * @return the value, or null if the event was REMOVE
     */
    public V value() {
        return value;
    }

    /**
     * Compares this event with another object. Only an object of exactly
     * the same class with equal type, key and value is considered equal.
     *
     * @param obj the object to compare against
     * @return true if the other object is an equivalent event
     */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        // Exact class match keeps equals symmetric if the class is subclassed
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        EventuallyConsistentMapEvent<?, ?> that =
                (EventuallyConsistentMapEvent<?, ?>) obj;
        return type == that.type
                && java.util.Objects.equals(key, that.key)
                && java.util.Objects.equals(value, that.value);
    }

    /**
     * Returns a hash code derived from the type, key and value.
     *
     * @return the hash code
     */
    @Override
    public int hashCode() {
        return java.util.Objects.hash(type, key, value);
    }

    /**
     * Returns a human-readable description listing the type, key and value.
     *
     * @return string form of this event
     */
    @Override
    public String toString() {
        return "EventuallyConsistentMapEvent{type=" + type
                + ", key=" + key
                + ", value=" + value + "}";
    }
}
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.impl;
import com.google.common.base.MoreObjects;
import org.onlab.util.KryoNamespace;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.NodeId;
import org.onosproject.store.Timestamp;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.serializers.KryoSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static org.onlab.util.Tools.minPriority;
import static org.onlab.util.Tools.namedThreads;
/**
 * Distributed Map implementation which uses optimistic replication and gossip
 * based techniques to provide an eventually consistent data store.
 * <p>
 * Local state lives in two concurrent maps: the live entries (each tagged
 * with the timestamp of the update that wrote it) and tombstones recording
 * the timestamp at which a key was removed. An update is first applied
 * locally while holding this object's monitor, then broadcast to the peers
 * and finally reported to the registered listeners. Updates received from
 * peers are applied on a separate executor and reported to listeners too.
 * </p>
 *
 * @param <K> type of the keys
 * @param <V> type of the values
 */
public class EventuallyConsistentMapImpl<K, V>
        implements EventuallyConsistentMap<K, V> {

    private static final Logger log = LoggerFactory.getLogger(EventuallyConsistentMapImpl.class);

    // Live entries, each tagged with the timestamp of the update that wrote it
    private final Map<K, Timestamped<V>> items;
    // Tombstones: timestamp of the removal last applied for each removed key
    private final Map<K, Timestamp> removedItems;

    private final String mapName;
    private final ClusterService clusterService;
    private final ClusterCommunicationService clusterCommunicator;
    private final KryoSerializer serializer;

    private final ClockService<K> clockService;

    private final MessageSubject updateMessageSubject;
    private final MessageSubject removeMessageSubject;

    private final Set<EventuallyConsistentMapListener> listeners
            = new CopyOnWriteArraySet<>();

    // Applies updates received from peers off the messaging thread
    private final ExecutorService executor;

    private final ScheduledExecutorService backgroundExecutor;

    private volatile boolean destroyed = false;
    private static final String ERROR_DESTROYED = " is already destroyed";
    private static final String ERROR_NULL_KEY = "Key cannot be null";
    private static final String ERROR_NULL_VALUE = "Null values are not allowed";

    // TODO: Make these anti-entropy params configurable
    private long initialDelaySec = 5;
    private long periodSec = 5;

    /**
     * Creates a new eventually consistent map shared amongst multiple instances.
     *
     * Each map is identified by a string map name. EventuallyConsistentMapImpl
     * objects in different JVMs that use the same map name will form a
     * distributed map across JVMs (provided the cluster service is aware of
     * both nodes).
     *
     * The client is expected to provide an
     * {@link org.onlab.util.KryoNamespace.Builder} with which all classes that
     * will be stored in this map have been registered (including referenced
     * classes). This serializer will be used to serialize both K and V for
     * inter-node notifications.
     *
     * The client must provide an {@link org.onosproject.store.impl.ClockService}
     * which can generate timestamps for a given key. The clock service is free
     * to generate timestamps however it wishes, however these timestamps will
     * be used to serialize updates to the map so they must be strict enough
     * to ensure updates are properly ordered for the use case (i.e. in some
     * cases wallclock time will suffice, whereas in other cases logical time
     * will be necessary).
     *
     * @param mapName             a String identifier for the map.
     * @param clusterService      the cluster service
     * @param clusterCommunicator the cluster communications service
     * @param serializerBuilder   a Kryo namespace builder that can serialize
     *                            both K and V
     * @param clockService        a clock service able to generate timestamps
     *                            for K
     */
    public EventuallyConsistentMapImpl(String mapName,
                                       ClusterService clusterService,
                                       ClusterCommunicationService clusterCommunicator,
                                       KryoNamespace.Builder serializerBuilder,
                                       ClockService<K> clockService) {

        this.mapName = checkNotNull(mapName);
        this.clusterService = checkNotNull(clusterService);
        this.clusterCommunicator = checkNotNull(clusterCommunicator);

        serializer = createSerializer(checkNotNull(serializerBuilder));

        this.clockService = checkNotNull(clockService);

        items = new ConcurrentHashMap<>();
        removedItems = new ConcurrentHashMap<>();

        executor = Executors
                .newCachedThreadPool(namedThreads("onos-ecm-" + mapName + "-fg-%d"));

        backgroundExecutor =
                newSingleThreadScheduledExecutor(minPriority(
                        namedThreads("onos-ecm-" + mapName + "-bg-%d")));

        updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update");
        clusterCommunicator.addSubscriber(updateMessageSubject,
                                          new InternalPutEventListener());
        removeMessageSubject = new MessageSubject("ecm-" + mapName + "-remove");
        clusterCommunicator.addSubscriber(removeMessageSubject,
                                          new InternalRemoveEventListener());
    }

    /**
     * Builds the serializer used for inter-node messages by extending the
     * user-supplied namespace with this map's internal message classes.
     *
     * @param builder namespace builder supplied by the client
     * @return serializer able to encode the internal events
     */
    private KryoSerializer createSerializer(KryoNamespace.Builder builder) {
        return new KryoSerializer() {
            @Override
            protected void setupKryoPool() {
                // Add the map's internal helper classes to the user-supplied serializer
                serializerPool = builder
                        .register(WallClockTimestamp.class)
                        .register(PutEntry.class)
                        .register(ArrayList.class)
                        .register(InternalPutEvent.class)
                        .register(InternalRemoveEvent.class)
                        // RemoveEntry is carried inside InternalRemoveEvent just as
                        // PutEntry is carried inside InternalPutEvent. It is appended
                        // last so the order of the earlier registrations is unchanged.
                        // NOTE(review): confirm the namespace requires registration.
                        .register(RemoveEntry.class)
                        .build();

                // TODO anti-entropy classes
            }
        };
    }

    /**
     * Fails with an IllegalStateException once destroy() has been called.
     */
    private void checkNotDestroyed() {
        // checkState throws when its condition is false, so the flag is negated
        checkState(!destroyed, mapName + ERROR_DESTROYED);
    }

    @Override
    public int size() {
        checkNotDestroyed();
        return items.size();
    }

    @Override
    public boolean isEmpty() {
        checkNotDestroyed();
        return items.isEmpty();
    }

    @Override
    public boolean containsKey(K key) {
        checkNotDestroyed();
        return items.containsKey(key);
    }

    @Override
    public boolean containsValue(V value) {
        checkNotDestroyed();

        return items.values().stream()
                .anyMatch(timestamped -> timestamped.value().equals(value));
    }

    @Override
    public V get(K key) {
        checkNotDestroyed();

        Timestamped<V> value = items.get(key);
        if (value != null) {
            return value.value();
        }
        return null;
    }

    @Override
    public void put(K key, V value) {
        checkNotDestroyed();
        // PutEntry rejects nulls, so fail before any local state is changed
        checkNotNull(key, ERROR_NULL_KEY);
        checkNotNull(value, ERROR_NULL_VALUE);

        Timestamp timestamp = clockService.getTimestamp(key);
        if (putInternal(key, value, timestamp)) {
            notifyPeers(new InternalPutEvent<K, V>(key, value, timestamp));
            notifyListeners(new EventuallyConsistentMapEvent<K, V>(
                    EventuallyConsistentMapEvent.Type.PUT, key, value));
        }
    }

    /**
     * Applies a put to the local state unless a newer removal or a newer
     * value is already recorded for the key.
     *
     * @param key       key being written
     * @param value     value to store
     * @param timestamp timestamp of the update
     * @return true if the local state was changed, false if the update lost
     */
    private boolean putInternal(K key, V value, Timestamp timestamp) {
        synchronized (this) {
            Timestamp removed = removedItems.get(key);
            if (removed != null && removed.compareTo(timestamp) > 0) {
                // A removal recorded with a later timestamp wins
                return false;
            }

            Timestamped<V> existing = items.get(key);
            if (existing != null && existing.isNewer(timestamp)) {
                return false;
            }

            items.put(key, new Timestamped<>(value, timestamp));
            // The key is live again, so its tombstone is dropped
            removedItems.remove(key);
            return true;
        }
    }

    @Override
    public void remove(K key) {
        checkNotDestroyed();
        checkNotNull(key, ERROR_NULL_KEY);

        Timestamp timestamp = clockService.getTimestamp(key);
        if (removeInternal(key, timestamp)) {
            notifyPeers(new InternalRemoveEvent<K>(key, timestamp));
            notifyListeners(new EventuallyConsistentMapEvent<K, V>(
                    EventuallyConsistentMapEvent.Type.REMOVE, key, null));
        }
    }

    /**
     * Applies a removal to the local state unless the stored value is newer
     * than the removal.
     *
     * @param key       key being removed
     * @param timestamp timestamp of the removal
     * @return true if the removal was applied, false if it lost
     */
    private boolean removeInternal(K key, Timestamp timestamp) {
        synchronized (this) {
            Timestamped<V> existing = items.get(key);
            if (existing != null && existing.isNewer(timestamp)) {
                return false;
            }

            items.remove(key);
            removedItems.put(key, timestamp);
            return true;
        }
    }

    @Override
    public void putAll(Map<? extends K, ? extends V> m) {
        checkNotDestroyed();

        // Validate everything first so that a bad entry cannot leave earlier
        // entries applied locally but never broadcast
        for (Map.Entry<? extends K, ? extends V> entry : m.entrySet()) {
            checkNotNull(entry.getKey(), ERROR_NULL_KEY);
            checkNotNull(entry.getValue(), ERROR_NULL_VALUE);
        }

        List<PutEntry<K, V>> updates = new ArrayList<>(m.size());

        for (Map.Entry<? extends K, ? extends V> entry : m.entrySet()) {
            K key = entry.getKey();
            V value = entry.getValue();
            Timestamp timestamp = clockService.getTimestamp(key);

            if (putInternal(key, value, timestamp)) {
                updates.add(new PutEntry<>(key, value, timestamp));
            }
        }

        if (updates.isEmpty()) {
            // Nothing changed locally, so there is nothing to tell anybody
            return;
        }

        notifyPeers(new InternalPutEvent<K, V>(updates));

        for (PutEntry<K, V> entry : updates) {
            notifyListeners(new EventuallyConsistentMapEvent<K, V>(
                    EventuallyConsistentMapEvent.Type.PUT, entry.key(), entry.value()));
        }
    }

    @Override
    public void clear() {
        checkNotDestroyed();

        List<RemoveEntry<K>> removed = new ArrayList<>(items.size());

        for (K key : items.keySet()) {
            Timestamp timestamp = clockService.getTimestamp(key);

            if (removeInternal(key, timestamp)) {
                removed.add(new RemoveEntry<>(key, timestamp));
            }
        }

        if (removed.isEmpty()) {
            // Nothing changed locally, so there is nothing to tell anybody
            return;
        }

        notifyPeers(new InternalRemoveEvent<K>(removed));

        for (RemoveEntry<K> entry : removed) {
            notifyListeners(new EventuallyConsistentMapEvent<K, V>(
                    EventuallyConsistentMapEvent.Type.REMOVE, entry.key(), null));
        }
    }

    @Override
    public Set<K> keySet() {
        checkNotDestroyed();

        // Hand out a copy: the backing key view would let callers remove
        // entries directly, bypassing timestamps, peers and listeners
        return items.keySet().stream().collect(Collectors.toSet());
    }

    @Override
    public Collection<V> values() {
        checkNotDestroyed();

        return items.values().stream()
                .map(Timestamped::value)
                .collect(Collectors.toList());
    }

    @Override
    public Set<Map.Entry<K, V>> entrySet() {
        checkNotDestroyed();

        return items.entrySet().stream()
                .map(e -> new Entry(e.getKey(), e.getValue().value()))
                .collect(Collectors.toSet());
    }

    @Override
    public void addListener(EventuallyConsistentMapListener listener) {
        checkNotDestroyed();

        listeners.add(checkNotNull(listener));
    }

    @Override
    public void removeListener(EventuallyConsistentMapListener listener) {
        checkNotDestroyed();

        listeners.remove(checkNotNull(listener));
    }

    @Override
    public void destroy() {
        destroyed = true;

        // Unsubscribe before stopping the executors so that the handlers are
        // deregistered before the executor starts rejecting submissions
        clusterCommunicator.removeSubscriber(updateMessageSubject);
        clusterCommunicator.removeSubscriber(removeMessageSubject);

        executor.shutdown();
        backgroundExecutor.shutdown();
    }

    /**
     * Delivers an event to every registered listener.
     *
     * @param event the event to deliver
     */
    private void notifyListeners(EventuallyConsistentMapEvent<K, V> event) {
        for (EventuallyConsistentMapListener listener : listeners) {
            listener.event(event);
        }
    }

    /**
     * Broadcasts a put event to the peers.
     *
     * @param event the put event to send
     */
    private void notifyPeers(InternalPutEvent<K, V> event) {
        try {
            log.debug("sending put {}", event);
            broadcastMessage(updateMessageSubject, event);
        } catch (IOException e) {
            // TODO this won't happen; remove from API
            log.debug("IOException broadcasting update", e);
        }
    }

    /**
     * Broadcasts a remove event to the peers.
     *
     * @param event the remove event to send
     */
    private void notifyPeers(InternalRemoveEvent<K> event) {
        try {
            broadcastMessage(removeMessageSubject, event);
        } catch (IOException e) {
            // TODO this won't happen; remove from API
            log.debug("IOException broadcasting update", e);
        }
    }

    /**
     * Serializes an event and broadcasts it under the given subject.
     *
     * @param subject message subject to send under
     * @param event   the event to encode as payload
     * @throws IOException if the communication service fails to send
     */
    private void broadcastMessage(MessageSubject subject, Object event) throws
            IOException {
        ClusterMessage message = new ClusterMessage(
                clusterService.getLocalNode().id(),
                subject,
                serializer.encode(event));
        clusterCommunicator.broadcast(message);
    }

    /**
     * Serializes an event and sends it to a single peer under the given
     * subject.
     *
     * @param peer    node to send to
     * @param subject message subject to send under
     * @param event   the event to encode as payload
     * @throws IOException if the communication service fails to send
     */
    private void unicastMessage(NodeId peer,
                                MessageSubject subject,
                                Object event) throws IOException {
        ClusterMessage message = new ClusterMessage(
                clusterService.getLocalNode().id(),
                subject,
                serializer.encode(event));
        clusterCommunicator.unicast(message, peer);
    }

    /**
     * Read-only key-value pair handed out by entrySet().
     */
    private final class Entry implements Map.Entry<K, V> {

        private final K key;
        private final V value;

        public Entry(K key, V value) {
            this.key = key;
            this.value = value;
        }

        @Override
        public K getKey() {
            return key;
        }

        @Override
        public V getValue() {
            return value;
        }

        @Override
        public V setValue(V value) {
            throw new UnsupportedOperationException();
        }

        // equals/hashCode follow the contract documented on Map.Entry so the
        // entries compare equal to entries produced by other Map classes

        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof Map.Entry)) {
                return false;
            }
            Map.Entry<?, ?> that = (Map.Entry<?, ?>) obj;
            return java.util.Objects.equals(key, that.getKey())
                    && java.util.Objects.equals(value, that.getValue());
        }

        @Override
        public int hashCode() {
            return java.util.Objects.hashCode(key)
                    ^ java.util.Objects.hashCode(value);
        }
    }

    /**
     * Handles put events received from peers: each entry is applied on the
     * executor and listeners are told about the entries that took effect.
     */
    private final class InternalPutEventListener implements
            ClusterMessageHandler {
        @Override
        public void handle(ClusterMessage message) {
            log.debug("Received put event from peer: {}", message.sender());
            InternalPutEvent<K, V> event = serializer.decode(message.payload());

            executor.submit(() -> {
                try {
                    for (PutEntry<K, V> entry : event.entries()) {
                        K key = entry.key();
                        V value = entry.value();
                        Timestamp timestamp = entry.timestamp();

                        if (putInternal(key, value, timestamp)) {
                            EventuallyConsistentMapEvent<K, V> externalEvent =
                                    new EventuallyConsistentMapEvent<>(
                                            EventuallyConsistentMapEvent.Type.PUT, key,
                                            value);
                            notifyListeners(externalEvent);
                        }
                    }
                } catch (Exception e) {
                    log.warn("Exception thrown handling put", e);
                }
            });
        }
    }

    /**
     * Handles remove events received from peers: each entry is applied on
     * the executor and listeners are told about the removals that took effect.
     */
    private final class InternalRemoveEventListener implements
            ClusterMessageHandler {
        @Override
        public void handle(ClusterMessage message) {
            log.debug("Received remove event from peer: {}", message.sender());
            InternalRemoveEvent<K> event = serializer.decode(message.payload());

            executor.submit(() -> {
                try {
                    for (RemoveEntry<K> entry : event.entries()) {
                        K key = entry.key();
                        Timestamp timestamp = entry.timestamp();

                        if (removeInternal(key, timestamp)) {
                            EventuallyConsistentMapEvent<K, V> externalEvent =
                                    new EventuallyConsistentMapEvent<>(
                                            EventuallyConsistentMapEvent.Type.REMOVE,
                                            key, null);
                            notifyListeners(externalEvent);
                        }
                    }
                } catch (Exception e) {
                    log.warn("Exception thrown handling remove", e);
                }
            });
        }
    }

    /**
     * Message sent to peers describing one or more puts.
     */
    private static final class InternalPutEvent<K, V> {
        private final List<PutEntry<K, V>> entries;

        public InternalPutEvent(K key, V value, Timestamp timestamp) {
            entries = Collections
                    .singletonList(new PutEntry<>(key, value, timestamp));
        }

        public InternalPutEvent(List<PutEntry<K, V>> entries) {
            this.entries = checkNotNull(entries);
        }

        // Needed for serialization.
        @SuppressWarnings("unused")
        private InternalPutEvent() {
            entries = null;
        }

        public List<PutEntry<K, V>> entries() {
            return entries;
        }

        // This event is passed to the debug log when it is sent
        @Override
        public String toString() {
            return MoreObjects.toStringHelper(getClass())
                    .add("entries", entries)
                    .toString();
        }
    }

    /**
     * A single put: key, value and the timestamp of the update.
     */
    private static final class PutEntry<K, V> {
        private final K key;
        private final V value;
        private final Timestamp timestamp;

        public PutEntry(K key, V value, Timestamp timestamp) {
            this.key = checkNotNull(key);
            this.value = checkNotNull(value);
            this.timestamp = checkNotNull(timestamp);
        }

        // Needed for serialization.
        @SuppressWarnings("unused")
        private PutEntry() {
            this.key = null;
            this.value = null;
            this.timestamp = null;
        }

        public K key() {
            return key;
        }

        public V value() {
            return value;
        }

        public Timestamp timestamp() {
            return timestamp;
        }

        @Override
        public String toString() {
            return MoreObjects.toStringHelper(getClass())
                    .add("key", key)
                    .add("value", value)
                    .add("timestamp", timestamp)
                    .toString();
        }
    }

    /**
     * Message sent to peers describing one or more removals.
     */
    private static final class InternalRemoveEvent<K> {
        private final List<RemoveEntry<K>> entries;

        public InternalRemoveEvent(K key, Timestamp timestamp) {
            entries = Collections.singletonList(
                    new RemoveEntry<>(key, timestamp));
        }

        public InternalRemoveEvent(List<RemoveEntry<K>> entries) {
            this.entries = checkNotNull(entries);
        }

        // Needed for serialization.
        @SuppressWarnings("unused")
        private InternalRemoveEvent() {
            entries = null;
        }

        public List<RemoveEntry<K>> entries() {
            return entries;
        }

        @Override
        public String toString() {
            return MoreObjects.toStringHelper(getClass())
                    .add("entries", entries)
                    .toString();
        }
    }

    /**
     * A single removal: key and the timestamp of the removal.
     */
    private static final class RemoveEntry<K> {
        private final K key;
        private final Timestamp timestamp;

        public RemoveEntry(K key, Timestamp timestamp) {
            this.key = checkNotNull(key);
            this.timestamp = checkNotNull(timestamp);
        }

        // Needed for serialization.
        @SuppressWarnings("unused")
        private RemoveEntry() {
            this.key = null;
            this.timestamp = null;
        }

        public K key() {
            return key;
        }

        public Timestamp timestamp() {
            return timestamp;
        }

        @Override
        public String toString() {
            return MoreObjects.toStringHelper(getClass())
                    .add("key", key)
                    .add("timestamp", timestamp)
                    .toString();
        }
    }
}
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.impl;
/**
 * Callback interface for parties that want to be told when an
 * EventuallyConsistentMap is modified.
 */
public interface EventuallyConsistentMapListener {

    /**
     * Handles a modification event raised by the map.
     *
     * @param event the event describing the modification
     */
    void event(EventuallyConsistentMapEvent event);
}
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.impl;
import org.onosproject.store.Timestamp;
/**
 * Clock service whose timestamps are wallclock-based.
 *
 * @param <T> type of the objects that timestamps are issued for
 */
public class WallclockClockManager<T> implements ClockService<T> {

    /**
     * Issues a new wallclock timestamp; the supplied object is not consulted.
     *
     * @param object the object a timestamp is requested for
     * @return a newly created wallclock timestamp
     */
    @Override
    public Timestamp getTimestamp(T object) {
        Timestamp now = new WallClockTimestamp();
        return now;
    }
}