Jonathan Hart
Committed by Ray Milkey

Add persistence option to ECMap

Work towards ONOS-1337

Change-Id: I24e6a42e2f8856b363e79786829c51344797b81b
......@@ -166,6 +166,16 @@ public interface EventuallyConsistentMapBuilder<K, V> {
public EventuallyConsistentMapBuilder<K, V> withFasterConvergence();
/**
* Configure the map to persist data to disk.
* <p>
* The default behavior is no persistence
* </p>
*
* @return this EventuallyConsistentMapBuilder
*/
public EventuallyConsistentMapBuilder<K, V> withPersistence();
/**
* Builds an eventually consistent map based on the configuration options
* supplied to this builder.
*
......
......@@ -65,7 +65,7 @@
<dependency>
<groupId>org.mapdb</groupId>
<artifactId>mapdb</artifactId>
<version>1.0.6</version>
<version>1.0.7</version>
</dependency>
<dependency>
......
......@@ -51,6 +51,7 @@ public class EventuallyConsistentMapBuilderImpl<K, V>
private long antiEntropyPeriod = 5;
private TimeUnit antiEntropyTimeUnit = TimeUnit.SECONDS;
private boolean convergeFaster = false;
private boolean persistent = false;
/**
* Creates a new eventually consistent map builder.
......@@ -131,6 +132,12 @@ public class EventuallyConsistentMapBuilderImpl<K, V>
}
@Override
public EventuallyConsistentMapBuilder<K, V> withPersistence() {
persistent = true;
return this;
}
@Override
public EventuallyConsistentMap<K, V> build() {
checkNotNull(name, "name is a mandatory parameter");
checkNotNull(serializerBuilder, "serializerBuilder is a mandatory parameter");
......@@ -148,6 +155,7 @@ public class EventuallyConsistentMapBuilderImpl<K, V>
tombstonesDisabled,
antiEntropyPeriod,
antiEntropyTimeUnit,
convergeFaster);
convergeFaster,
persistent);
}
}
......
......@@ -93,7 +93,6 @@ public class EventuallyConsistentMapImpl<K, V>
= new CopyOnWriteArraySet<>();
private final ExecutorService executor;
private final ScheduledExecutorService backgroundExecutor;
private final BiFunction<K, V, Collection<NodeId>> peerUpdateFunction;
......@@ -116,6 +115,9 @@ public class EventuallyConsistentMapImpl<K, V>
private static final int LOAD_WINDOW = 2;
private SlidingWindowCounter counter = new SlidingWindowCounter(WINDOW_SIZE);
private final boolean persistent;
private final PersistentStore<K, V> persistentStore;
/**
* Creates a new eventually consistent map shared amongst multiple instances.
* <p>
......@@ -140,9 +142,9 @@ public class EventuallyConsistentMapImpl<K, V>
* @param tombstonesDisabled true if this map should not maintain
* tombstones
* @param antiEntropyPeriod period that the anti-entropy task should run
* in seconds
* @param antiEntropyTimeUnit time unit for anti-entropy task scheduling
* @param antiEntropyTimeUnit time unit for anti-entropy period
* @param convergeFaster make anti-entropy try to converge faster
* @param persistent persist data to disk
*/
EventuallyConsistentMapImpl(String mapName,
ClusterService clusterService,
......@@ -156,7 +158,8 @@ public class EventuallyConsistentMapImpl<K, V>
boolean tombstonesDisabled,
long antiEntropyPeriod,
TimeUnit antiEntropyTimeUnit,
boolean convergeFaster) {
boolean convergeFaster,
boolean persistent) {
items = new ConcurrentHashMap<>();
removedItems = new ConcurrentHashMap<>();
senderPending = Maps.newConcurrentMap();
......@@ -195,6 +198,21 @@ public class EventuallyConsistentMapImpl<K, V>
newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-publish-%d"));
}
this.persistent = persistent;
if (this.persistent) {
String dataDirectory = System.getProperty("karaf.data", "./data");
String filename = dataDirectory + "/" + "mapdb-ecm-" + mapName;
ExecutorService dbExecutor =
newFixedThreadPool(1, groupedThreads("onos/ecm", mapName + "-dbwriter"));
persistentStore = new MapDbPersistentStore<>(filename, dbExecutor, serializer);
persistentStore.readInto(items, removedItems);
} else {
this.persistentStore = null;
}
if (backgroundExecutor != null) {
this.backgroundExecutor = backgroundExecutor;
} else {
......@@ -232,6 +250,7 @@ public class EventuallyConsistentMapImpl<K, V>
.register(ArrayList.class)
.register(AntiEntropyAdvertisement.class)
.register(HashMap.class)
.register(Timestamped.class)
.build();
}
};
......@@ -321,6 +340,11 @@ public class EventuallyConsistentMapImpl<K, V>
if (success && removed != null) {
removedItems.remove(key, removed);
}
if (success && persistent) {
persistentStore.put(key, value, timestamp);
}
return success;
}
......@@ -357,24 +381,29 @@ public class EventuallyConsistentMapImpl<K, V>
// remove from items map
return null;
}
});
});
if (updated.isFalse()) {
return false;
}
boolean updatedTombstone = false;
if (!tombstonesDisabled) {
Timestamp removedTimestamp = removedItems.get(key);
if (removedTimestamp == null) {
return removedItems.putIfAbsent(key, timestamp) == null;
//Timestamp removed = removedItems.putIfAbsent(key, timestamp);
updatedTombstone = (removedItems.putIfAbsent(key, timestamp) == null);
} else if (timestamp.isNewerThan(removedTimestamp)) {
return removedItems.replace(key, removedTimestamp, timestamp);
} else {
return false;
updatedTombstone = removedItems.replace(key, removedTimestamp, timestamp);
}
}
return updated.booleanValue();
if (updated.booleanValue() && persistent) {
persistentStore.remove(key, timestamp);
}
return (!tombstonesDisabled && updatedTombstone) || updated.booleanValue();
}
@Override
......
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.ecmap;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.mapdb.DB;
import org.mapdb.DBMaker;
import org.mapdb.Hasher;
import org.mapdb.Serializer;
import org.onosproject.store.Timestamp;
import org.onosproject.store.impl.Timestamped;
import org.onosproject.store.serializers.KryoSerializer;
import java.io.File;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import static com.google.common.base.Preconditions.checkNotNull;
/**
* MapDB based implementation of a persistent store.
*/
class MapDbPersistentStore<K, V> implements PersistentStore<K, V> {
private final ExecutorService executor;
private final KryoSerializer serializer;
private final DB database;
private final Map<byte[], byte[]> items;
private final Map<byte[], byte[]> tombstones;
/**
* Creates a new MapDB based persistent store.
*
* @param filename filename of the database on disk
* @param executor executor to use for tasks that write to the disk
* @param serializer serializer for keys and values
*/
MapDbPersistentStore(String filename, ExecutorService executor,
KryoSerializer serializer) {
this.executor = checkNotNull(executor);
this.serializer = checkNotNull(serializer);
File databaseFile = new File(filename);
database = DBMaker.newFileDB(databaseFile).make();
items = database.createHashMap("items")
.keySerializer(Serializer.BYTE_ARRAY)
.valueSerializer(Serializer.BYTE_ARRAY)
.hasher(Hasher.BYTE_ARRAY)
.makeOrGet();
tombstones = database.createHashMap("tombstones")
.keySerializer(Serializer.BYTE_ARRAY)
.valueSerializer(Serializer.BYTE_ARRAY)
.hasher(Hasher.BYTE_ARRAY)
.makeOrGet();
}
@Override
public void readInto(Map<K, Timestamped<V>> items, Map<K, Timestamp> tombstones) {
this.items.forEach((keyBytes, valueBytes) ->
items.put(serializer.decode(keyBytes),
serializer.decode(valueBytes)));
this.tombstones.forEach((keyBytes, valueBytes) ->
tombstones.put(serializer.decode(keyBytes),
serializer.decode(valueBytes)));
}
@Override
public void put(K key, V value, Timestamp timestamp) {
executor.submit(() -> putInternal(key, value, timestamp));
}
private void putInternal(K key, V value, Timestamp timestamp) {
byte[] keyBytes = serializer.encode(key);
byte[] removedBytes = tombstones.get(keyBytes);
Timestamp removed = removedBytes == null ? null :
serializer.decode(removedBytes);
if (removed != null && removed.isNewerThan(timestamp)) {
return;
}
final MutableBoolean updated = new MutableBoolean(false);
items.compute(keyBytes, (k, existingBytes) -> {
Timestamped<V> existing = existingBytes == null ? null :
serializer.decode(existingBytes);
if (existing != null && existing.isNewerThan(timestamp)) {
updated.setFalse();
return existingBytes;
} else {
updated.setTrue();
return serializer.encode(new Timestamped<>(value, timestamp));
}
});
boolean success = updated.booleanValue();
if (success && removed != null) {
tombstones.remove(keyBytes, removedBytes);
}
database.commit();
}
@Override
public void remove(K key, Timestamp timestamp) {
executor.submit(() -> removeInternal(key, timestamp));
}
private void removeInternal(K key, Timestamp timestamp) {
byte[] keyBytes = serializer.encode(key);
final MutableBoolean updated = new MutableBoolean(false);
items.compute(keyBytes, (k, existingBytes) -> {
Timestamp existing = existingBytes == null ? null :
serializer.decode(existingBytes);
if (existing != null && existing.isNewerThan(timestamp)) {
updated.setFalse();
return existingBytes;
} else {
updated.setTrue();
// remove from items map
return null;
}
});
if (!updated.booleanValue()) {
return;
}
byte[] timestampBytes = serializer.encode(timestamp);
byte[] removedBytes = tombstones.get(keyBytes);
Timestamp removedTimestamp = removedBytes == null ? null :
serializer.decode(removedBytes);
if (removedTimestamp == null) {
tombstones.putIfAbsent(keyBytes, timestampBytes);
} else if (timestamp.isNewerThan(removedTimestamp)) {
tombstones.replace(keyBytes, removedBytes, timestampBytes);
}
database.commit();
}
}
/*
* Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.ecmap;
import org.onosproject.store.Timestamp;
import org.onosproject.store.impl.Timestamped;
import java.util.Map;
/**
* A persistent store for an eventually consistent map.
*/
interface PersistentStore<K, V> {
/**
* Read the contents of the disk into the given maps.
*
* @param items items map
* @param tombstones tombstones map
*/
void readInto(Map<K, Timestamped<V>> items, Map<K, Timestamp> tombstones);
/**
* Puts a new key,value pair into the map on disk.
*
* @param key the key
* @param value the value
* @param timestamp the timestamp of the update
*/
void put(K key, V value, Timestamp timestamp);
/**
* Removes a key from the map on disk.
*
* @param key the key
* @param timestamp the timestamp of the update
*/
void remove(K key, Timestamp timestamp);
}
......@@ -64,7 +64,7 @@
<bundle>mvn:com.typesafe/config/1.2.1</bundle>
<bundle>mvn:org.onosproject/onlab-thirdparty/@ONOS-VERSION</bundle>
<bundle>mvn:org.mapdb/mapdb/1.0.6</bundle>
<bundle>mvn:org.mapdb/mapdb/1.0.7</bundle>
</feature>
<feature name="onos-thirdparty-web" version="@FEATURE-VERSION"
......