Madan Jampani
Committed by Jonathan Hart

WIP: Consistent map implementation.

Change-Id: I51b2d954b7a8ff2c51c425d9a8125937d4eaa6b0

Change-Id: Ib27799d4eb60fc4bfaa8d2f21a904365ff5437eb

Change-Id: I95c937600ceb8f282a482280217671c471f40b9c
1 +package org.onosproject.store.consistent.impl;
2 +
3 +import java.util.Collection;
4 +import java.util.List;
5 +import java.util.Set;
6 +import java.util.Map.Entry;
7 +
8 +/**
9 + * A distributed, strongly consistent map.
10 + * <p>
11 + * This map offers strong read-after-update (where update == create/update/delete)
12 + * consistency. All operations to the map are serialized and applied in a consistent
13 + * manner.
14 + * <p>
15 + * The stronger consistency comes at the expense of availability in
16 + * the event of a network partition. A network partition can be either due to
17 + * a temporary disruption in network connectivity between participating nodes
18 + * or due to a node being temporarily down.
19 + * </p><p>
20 + * All values stored in this map are versioned and the API supports optimistic
21 + * concurrency by allowing conditional updates that take into consideration
22 + * the version or value that was previously read.
23 + * </p><p>
24 + * The map also supports atomic batch updates (transactions). One can provide a list
25 + * of updates to be applied atomically if and only if all the operations are guaranteed
26 + * to succeed i.e. all their preconditions are met. For example, the precondition
27 + * for a putIfAbsent API call is absence of a mapping for the key. Similarly, the
28 + * precondition for a conditional replace operation is the presence of an expected
29 + * version or value
30 + * </p><p>
31 + * This map does not allow null values. All methods can throw a ConsistentMapException
32 + * (which extends RuntimeException) to indicate failures.
33 + *
34 + */
35 +public interface ConsistentMap<K, V> {
36 +
37 + /**
38 + * Returns the number of entries in the map.
39 + *
40 + * @return map size.
41 + */
42 + int size();
43 +
44 + /**
45 + * Returns true if the map is empty.
46 + *
47 + * @return true if map has no entries, false otherwise.
48 + */
49 + boolean isEmpty();
50 +
51 + /**
52 + * Returns true if this map contains a mapping for the specified key.
53 + *
54 + * @param key key
55 + * @return true if map contains key, false otherwise.
56 + */
57 + boolean containsKey(K key);
58 +
59 + /**
60 + * Returns true if this map contains the specified value.
61 + *
62 + * @param value value
63 + * @return true if map contains value, false otherwise.
64 + */
65 + boolean containsValue(V value);
66 +
67 + /**
68 + * Returns the value (and version) to which the specified key is mapped, or null if this
69 + * map contains no mapping for the key.
70 + *
71 + * @param key the key whose associated value (and version) is to be returned
72 + * @return the value (and version) to which the specified key is mapped, or null if
73 + * this map contains no mapping for the key
74 + */
75 + Versioned<V> get(K key);
76 +
77 + /**
78 + * Associates the specified value with the specified key in this map (optional operation).
79 + * If the map previously contained a mapping for the key, the old value is replaced by the
80 + * specified value.
81 + *
82 + * @param key key with which the specified value is to be associated
83 + * @param value value to be associated with the specified key
84 + * @return the previous value (and version) associated with key, or null if there was
85 + * no mapping for key.
86 + */
87 + Versioned<V> put(K key, V value);
88 +
89 + /**
90 + * Removes the mapping for a key from this map if it is present (optional operation).
91 + *
92 + * @param key key whose value is to be removed from the map
93 + * @return the value (and version) to which this map previously associated the key,
94 + * or null if the map contained no mapping for the key.
95 + */
96 + Versioned<V> remove(K key);
97 +
98 + /**
99 + * Removes all of the mappings from this map (optional operation).
100 + * The map will be empty after this call returns.
101 + */
102 + void clear();
103 +
104 + /**
105 + * Returns a Set view of the keys contained in this map.
106 + * This method differs from the behavior of java.util.Map.keySet() in that
107 + * what is returned is a unmodifiable snapshot view of the keys in the ConsistentMap.
108 + * Attempts to modify the returned set, whether direct or via its iterator,
109 + * result in an UnsupportedOperationException.
110 + *
111 + * @return a set of the keys contained in this map
112 + */
113 + Set<K> keySet();
114 +
115 + /**
116 + * Returns the collection of values (and associated versions) contained in this map.
117 + * This method differs from the behavior of java.util.Map.values() in that
118 + * what is returned is a unmodifiable snapshot view of the values in the ConsistentMap.
119 + * Attempts to modify the returned collection, whether direct or via its iterator,
120 + * result in an UnsupportedOperationException.
121 + *
122 + * @return a collection of the values (and associated versions) contained in this map
123 + */
124 + Collection<Versioned<V>> values();
125 +
126 + /**
127 + * Returns the set of entries contained in this map.
128 + * This method differs from the behavior of java.util.Map.entrySet() in that
129 + * what is returned is a unmodifiable snapshot view of the entries in the ConsistentMap.
130 + * Attempts to modify the returned set, whether direct or via its iterator,
131 + * result in an UnsupportedOperationException.
132 + *
133 + * @return set of entries contained in this map.
134 + */
135 + Set<Entry<K, Versioned<V>>> entrySet();
136 +
137 + /**
138 + * If the specified key is not already associated with a value
139 + * associates it with the given value and returns null, else returns the current value.
140 + *
141 + * @param key key with which the specified value is to be associated
142 + * @param value value to be associated with the specified key
143 + * @return the previous value associated with the specified key or null
144 + * if key does not already mapped to a value.
145 + */
146 + Versioned<V> putIfAbsent(K key, V value);
147 +
148 + /**
149 + * Removes the entry for the specified key only if it is currently
150 + * mapped to the specified value.
151 + *
152 + * @param key key with which the specified value is associated
153 + * @param value value expected to be associated with the specified key
154 + * @return true if the value was removed
155 + */
156 + boolean remove(K key, V value);
157 +
158 + /**
159 + * Removes the entry for the specified key only if its current
160 + * version in the map is equal to the specified version.
161 + *
162 + * @param key key with which the specified version is associated
163 + * @param version version expected to be associated with the specified key
164 + * @return true if the value was removed
165 + */
166 + boolean remove(K key, long version);
167 +
168 + /**
169 + * Replaces the entry for the specified key only if currently mapped
170 + * to the specified value.
171 + *
172 + * @param key key with which the specified value is associated
173 + * @param oldValue value expected to be associated with the specified key
174 + * @param newValue value to be associated with the specified key
175 + * @return true if the value was replaced
176 + */
177 + boolean replace(K key, V oldValue, V newValue);
178 +
179 + /**
180 + * Replaces the entry for the specified key only if it is currently mapped to the
181 + * specified version.
182 + *
183 + * @param key key key with which the specified value is associated
184 + * @param oldVersion version expected to be associated with the specified key
185 + * @param newValue value to be associated with the specified key
186 + * @return true if the value was replaced
187 + */
188 + boolean replace(K key, long oldVersion, V newValue);
189 +
190 + /**
191 + * Atomically apply the specified list of updates to the map.
192 + * If any of the updates cannot be applied due to a precondition
193 + * violation, none of the updates will be applied and the state of
194 + * the map remains unaltered.
195 + *
196 + * @param updates list of updates to apply atomically.
197 + * @return true if the map was updated.
198 + */
199 + boolean batchUpdate(List<UpdateOperation<K, V>> updates);
200 +}
1 +package org.onosproject.store.consistent.impl;
2 +
3 +/**
4 + * Top level exception for ConsistentMap failures.
5 + */
6 +@SuppressWarnings("serial")
7 +public class ConsistentMapException extends RuntimeException {
8 + public ConsistentMapException() {
9 + }
10 +
11 + public ConsistentMapException(Throwable t) {
12 + super(t);
13 + }
14 +
15 + /**
16 + * ConsistentMap operation timeout.
17 + */
18 + public static class Timeout extends ConsistentMapException {
19 + }
20 +
21 + /**
22 + * ConsistentMap operation interrupted.
23 + */
24 + public static class Interrupted extends ConsistentMapException {
25 + }
26 +}
1 +package org.onosproject.store.consistent.impl;
2 +
3 +import static com.google.common.base.Preconditions.*;
4 +
5 +import java.util.AbstractMap;
6 +import java.util.Collection;
7 +import java.util.Collections;
8 +import java.util.List;
9 +import java.util.Map;
10 +import java.util.Map.Entry;
11 +import java.util.concurrent.CompletableFuture;
12 +import java.util.concurrent.ExecutionException;
13 +import java.util.concurrent.TimeUnit;
14 +import java.util.concurrent.TimeoutException;
15 +import java.util.stream.Collectors;
16 +import java.util.Set;
17 +
18 +import org.onlab.util.HexString;
19 +import org.onosproject.store.serializers.StoreSerializer;
20 +
21 +import com.google.common.cache.CacheBuilder;
22 +import com.google.common.cache.CacheLoader;
23 +import com.google.common.cache.LoadingCache;
24 +
25 +/**
26 + * ConsistentMap implementation that is backed by a Raft consensus
27 + * based database.
28 + *
29 + * @param <K> type of key.
30 + * @param <V> type of value.
31 + */
32 +public class ConsistentMapImpl<K, V> implements ConsistentMap<K, V> {
33 +
34 + private final String name;
35 + private final DatabaseProxy<String, byte[]> proxy;
36 + private final StoreSerializer serializer;
37 +
38 + private static final int OPERATION_TIMEOUT_MILLIS = 1000;
39 + private static final String ERROR_NULL_KEY = "Key cannot be null";
40 + private static final String ERROR_NULL_VALUE = "Null values are not allowed";
41 +
42 + private final LoadingCache<K, String> keyCache = CacheBuilder.newBuilder()
43 + .softValues()
44 + .build(new CacheLoader<K, String>() {
45 +
46 + @Override
47 + public String load(K key) {
48 + return HexString.toHexString(serializer.encode(key));
49 + }
50 + });
51 +
52 + protected K dK(String key) {
53 + return serializer.decode(HexString.fromHexString(key));
54 + }
55 +
56 + ConsistentMapImpl(String name,
57 + DatabaseProxy<String, byte[]> proxy,
58 + StoreSerializer serializer) {
59 + this.name = checkNotNull(name, "map name cannot be null");
60 + this.proxy = checkNotNull(proxy, "database proxy cannot be null");
61 + this.serializer = checkNotNull(serializer, "serializer cannot be null");
62 + }
63 +
64 + @Override
65 + public int size() {
66 + return complete(proxy.size(name));
67 + }
68 +
69 + @Override
70 + public boolean isEmpty() {
71 + return complete(proxy.isEmpty(name));
72 + }
73 +
74 + @Override
75 + public boolean containsKey(K key) {
76 + checkNotNull(key, ERROR_NULL_KEY);
77 + return complete(proxy.containsKey(name, keyCache.getUnchecked(key)));
78 + }
79 +
80 + @Override
81 + public boolean containsValue(V value) {
82 + checkNotNull(value, ERROR_NULL_VALUE);
83 + return complete(proxy.containsValue(name, serializer.encode(value)));
84 + }
85 +
86 + @Override
87 + public Versioned<V> get(K key) {
88 + checkNotNull(key, ERROR_NULL_KEY);
89 + Versioned<byte[]> value = complete(proxy.get(name, keyCache.getUnchecked(key)));
90 + return new Versioned<>(serializer.decode(value.value()), value.version());
91 + }
92 +
93 + @Override
94 + public Versioned<V> put(K key, V value) {
95 + checkNotNull(key, ERROR_NULL_KEY);
96 + checkNotNull(value, ERROR_NULL_VALUE);
97 + Versioned<byte[]> previousValue = complete(proxy.get(name, keyCache.getUnchecked(key)));
98 + return (previousValue != null) ?
99 + new Versioned<>(serializer.decode(previousValue.value()), previousValue.version()) : null;
100 +
101 + }
102 +
103 + @Override
104 + public Versioned<V> remove(K key) {
105 + checkNotNull(key, ERROR_NULL_KEY);
106 + Versioned<byte[]> value = complete(proxy.get(name, keyCache.getUnchecked(key)));
107 + return (value != null) ? new Versioned<>(serializer.decode(value.value()), value.version()) : null;
108 + }
109 +
110 + @Override
111 + public void clear() {
112 + complete(proxy.clear(name));
113 + }
114 +
115 + @Override
116 + public Set<K> keySet() {
117 + return Collections.unmodifiableSet(complete(proxy.keySet(name))
118 + .stream()
119 + .map(this::dK)
120 + .collect(Collectors.toSet()));
121 + }
122 +
123 + @Override
124 + public Collection<Versioned<V>> values() {
125 + return Collections.unmodifiableList(complete(proxy.values(name))
126 + .stream()
127 + .map(v -> new Versioned<V>(serializer.decode(v.value()), v.version()))
128 + .collect(Collectors.toList()));
129 + }
130 +
131 + @Override
132 + public Set<Entry<K, Versioned<V>>> entrySet() {
133 + return Collections.unmodifiableSet(complete(proxy.entrySet(name))
134 + .stream()
135 + .map(this::fromRawEntry)
136 + .collect(Collectors.toSet()));
137 + }
138 +
139 + @Override
140 + public Versioned<V> putIfAbsent(K key, V value) {
141 + checkNotNull(key, ERROR_NULL_KEY);
142 + checkNotNull(value, ERROR_NULL_VALUE);
143 + Versioned<byte[]> existingValue = complete(proxy.putIfAbsent(
144 + name, keyCache.getUnchecked(key), serializer.encode(value)));
145 + return (existingValue != null) ?
146 + new Versioned<>(serializer.decode(existingValue.value()), existingValue.version()) : null;
147 + }
148 +
149 + @Override
150 + public boolean remove(K key, V value) {
151 + checkNotNull(key, ERROR_NULL_KEY);
152 + checkNotNull(value, ERROR_NULL_VALUE);
153 + return complete(proxy.remove(name, keyCache.getUnchecked(key), serializer.encode(value)));
154 + }
155 +
156 + @Override
157 + public boolean remove(K key, long version) {
158 + checkNotNull(key, ERROR_NULL_KEY);
159 + return complete(proxy.remove(name, keyCache.getUnchecked(key), version));
160 +
161 + }
162 +
163 + @Override
164 + public boolean replace(K key, V oldValue, V newValue) {
165 + checkNotNull(key, ERROR_NULL_KEY);
166 + checkNotNull(newValue, ERROR_NULL_VALUE);
167 + byte[] existing = oldValue != null ? serializer.encode(oldValue) : null;
168 + return complete(proxy.replace(name, keyCache.getUnchecked(key), existing, serializer.encode(newValue)));
169 + }
170 +
171 + @Override
172 + public boolean replace(K key, long oldVersion, V newValue) {
173 + checkNotNull(key, ERROR_NULL_KEY);
174 + checkNotNull(newValue, ERROR_NULL_VALUE);
175 + return complete(proxy.replace(name, keyCache.getUnchecked(key), oldVersion, serializer.encode(newValue)));
176 + }
177 +
178 + @Override
179 + public boolean batchUpdate(List<UpdateOperation<K, V>> updates) {
180 + checkNotNull(updates, "updates cannot be null");
181 + return complete(proxy.atomicBatchUpdate(updates
182 + .stream()
183 + .map(this::toRawUpdateOperation)
184 + .collect(Collectors.toList())));
185 + }
186 +
187 + private static <T> T complete(CompletableFuture<T> future) {
188 + try {
189 + return future.get(OPERATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
190 + } catch (InterruptedException e) {
191 + Thread.currentThread().interrupt();
192 + throw new ConsistentMapException.Interrupted();
193 + } catch (TimeoutException e) {
194 + throw new ConsistentMapException.Timeout();
195 + } catch (ExecutionException e) {
196 + throw new ConsistentMapException(e.getCause());
197 + }
198 + }
199 +
200 + private Map.Entry<K, Versioned<V>> fromRawEntry(Map.Entry<String, Versioned<byte[]>> e) {
201 + return new AbstractMap.SimpleEntry<>(
202 + dK(e.getKey()),
203 + new Versioned<>(
204 + serializer.decode(e.getValue().value()),
205 + e.getValue().version()));
206 + }
207 +
208 + private UpdateOperation<String, byte[]> toRawUpdateOperation(UpdateOperation<K, V> update) {
209 +
210 + checkArgument(name.equals(update.tableName()), "Unexpected table name");
211 +
212 + UpdateOperation.Builder<String, byte[]> rawUpdate = UpdateOperation.<String, byte[]>newBuilder();
213 +
214 + rawUpdate = rawUpdate.withKey(keyCache.getUnchecked(update.key()))
215 + .withCurrentVersion(update.currentVersion())
216 + .withType(update.type());
217 +
218 + rawUpdate = rawUpdate.withTableName(update.tableName());
219 +
220 + if (update.value() != null) {
221 + rawUpdate = rawUpdate.withValue(serializer.encode(update.value()));
222 + } else {
223 + checkState(update.type() == UpdateOperation.Type.REMOVE
224 + || update.type() == UpdateOperation.Type.REMOVE_IF_VERSION_MATCH,
225 + ERROR_NULL_VALUE);
226 + }
227 +
228 + if (update.currentValue() != null) {
229 + rawUpdate = rawUpdate.withCurrentValue(serializer.encode(update.currentValue()));
230 + }
231 +
232 + return rawUpdate.build();
233 + }
234 +}
...\ No newline at end of file ...\ No newline at end of file
1 +package org.onosproject.store.consistent.impl;
2 +
3 +import static com.google.common.base.Preconditions.checkArgument;
4 +import static com.google.common.base.Preconditions.checkNotNull;
5 +import static org.slf4j.LoggerFactory.getLogger;
6 +
7 +import java.io.File;
8 +import java.io.IOException;
9 +import java.util.HashMap;
10 +import java.util.HashSet;
11 +import java.util.Iterator;
12 +import java.util.Map;
13 +import java.util.Map.Entry;
14 +import java.util.Set;
15 +
16 +import org.onosproject.cluster.DefaultControllerNode;
17 +import org.onosproject.cluster.NodeId;
18 +import org.onlab.packet.IpAddress;
19 +import org.slf4j.Logger;
20 +
21 +import com.fasterxml.jackson.core.JsonEncoding;
22 +import com.fasterxml.jackson.core.JsonFactory;
23 +import com.fasterxml.jackson.databind.JsonNode;
24 +import com.fasterxml.jackson.databind.ObjectMapper;
25 +import com.fasterxml.jackson.databind.node.ArrayNode;
26 +import com.fasterxml.jackson.databind.node.ObjectNode;
27 +import com.google.common.collect.Maps;
28 +
29 +/**
30 + * Allows for reading and writing partitioned database definition as a JSON file.
31 + */
32 +public class DatabaseDefinitionStore {
33 +
34 + private final Logger log = getLogger(getClass());
35 +
36 + private final File definitionfile;
37 +
38 + /**
39 + * Creates a reader/writer of the database definition file.
40 + *
41 + * @param filePath location of the definition file
42 + */
43 + public DatabaseDefinitionStore(String filePath) {
44 + definitionfile = new File(filePath);
45 + }
46 +
47 + /**
48 + * Creates a reader/writer of the database definition file.
49 + *
50 + * @param filePath location of the definition file
51 + */
52 + public DatabaseDefinitionStore(File filePath) {
53 + definitionfile = checkNotNull(filePath);
54 + }
55 +
56 + /**
57 + * Returns the Map from database partition name to set of initial active member nodes.
58 + *
59 + * @return Map from partition name to set of active member nodes
60 + * @throws IOException when I/O exception of some sort has occurred.
61 + */
62 + public Map<String, Set<DefaultControllerNode>> read() throws IOException {
63 +
64 + final Map<String, Set<DefaultControllerNode>> partitions = Maps.newHashMap();
65 +
66 + final ObjectMapper mapper = new ObjectMapper();
67 + final ObjectNode tabletNodes = (ObjectNode) mapper.readTree(definitionfile);
68 + final Iterator<Entry<String, JsonNode>> fields = tabletNodes.fields();
69 + while (fields.hasNext()) {
70 + final Entry<String, JsonNode> next = fields.next();
71 + final Set<DefaultControllerNode> nodes = new HashSet<>();
72 + final Iterator<JsonNode> elements = next.getValue().elements();
73 + while (elements.hasNext()) {
74 + ObjectNode nodeDef = (ObjectNode) elements.next();
75 + nodes.add(new DefaultControllerNode(new NodeId(nodeDef.get("id").asText()),
76 + IpAddress.valueOf(nodeDef.get("ip").asText()),
77 + nodeDef.get("tcpPort").asInt(DatabaseManager.COPYCAT_TCP_PORT)));
78 + }
79 +
80 + partitions.put(next.getKey(), nodes);
81 + }
82 + return partitions;
83 + }
84 +
85 + /**
86 + * Updates the Map from database partition name to set of member nodes.
87 + *
88 + * @param partitionName name of the database partition to update
89 + * @param nodes set of initial member nodes
90 + * @throws IOException when I/O exception of some sort has occurred.
91 + */
92 + public void write(String partitionName, Set<DefaultControllerNode> nodes) throws IOException {
93 + checkNotNull(partitionName);
94 + checkArgument(partitionName.isEmpty(), "Partition name cannot be empty");
95 +
96 + // load current
97 + Map<String, Set<DefaultControllerNode>> config;
98 + try {
99 + config = read();
100 + } catch (IOException e) {
101 + log.info("Reading partition config failed, assuming empty definition.");
102 + config = new HashMap<>();
103 + }
104 + // update with specified
105 + config.put(partitionName, nodes);
106 +
107 + // write back to file
108 + final ObjectMapper mapper = new ObjectMapper();
109 + final ObjectNode partitionNodes = mapper.createObjectNode();
110 + for (Entry<String, Set<DefaultControllerNode>> tablet : config.entrySet()) {
111 + ArrayNode nodeDefs = mapper.createArrayNode();
112 + partitionNodes.set(tablet.getKey(), nodeDefs);
113 +
114 + for (DefaultControllerNode node : tablet.getValue()) {
115 + ObjectNode nodeDef = mapper.createObjectNode();
116 + nodeDef.put("id", node.id().toString())
117 + .put("ip", node.ip().toString())
118 + .put("tcpPort", node.tcpPort());
119 + nodeDefs.add(nodeDef);
120 + }
121 + }
122 + mapper.writeTree(new JsonFactory().createGenerator(definitionfile, JsonEncoding.UTF8),
123 + partitionNodes);
124 + }
125 +}
1 +package org.onosproject.store.consistent.impl;
2 +
3 +import static org.slf4j.LoggerFactory.getLogger;
4 +
5 +import java.io.File;
6 +import java.io.IOException;
7 +import java.util.Map;
8 +import java.util.Set;
9 +import java.util.stream.Collectors;
10 +
11 +import net.kuujo.copycat.cluster.ClusterConfig;
12 +import net.kuujo.copycat.log.FileLog;
13 +import net.kuujo.copycat.netty.NettyTcpProtocol;
14 +import net.kuujo.copycat.protocol.Consistency;
15 +
16 +import org.apache.felix.scr.annotations.Activate;
17 +import org.apache.felix.scr.annotations.Component;
18 +import org.apache.felix.scr.annotations.Deactivate;
19 +import org.apache.felix.scr.annotations.Reference;
20 +import org.apache.felix.scr.annotations.ReferenceCardinality;
21 +import org.apache.felix.scr.annotations.Service;
22 +import org.onosproject.cluster.ClusterService;
23 +import org.onosproject.cluster.ControllerNode;
24 +import org.onosproject.cluster.DefaultControllerNode;
25 +import org.onosproject.store.serializers.StoreSerializer;
26 +import org.slf4j.Logger;
27 +
28 +import com.google.common.collect.Sets;
29 +
30 +/**
31 + * Database manager.
32 + */
33 +@Component(immediate = true, enabled = true)
34 +@Service
35 +public class DatabaseManager implements DatabaseService {
36 +
37 + private final Logger log = getLogger(getClass());
38 + private PartitionedDatabase partitionedDatabase;
39 + public static final int COPYCAT_TCP_PORT = 7238; // 7238 = RAFT
40 + private static final String CONFIG_DIR = "../config";
41 + private static final String PARTITION_DEFINITION_FILE = "tablets.json";
42 +
43 + @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
44 + protected ClusterService clusterService;
45 +
46 + protected String nodeToUri(ControllerNode node) {
47 + return "tcp://" + node.ip() + ":" + COPYCAT_TCP_PORT;
48 + }
49 +
50 + @Activate
51 + public void activate() {
52 +
53 + final String logDir = System.getProperty("karaf.data", "./data");
54 +
55 + // load database configuration
56 + File file = new File(CONFIG_DIR, PARTITION_DEFINITION_FILE);
57 + log.info("Loading database definition: {}", file.getAbsolutePath());
58 +
59 + DatabaseDefinitionStore databaseDef = new DatabaseDefinitionStore(file);
60 + Map<String, Set<DefaultControllerNode>> partitionMap;
61 + try {
62 + partitionMap = databaseDef.read();
63 + } catch (IOException e) {
64 + log.error("Failed to load database config {}", file);
65 + throw new IllegalStateException("Failed to load database config", e);
66 + }
67 +
68 + String[] activeNodeUris = partitionMap.values()
69 + .stream()
70 + .reduce((s1, s2) -> Sets.union(s1, s2))
71 + .get()
72 + .stream()
73 + .map(this::nodeToUri)
74 + .toArray(String[]::new);
75 +
76 + String localNodeUri = nodeToUri(clusterService.getLocalNode());
77 +
78 + ClusterConfig clusterConfig = new ClusterConfig()
79 + .withProtocol(new NettyTcpProtocol())
80 + .withMembers(activeNodeUris)
81 + .withLocalMember(localNodeUri);
82 +
83 + PartitionedDatabaseConfig databaseConfig = new PartitionedDatabaseConfig();
84 +
85 + partitionMap.forEach((name, nodes) -> {
86 + Set<String> replicas = nodes.stream().map(this::nodeToUri).collect(Collectors.toSet());
87 + DatabaseConfig partitionConfig = new DatabaseConfig()
88 + .withConsistency(Consistency.STRONG)
89 + .withLog(new FileLog(logDir))
90 + .withReplicas(replicas);
91 + databaseConfig.addPartition(name, partitionConfig);
92 + });
93 +
94 + partitionedDatabase = PartitionedDatabaseManager.create("onos-store", clusterConfig, databaseConfig);
95 +
96 + partitionedDatabase.open().whenComplete((db, error) -> {
97 + if (error != null) {
98 + log.warn("Failed to open database.", error);
99 + } else {
100 + log.info("Successfully opened database.");
101 + }
102 + });
103 + log.info("Started");
104 + }
105 +
106 + @Deactivate
107 + public void deactivate() {
108 + partitionedDatabase.close().whenComplete((result, error) -> {
109 + if (error != null) {
110 + log.warn("Failed to cleanly close database.", error);
111 + } else {
112 + log.info("Successfully closed database.");
113 + }
114 + });
115 + log.info("Stopped");
116 + }
117 +
118 + @Override
119 + public <K, V> ConsistentMap<K , V> createConsistentMap(String name, StoreSerializer serializer) {
120 + return new ConsistentMapImpl<K, V>(name, partitionedDatabase, serializer);
121 + }
122 +}
...\ No newline at end of file ...\ No newline at end of file
1 +package org.onosproject.store.consistent.impl;
2 +
3 +import static com.google.common.base.Preconditions.checkState;
4 +
5 +import java.util.Map;
6 +
7 +import com.google.common.base.Charsets;
8 +import com.google.common.collect.ImmutableSortedMap;
9 +import com.google.common.hash.Hashing;
10 +
11 +/**
12 + * Partitioner for mapping table entries to individual database partitions.
13 + * <p>
14 + * By default a md5 hash of the hash key (key or table name) is used to pick a
15 + * partition.
16 + */
17 +public abstract class DatabasePartitioner implements Partitioner<String> {
18 + // Database partitions sorted by their partition name.
19 + protected final Database[] sortedPartitions;
20 +
21 + public DatabasePartitioner(Map<String, Database> partitionMap) {
22 + checkState(partitionMap != null && !partitionMap.isEmpty(), "Partition map cannot be null or empty");
23 + sortedPartitions = ImmutableSortedMap.<String, Database>copyOf(partitionMap).values().toArray(new Database[]{});
24 + }
25 +
26 + protected int hash(String key) {
27 + return Math.abs(Hashing.md5().newHasher().putBytes(key.getBytes(Charsets.UTF_8)).hash().asInt());
28 + }
29 +
30 +}
1 +package org.onosproject.store.consistent.impl;
2 +
3 +import org.onosproject.store.serializers.StoreSerializer;
4 +
5 +/**
6 + * Database service.
7 + */
8 +public interface DatabaseService {
9 +
10 + /**
11 + * Creates a ConsistentMap.
12 + *
13 + * @param name map name
14 + * @param serializer serializer to use for serializing keys and values.
15 + * @return consistent map.
16 + */
17 + <K, V> ConsistentMap<K , V> createConsistentMap(String name, StoreSerializer serializer);
18 +}
...\ No newline at end of file ...\ No newline at end of file
...@@ -9,8 +9,6 @@ import java.util.concurrent.CompletableFuture; ...@@ -9,8 +9,6 @@ import java.util.concurrent.CompletableFuture;
9 import java.util.concurrent.CopyOnWriteArrayList; 9 import java.util.concurrent.CopyOnWriteArrayList;
10 import java.util.concurrent.atomic.AtomicBoolean; 10 import java.util.concurrent.atomic.AtomicBoolean;
11 import java.util.concurrent.atomic.AtomicInteger; 11 import java.util.concurrent.atomic.AtomicInteger;
12 -import java.util.stream.Collectors;
13 -
14 import com.google.common.collect.ImmutableMap; 12 import com.google.common.collect.ImmutableMap;
15 import com.google.common.collect.Lists; 13 import com.google.common.collect.Lists;
16 import com.google.common.collect.Maps; 14 import com.google.common.collect.Maps;
...@@ -188,8 +186,7 @@ public class PartitionedDatabase implements DatabaseProxy<String, byte[]>, Parti ...@@ -188,8 +186,7 @@ public class PartitionedDatabase implements DatabaseProxy<String, byte[]>, Parti
188 .values() 186 .values()
189 .stream() 187 .stream()
190 .map(Database::open) 188 .map(Database::open)
191 - .collect(Collectors.toList()) 189 + .toArray(CompletableFuture[]::new))
192 - .toArray(new CompletableFuture[partitions.size()]))
193 .thenApply(v -> this)); 190 .thenApply(v -> this));
194 191
195 } 192 }
...@@ -200,8 +197,7 @@ public class PartitionedDatabase implements DatabaseProxy<String, byte[]>, Parti ...@@ -200,8 +197,7 @@ public class PartitionedDatabase implements DatabaseProxy<String, byte[]>, Parti
200 .values() 197 .values()
201 .stream() 198 .stream()
202 .map(database -> database.close()) 199 .map(database -> database.close())
203 - .collect(Collectors.toList()) 200 + .toArray(CompletableFuture[]::new));
204 - .toArray(new CompletableFuture[partitions.size()]));
205 CompletableFuture<Void> closeCoordinator = coordinator.close(); 201 CompletableFuture<Void> closeCoordinator = coordinator.close();
206 return closePartitions.thenCompose(v -> closeCoordinator); 202 return closePartitions.thenCompose(v -> closeCoordinator);
207 } 203 }
......
...@@ -24,7 +24,7 @@ public class PartitionedDatabaseConfig { ...@@ -24,7 +24,7 @@ public class PartitionedDatabaseConfig {
24 * @param config partition config 24 * @param config partition config
25 * @return this instance 25 * @return this instance
26 */ 26 */
27 - public PartitionedDatabaseConfig withPartition(String name, DatabaseConfig config) { 27 + public PartitionedDatabaseConfig addPartition(String name, DatabaseConfig config) {
28 partitions.put(name, config); 28 partitions.put(name, config);
29 return this; 29 return this;
30 } 30 }
......
...@@ -10,6 +10,9 @@ import net.kuujo.copycat.cluster.internal.coordinator.ClusterCoordinator; ...@@ -10,6 +10,9 @@ import net.kuujo.copycat.cluster.internal.coordinator.ClusterCoordinator;
10 import net.kuujo.copycat.cluster.internal.coordinator.DefaultClusterCoordinator; 10 import net.kuujo.copycat.cluster.internal.coordinator.DefaultClusterCoordinator;
11 import net.kuujo.copycat.util.concurrent.NamedThreadFactory; 11 import net.kuujo.copycat.util.concurrent.NamedThreadFactory;
12 12
13 +/**
14 + * Manages a PartitionedDatabase.
15 + */
13 public interface PartitionedDatabaseManager { 16 public interface PartitionedDatabaseManager {
14 /** 17 /**
15 * Opens the database. 18 * Opens the database.
...@@ -73,7 +76,7 @@ public interface PartitionedDatabaseManager { ...@@ -73,7 +76,7 @@ public interface PartitionedDatabaseManager {
73 .withDefaultSerializer(copycatConfig.getDefaultSerializer().copy()) 76 .withDefaultSerializer(copycatConfig.getDefaultSerializer().copy())
74 .withDefaultExecutor(copycatConfig.getDefaultExecutor())))); 77 .withDefaultExecutor(copycatConfig.getDefaultExecutor()))));
75 partitionedDatabase.setPartitioner( 78 partitionedDatabase.setPartitioner(
76 - new SimpleKeyHashPartitioner<>(partitionedDatabase.getRegisteredPartitions())); 79 + new SimpleKeyHashPartitioner(partitionedDatabase.getRegisteredPartitions()));
77 return partitionedDatabase; 80 return partitionedDatabase;
78 } 81 }
79 } 82 }
......
1 package org.onosproject.store.consistent.impl; 1 package org.onosproject.store.consistent.impl;
2 2
3 -import java.util.Collections;
4 -import java.util.List;
5 import java.util.Map; 3 import java.util.Map;
6 4
7 -import com.google.common.collect.ImmutableMap;
8 -import com.google.common.collect.Lists;
9 -
10 /** 5 /**
11 - * A simple Partitioner that uses the key hashCode to map 6 + * A simple Partitioner for mapping keys to database partitions.
12 - * key to a partition. 7 + * <p>
8 + * This class uses a md5 hash based hashing scheme for hashing the key to
9 + * a partition.
13 * 10 *
14 - * @param <K> key type.
15 */ 11 */
16 -public class SimpleKeyHashPartitioner<K> implements Partitioner<K> { 12 +public class SimpleKeyHashPartitioner extends DatabasePartitioner {
17 -
18 - private final Map<String, Database> partitionMap;
19 - private final List<String> sortedPartitionNames;
20 13
21 public SimpleKeyHashPartitioner(Map<String, Database> partitionMap) { 14 public SimpleKeyHashPartitioner(Map<String, Database> partitionMap) {
22 - this.partitionMap = ImmutableMap.copyOf(partitionMap); 15 + super(partitionMap);
23 - sortedPartitionNames = Lists.newArrayList(this.partitionMap.keySet());
24 - Collections.sort(sortedPartitionNames);
25 } 16 }
26 17
27 @Override 18 @Override
28 - public Database getPartition(String tableName, K key) { 19 + public Database getPartition(String tableName, String key) {
29 - return partitionMap.get(sortedPartitionNames.get(Math.abs(key.hashCode()) % partitionMap.size())); 20 + return sortedPartitions[hash(key) % sortedPartitions.length];
30 } 21 }
31 -} 22 +}
...\ No newline at end of file ...\ No newline at end of file
......
1 +package org.onosproject.store.consistent.impl;
2 +
3 +import java.util.Map;
4 +
5 +/**
6 + * A simple Partitioner that uses the table name hash to
7 + * pick a partition.
8 + * <p>
9 + * This class uses a md5 hash based hashing scheme for hashing the table name to
10 + * a partition. This partitioner maps all keys for a table to the same database
11 + * partition.
12 + */
13 +public class SimpleTableHashPartitioner extends DatabasePartitioner {
14 +
15 + public SimpleTableHashPartitioner(Map<String, Database> partitionMap) {
16 + super(partitionMap);
17 + }
18 +
19 + @Override
20 + public Database getPartition(String tableName, String key) {
21 + return sortedPartitions[hash(tableName) % sortedPartitions.length];
22 + }
23 +}
...\ No newline at end of file ...\ No newline at end of file
1 package org.onosproject.store.consistent.impl; 1 package org.onosproject.store.consistent.impl;
2 2
3 +import static com.google.common.base.Preconditions.*;
4 +
3 import com.google.common.base.MoreObjects; 5 import com.google.common.base.MoreObjects;
4 6
5 /** 7 /**
...@@ -28,7 +30,7 @@ public class UpdateOperation<K, V> { ...@@ -28,7 +30,7 @@ public class UpdateOperation<K, V> {
28 private K key; 30 private K key;
29 private V value; 31 private V value;
30 private V currentValue; 32 private V currentValue;
31 - private long currentVersion; 33 + private long currentVersion = -1;
32 34
33 /** 35 /**
34 * Returns the type of update operation. 36 * Returns the type of update operation.
...@@ -91,6 +93,17 @@ public class UpdateOperation<K, V> { ...@@ -91,6 +93,17 @@ public class UpdateOperation<K, V> {
91 } 93 }
92 94
93 /** 95 /**
96 + * Creates a new builder instance.
97 + * @param <K> key type.
98 + * @param <V> value type.
99 + *
100 + * @return builder.
101 + */
102 + public static <K, V> Builder<K, V> newBuilder() {
103 + return new Builder<>();
104 + }
105 +
106 + /**
94 * UpdatOperation builder. 107 * UpdatOperation builder.
95 * 108 *
96 * @param <K> key type. 109 * @param <K> key type.
...@@ -100,52 +113,70 @@ public class UpdateOperation<K, V> { ...@@ -100,52 +113,70 @@ public class UpdateOperation<K, V> {
100 113
101 private UpdateOperation<K, V> operation = new UpdateOperation<>(); 114 private UpdateOperation<K, V> operation = new UpdateOperation<>();
102 115
103 - /**
104 - * Creates a new builder instance.
105 - * @param <K> key type.
106 - * @param <V> value type.
107 - *
108 - * @return builder.
109 - */
110 - public static <K, V> Builder<K, V> builder() {
111 - return new Builder<>();
112 - }
113 -
114 - private Builder() {
115 - }
116 -
117 public UpdateOperation<K, V> build() { 116 public UpdateOperation<K, V> build() {
117 + validateInputs();
118 return operation; 118 return operation;
119 } 119 }
120 120
121 public Builder<K, V> withType(Type type) { 121 public Builder<K, V> withType(Type type) {
122 - operation.type = type; 122 + operation.type = checkNotNull(type, "type cannot be null");
123 return this; 123 return this;
124 } 124 }
125 125
126 public Builder<K, V> withTableName(String tableName) { 126 public Builder<K, V> withTableName(String tableName) {
127 - operation.tableName = tableName; 127 + operation.tableName = checkNotNull(tableName, "tableName cannot be null");
128 return this; 128 return this;
129 } 129 }
130 130
131 public Builder<K, V> withKey(K key) { 131 public Builder<K, V> withKey(K key) {
132 - operation.key = key; 132 + operation.key = checkNotNull(key, "key cannot be null");
133 return this; 133 return this;
134 } 134 }
135 135
136 public Builder<K, V> withCurrentValue(V value) { 136 public Builder<K, V> withCurrentValue(V value) {
137 - operation.currentValue = value; 137 + operation.currentValue = checkNotNull(value, "currentValue cannot be null");
138 return this; 138 return this;
139 } 139 }
140 140
141 public Builder<K, V> withValue(V value) { 141 public Builder<K, V> withValue(V value) {
142 - operation.value = value; 142 + operation.value = checkNotNull(value, "value cannot be null");
143 return this; 143 return this;
144 } 144 }
145 145
146 public Builder<K, V> withCurrentVersion(long version) { 146 public Builder<K, V> withCurrentVersion(long version) {
147 + checkArgument(version >= 0, "version cannot be negative");
147 operation.currentVersion = version; 148 operation.currentVersion = version;
148 return this; 149 return this;
149 } 150 }
151 +
152 + private void validateInputs() {
153 + checkNotNull(operation.type, "type must be specified");
154 + checkNotNull(operation.tableName, "table name must be specified");
155 + checkNotNull(operation.key, "key must be specified");
156 + switch (operation.type) {
157 + case PUT:
158 + case PUT_IF_ABSENT:
159 + checkNotNull(operation.value, "value must be specified.");
160 + break;
161 + case PUT_IF_VERSION_MATCH:
162 + checkNotNull(operation.value, "value must be specified.");
163 + checkState(operation.currentVersion >= 0, "current version must be specified");
164 + break;
165 + case PUT_IF_VALUE_MATCH:
166 + checkNotNull(operation.value, "value must be specified.");
167 + checkNotNull(operation.currentValue, "currentValue must be specified.");
168 + break;
169 + case REMOVE:
170 + break;
171 + case REMOVE_IF_VERSION_MATCH:
172 + checkState(operation.currentVersion >= 0, "current version must be specified");
173 + break;
174 + case REMOVE_IF_VALUE_MATCH:
175 + checkNotNull(operation.currentValue, "currentValue must be specified.");
176 + break;
177 + default:
178 + throw new IllegalStateException("Unknown operation type");
179 + }
180 + }
150 } 181 }
151 } 182 }
......
...@@ -100,7 +100,7 @@ ...@@ -100,7 +100,7 @@
100 <filter> 100 <filter>
101 <artifact>org.onosproject:copycat*</artifact> 101 <artifact>org.onosproject:copycat*</artifact>
102 <includes> 102 <includes>
103 - <include>net/kuujo/copycat/**</include> 103 + <include>**</include>
104 </includes> 104 </includes>
105 </filter> 105 </filter>
106 106
......