Madan Jampani

1. Refactored ConsistentMap and StorageServive (renamed from DatabaseService) to api bundle.

2. Misc bug fixes uncovered during testing

Change-Id: I1219c5264831bcfa93565f764511f89de35a949d
Showing 15 changed files with 189 additions and 36 deletions
package org.onosproject.store.consistent.impl;
package org.onosproject.store.service;
import java.util.Collection;
import java.util.List;
......
package org.onosproject.store.consistent.impl;
package org.onosproject.store.service;
/**
* Top level exception for ConsistentMap failures.
......
package org.onosproject.store.service;
/**
* Interface for serialization for store artifacts.
*/
public interface Serializer {
/**
* Serialize the specified object.
* @param object object to serialize.
* @return serialized bytes.
* @param <T> encoded type
*/
<T> byte[] encode(T object);
/**
* Deserialize the specified bytes.
* @param bytes byte array to deserialize.
* @return deserialized object.
* @param <T> decoded type
*/
<T> T decode(byte[] bytes);
}
\ No newline at end of file
package org.onosproject.store.consistent.impl;
import org.onosproject.store.serializers.StoreSerializer;
package org.onosproject.store.service;
/**
* Database service.
* Storage service.
* <p>
* This service provides operations for creating key-value stores.
* One can chose to create key-value stores with varying properties such
* as strongly consistent vs eventually consistent, durable vs volatile.
* <p>
* Various store implementations should leverage the data structures provided
* by this service
*/
public interface DatabaseService {
public interface StorageService {
/**
* Creates a ConsistentMap.
*
* @param <K> Key type
* @param <V> value type
* @param name map name
* @param serializer serializer to use for serializing keys and values.
* @return consistent map.
* @param <K> key type
* @param <V> value type
*/
<K, V> ConsistentMap<K , V> createConsistentMap(String name, StoreSerializer serializer);
<K, V> ConsistentMap<K , V> createConsistentMap(String name, Serializer serializer);
// TODO: add API for creating Eventually Consistent Map.
}
\ No newline at end of file
......
package org.onosproject.store.consistent.impl;
package org.onosproject.store.service;
import static com.google.common.base.Preconditions.*;
......
package org.onosproject.store.consistent.impl;
package org.onosproject.store.service;
import com.google.common.base.MoreObjects;
......
......@@ -2,7 +2,6 @@ package org.onosproject.store.consistent.impl;
import static com.google.common.base.Preconditions.*;
import java.util.AbstractMap;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
......@@ -15,8 +14,13 @@ import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.Set;
import org.apache.commons.lang3.tuple.Pair;
import org.onlab.util.HexString;
import org.onosproject.store.serializers.StoreSerializer;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.ConsistentMapException;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.UpdateOperation;
import org.onosproject.store.service.Versioned;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
......@@ -33,7 +37,7 @@ public class ConsistentMapImpl<K, V> implements ConsistentMap<K, V> {
private final String name;
private final DatabaseProxy<String, byte[]> proxy;
private final StoreSerializer serializer;
private final Serializer serializer;
private static final int OPERATION_TIMEOUT_MILLIS = 1000;
private static final String ERROR_NULL_KEY = "Key cannot be null";
......@@ -55,7 +59,7 @@ public class ConsistentMapImpl<K, V> implements ConsistentMap<K, V> {
ConsistentMapImpl(String name,
DatabaseProxy<String, byte[]> proxy,
StoreSerializer serializer) {
Serializer serializer) {
this.name = checkNotNull(name, "map name cannot be null");
this.proxy = checkNotNull(proxy, "database proxy cannot be null");
this.serializer = checkNotNull(serializer, "serializer cannot be null");
......@@ -87,14 +91,15 @@ public class ConsistentMapImpl<K, V> implements ConsistentMap<K, V> {
public Versioned<V> get(K key) {
checkNotNull(key, ERROR_NULL_KEY);
Versioned<byte[]> value = complete(proxy.get(name, keyCache.getUnchecked(key)));
return new Versioned<>(serializer.decode(value.value()), value.version());
return (value != null) ? new Versioned<>(serializer.decode(value.value()), value.version()) : null;
}
@Override
public Versioned<V> put(K key, V value) {
checkNotNull(key, ERROR_NULL_KEY);
checkNotNull(value, ERROR_NULL_VALUE);
Versioned<byte[]> previousValue = complete(proxy.get(name, keyCache.getUnchecked(key)));
Versioned<byte[]> previousValue =
complete(proxy.put(name, keyCache.getUnchecked(key), serializer.encode(value)));
return (previousValue != null) ?
new Versioned<>(serializer.decode(previousValue.value()), previousValue.version()) : null;
......@@ -103,7 +108,7 @@ public class ConsistentMapImpl<K, V> implements ConsistentMap<K, V> {
@Override
public Versioned<V> remove(K key) {
checkNotNull(key, ERROR_NULL_KEY);
Versioned<byte[]> value = complete(proxy.get(name, keyCache.getUnchecked(key)));
Versioned<byte[]> value = complete(proxy.remove(name, keyCache.getUnchecked(key)));
return (value != null) ? new Versioned<>(serializer.decode(value.value()), value.version()) : null;
}
......@@ -198,7 +203,7 @@ public class ConsistentMapImpl<K, V> implements ConsistentMap<K, V> {
}
private Map.Entry<K, Versioned<V>> fromRawEntry(Map.Entry<String, Versioned<byte[]>> e) {
return new AbstractMap.SimpleEntry<>(
return Pair.of(
dK(e.getKey()),
new Versioned<>(
serializer.decode(e.getValue().value()),
......
......@@ -22,7 +22,9 @@ import org.apache.felix.scr.annotations.Service;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.DefaultControllerNode;
import org.onosproject.store.serializers.StoreSerializer;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
import org.slf4j.Logger;
import com.google.common.collect.Sets;
......@@ -32,7 +34,7 @@ import com.google.common.collect.Sets;
*/
@Component(immediate = true, enabled = true)
@Service
public class DatabaseManager implements DatabaseService {
public class DatabaseManager implements StorageService {
private final Logger log = getLogger(getClass());
private PartitionedDatabase partitionedDatabase;
......@@ -44,7 +46,7 @@ public class DatabaseManager implements DatabaseService {
protected ClusterService clusterService;
protected String nodeToUri(ControllerNode node) {
return "tcp://" + node.ip() + ":" + COPYCAT_TCP_PORT;
return String.format("tcp://%s:%d", node.ip(), COPYCAT_TCP_PORT);
}
@Activate
......@@ -76,7 +78,17 @@ public class DatabaseManager implements DatabaseService {
String localNodeUri = nodeToUri(clusterService.getLocalNode());
ClusterConfig clusterConfig = new ClusterConfig()
.withProtocol(new NettyTcpProtocol())
.withProtocol(new NettyTcpProtocol()
.withSsl(false)
.withConnectTimeout(60000)
.withAcceptBacklog(1024)
.withTrafficClass(-1)
.withSoLinger(-1)
.withReceiveBufferSize(32768)
.withSendBufferSize(8192)
.withThreads(1))
.withElectionTimeout(300)
.withHeartbeatInterval(150)
.withMembers(activeNodeUris)
.withLocalMember(localNodeUri);
......@@ -85,8 +97,15 @@ public class DatabaseManager implements DatabaseService {
partitionMap.forEach((name, nodes) -> {
Set<String> replicas = nodes.stream().map(this::nodeToUri).collect(Collectors.toSet());
DatabaseConfig partitionConfig = new DatabaseConfig()
.withElectionTimeout(300)
.withHeartbeatInterval(150)
.withConsistency(Consistency.STRONG)
.withLog(new FileLog(logDir))
.withLog(new FileLog()
.withDirectory(logDir)
.withSegmentSize(1073741824) // 1GB
.withFlushOnWrite(true)
.withSegmentInterval(Long.MAX_VALUE))
.withDefaultSerializer(new DatabaseSerializer())
.withReplicas(replicas);
databaseConfig.addPartition(name, partitionConfig);
});
......@@ -116,7 +135,7 @@ public class DatabaseManager implements DatabaseService {
}
@Override
public <K, V> ConsistentMap<K , V> createConsistentMap(String name, StoreSerializer serializer) {
public <K, V> ConsistentMap<K , V> createConsistentMap(String name, Serializer serializer) {
return new ConsistentMapImpl<K, V>(name, partitionedDatabase, serializer);
}
}
\ No newline at end of file
......
......@@ -6,6 +6,9 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.onosproject.store.service.UpdateOperation;
import org.onosproject.store.service.Versioned;
/**
* Database proxy.
*/
......
package org.onosproject.store.consistent.impl;
import java.nio.ByteBuffer;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.onlab.util.KryoNamespace;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
import org.onosproject.store.service.Versioned;
import net.kuujo.copycat.cluster.internal.MemberInfo;
import net.kuujo.copycat.protocol.rpc.AppendRequest;
import net.kuujo.copycat.protocol.rpc.AppendResponse;
import net.kuujo.copycat.protocol.rpc.CommitRequest;
import net.kuujo.copycat.protocol.rpc.CommitResponse;
import net.kuujo.copycat.protocol.rpc.PollRequest;
import net.kuujo.copycat.protocol.rpc.PollResponse;
import net.kuujo.copycat.protocol.rpc.QueryRequest;
import net.kuujo.copycat.protocol.rpc.QueryResponse;
import net.kuujo.copycat.protocol.rpc.ReplicaInfo;
import net.kuujo.copycat.protocol.rpc.SyncRequest;
import net.kuujo.copycat.protocol.rpc.SyncResponse;
import net.kuujo.copycat.util.serializer.SerializerConfig;
/**
* Serializer for DatabaseManager's interaction with Copycat.
*/
public class DatabaseSerializer extends SerializerConfig {
private static final KryoNamespace COPYCAT = KryoNamespace.newBuilder()
.nextId(KryoNamespace.FLOATING_ID)
.register(AppendRequest.class)
.register(AppendResponse.class)
.register(SyncRequest.class)
.register(SyncResponse.class)
.register(PollRequest.class)
.register(PollResponse.class)
.register(QueryRequest.class)
.register(QueryResponse.class)
.register(CommitRequest.class)
.register(CommitResponse.class)
.register(ReplicaInfo.class)
.register(MemberInfo.class)
.build();
private static final KryoNamespace ONOS_STORE = KryoNamespace.newBuilder()
.nextId(KryoNamespace.FLOATING_ID)
.register(Versioned.class)
.register(Pair.class)
.register(ImmutablePair.class)
.build();
private static final KryoSerializer SERIALIZER = new KryoSerializer() {
@Override
protected void setupKryoPool() {
serializerPool = KryoNamespace.newBuilder()
.register(KryoNamespaces.BASIC)
.register(COPYCAT)
.register(ONOS_STORE)
.build();
}
};
@Override
public ByteBuffer writeObject(Object object) {
return ByteBuffer.wrap(SERIALIZER.encode(object));
}
@Override
public <T> T readObject(ByteBuffer buffer) {
return SERIALIZER.decode(buffer);
}
}
\ No newline at end of file
......@@ -5,6 +5,9 @@ import java.util.List;
import java.util.Map.Entry;
import java.util.Set;
import org.onosproject.store.service.UpdateOperation;
import org.onosproject.store.service.Versioned;
import net.kuujo.copycat.state.Command;
import net.kuujo.copycat.state.Initializer;
import net.kuujo.copycat.state.Query;
......
......@@ -13,6 +13,9 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import org.onosproject.store.service.UpdateOperation;
import org.onosproject.store.service.Versioned;
/**
* Default database.
*/
......@@ -132,7 +135,7 @@ public class DefaultDatabase extends AbstractResource<Database> implements Datab
return runStartupTasks()
.thenCompose(v -> stateMachine.open())
.thenRun(() -> {
this.proxy = stateMachine.createProxy(DatabaseProxy.class);
this.proxy = stateMachine.createProxy(DatabaseProxy.class, this.getClass().getClassLoader());
})
.thenApply(v -> null);
}
......
......@@ -6,8 +6,16 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.stream.Collectors;
import java.util.Set;
import org.apache.commons.lang3.tuple.Pair;
import org.onosproject.store.service.UpdateOperation;
import org.onosproject.store.service.Versioned;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import net.kuujo.copycat.state.Initializer;
import net.kuujo.copycat.state.StateContext;
......@@ -88,17 +96,21 @@ public class DefaultDatabaseState<K, V> implements DatabaseState<K, V> {
@Override
public Set<K> keySet(String tableName) {
return getTableMap(tableName).keySet();
return ImmutableSet.copyOf(getTableMap(tableName).keySet());
}
@Override
public Collection<Versioned<V>> values(String tableName) {
return getTableMap(tableName).values();
return ImmutableList.copyOf(getTableMap(tableName).values());
}
@Override
public Set<Entry<K, Versioned<V>>> entrySet(String tableName) {
return getTableMap(tableName).entrySet();
return ImmutableSet.copyOf(getTableMap(tableName)
.entrySet()
.stream()
.map(entry -> Pair.of(entry.getKey(), entry.getValue()))
.collect(Collectors.toSet()));
}
@Override
......@@ -110,7 +122,7 @@ public class DefaultDatabaseState<K, V> implements DatabaseState<K, V> {
@Override
public boolean remove(String tableName, K key, V value) {
Versioned<V> existing = getTableMap(tableName).get(key);
if (existing != null && existing.value().equals(value)) {
if (existing != null && checkEquality(existing.value(), value)) {
getTableMap(tableName).remove(key);
return true;
}
......@@ -130,7 +142,7 @@ public class DefaultDatabaseState<K, V> implements DatabaseState<K, V> {
@Override
public boolean replace(String tableName, K key, V oldValue, V newValue) {
Versioned<V> existing = getTableMap(tableName).get(key);
if (existing != null && existing.value().equals(oldValue)) {
if (existing != null && checkEquality(existing.value(), oldValue)) {
put(tableName, key, newValue);
return true;
}
......@@ -198,11 +210,11 @@ public class DefaultDatabaseState<K, V> implements DatabaseState<K, V> {
case PUT_IF_VERSION_MATCH:
return existingEntry != null && existingEntry.version() == update.currentVersion();
case PUT_IF_VALUE_MATCH:
return existingEntry != null && existingEntry.value().equals(update.currentValue());
return existingEntry != null && checkEquality(existingEntry.value(), update.currentValue());
case REMOVE_IF_VERSION_MATCH:
return existingEntry == null || existingEntry.version() == update.currentVersion();
case REMOVE_IF_VALUE_MATCH:
return existingEntry == null || existingEntry.value().equals(update.currentValue());
return existingEntry == null || checkEquality(existingEntry.value(), update.currentValue());
default:
throw new IllegalStateException("Unsupported type: " + update.type());
}
......
......@@ -9,6 +9,10 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.onosproject.store.service.UpdateOperation;
import org.onosproject.store.service.Versioned;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
......@@ -112,7 +116,7 @@ public class PartitionedDatabase implements DatabaseProxy<String, byte[]>, Parti
return CompletableFuture.allOf(partitions
.values()
.stream()
.map(p -> p.values(tableName))
.map(p -> p.values(tableName).thenApply(values::addAll))
.toArray(CompletableFuture[]::new))
.thenApply(v -> values);
}
......
......@@ -67,13 +67,14 @@ public interface PartitionedDatabaseManager {
CopycatConfig copycatConfig = new CopycatConfig()
.withName(name)
.withClusterConfig(clusterConfig)
.withDefaultSerializer(new DatabaseSerializer())
.withDefaultExecutor(Executors.newSingleThreadExecutor(new NamedThreadFactory("copycat-coordinator-%d")));
ClusterCoordinator coordinator = new DefaultClusterCoordinator(copycatConfig.resolve());
PartitionedDatabase partitionedDatabase = new PartitionedDatabase(coordinator);
partitionedDatabaseConfig.partitions().forEach((partitionName, partitionConfig) ->
partitionedDatabase.registerPartition(partitionName ,
coordinator.getResource(partitionName, partitionConfig.resolve(clusterConfig)
.withDefaultSerializer(copycatConfig.getDefaultSerializer().copy())
.withSerializer(copycatConfig.getDefaultSerializer())
.withDefaultExecutor(copycatConfig.getDefaultExecutor()))));
partitionedDatabase.setPartitioner(
new SimpleKeyHashPartitioner(partitionedDatabase.getRegisteredPartitions()));
......